Class: OmfCommon::Comm::Topic
- Inherits:
-
Object
- Object
- OmfCommon::Comm::Topic
show all
- Defined in:
- omf_common/lib/omf_common/comm/topic.rb
Constant Summary
- @@name2inst =
{}
- @@lock =
Monitor.new
Instance Attribute Summary (collapse)
Class Method Summary
(collapse)
Instance Method Summary
(collapse)
-
- (Object) _send_message(msg, opts = {}, block = nil)
private
_send_message will also register callbacks for reply messages by default.
-
- (Object) add_message_handler(handler_name, key, &message_block)
private
-
- (Object) address
-
- (Object) after(delay_sec, &block)
-
- (Object) configure(props = {}, core_props = {}, &block)
-
- (Object) create(res_type, config_props = {}, core_props = {}, &block)
Request the creation of a new resource.
-
- (Object) create_message_and_publish(type, props = {}, core_props = {}, block = nil)
Only used for create, configure and request.
-
- (Boolean) error?
Used to detect message publishing errors: if the callback is indeed yielded a
Topic object, no publishing error occurred, so this always returns false.
-
- (Object) inform(type, props = {}, core_props = {}, &block)
-
- (Topic) initialize(id, opts = {})
constructor
private
-
- (Object) on_incoming_message(msg)
private
Process a message received from this topic.
-
- (Object) on_inform(key = nil, &message_block)
-
- (Object) on_message(key = nil, &message_block)
-
- (Object) on_subscribed(&block)
-
- (Object) publish(msg, opts = {}, &block)
-
- (Object) release(resource, core_props = {}, &block)
-
- (Object) request(select = [], core_props = {}, &block)
-
- (Object) unsubscribe(key)
Remove all registered callbacks for 'key'.
Constructor Details
- (Topic) initialize(id, opts = {})
Returns a new instance of Topic
150
151
152
153
154
155
156
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 150
# Sets up the per-topic state: the identifier, an empty handler table,
# an empty table of reply callbacks and a Monitor guarding both.
#
# id   - stored as-is and returned by #id.
# opts - accepted but not read by this implementation.
def initialize(id, opts = {})
  @id          = id
  @lock        = Monitor.new
  @handlers    = {}
  @context2cbk = {}
end
|
Instance Attribute Details
- (Object) id
Returns the value of attribute id
37
38
39
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 37
# Reader for the identifier this topic was constructed with.
def id
  @id
end
|
- (Object) routing_key
Returns the value of attribute routing_key
37
38
39
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 37
# Reader for @routing_key. Nothing visible in this class assigns it, so it
# is nil unless set elsewhere (presumably by a transport-specific subclass).
def routing_key
  @routing_key
end
|
Class Method Details
+ (Object) [](name)
33
34
35
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 33
# Looks up a previously created topic by name.
#
# The name is normalised with to_s.to_sym, the same way self.create builds
# its registry keys, so a String and a Symbol naming the same topic resolve
# to the same entry.
#
# Returns the registered topic, or nil when none is registered under name.
def self.[](name)
  @@name2inst[name.to_s.to_sym]
end
|
+ (Object) create(name, opts = {}, &block)
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 17
# Returns the topic registered under name, building it first if needed.
#
# name is normalised to a Symbol. The whole lookup/insert runs while holding
# the class-level lock. For a new name the block is forwarded to new; for a
# name that is already registered the block, if given, is called with the
# existing topic.
def self.create(name, opts = {}, &block)
  name = name.to_s.to_sym
  @@lock.synchronize do
    if (existing = @@name2inst[name])
      debug "Existing topic: #{name} | #{existing.routing_key}"
      block.call(existing) if block
    else
      debug "New topic: #{name} | #{opts[:routing_key]}"
      @@name2inst[name] = new(name, opts, &block)
    end
    # Re-read the registry so the return value reflects its current content.
    @@name2inst[name]
  end
end
|
Instance Method Details
- (Object) _send_message(msg, opts = {}, block = nil)
_send_message will also register callbacks for reply messages by default
160
161
162
163
164
165
166
167
168
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 160
# Records +block+ as the callback for replies to +msg+.
#
# This implementation does no sending of its own and does not read +opts+.
# When a block is supplied it is stored, together with the registration
# time, under msg.mid (as a String) in the reply-callback table;
# on_incoming_message looks callbacks up in that table by msg.cid.
#
# Returns the stored entry, or nil when no block was given.
def _send_message(msg, opts = {}, block = nil)
  return unless block

  debug "(#{id}) register callback for responses to 'mid: #{msg.mid}'"
  @lock.synchronize { @context2cbk[msg.mid.to_s] = { block: block, created_at: Time.now } }
end
|
- (Object) add_message_handler(handler_name, key, &message_block)
205
206
207
208
209
210
211
212
213
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 205
# Registers +message_block+ under +handler_name+.
#
# handler_name  - key of the handler group the block is added to.
# key           - identifies the block within that group; when nil/false a
#                 SHA1 hex digest of the block's source location is used, so
#                 a block defined at the same location replaces its earlier
#                 registration.
#
# Raises ArgumentError when no block is given. Returns self.
def add_message_handler(handler_name, key, &message_block)
  raise ArgumentError, 'Missing message callback' unless message_block

  debug "(#{id}) register handler for '#{handler_name}'"
  @lock.synchronize do
    key = OpenSSL::Digest::SHA1.new(message_block.source_location.to_s).to_s unless key
    group = (@handlers[handler_name] ||= {})
    group[key] = message_block
  end
  self
end
|
- (Object) address
137
138
139
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 137
# Always raises NotImplementedError in this class.
# NOTE(review): presumably overridden by transport-specific subclasses.
def address
  raise NotImplementedError
end
|
- (Object) after(delay_sec, &block)
141
142
143
144
145
146
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 141
# Hands +block+ to the OmfCommon event loop's `after` with +delay_sec+.
#
# A block declaring exactly one parameter receives this topic; any other
# block is called without arguments. Does nothing (returns nil) when no
# block is given; otherwise returns whatever the event loop's `after` returns.
def after(delay_sec, &block)
  return if block.nil?

  OmfCommon.eventloop.after(delay_sec) do
    if block.arity == 1
      block.call(self)
    else
      block.call
    end
  end
end
|
48
49
50
51
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 48
# Publishes a :configure message built from +props+ and +core_props+ via
# create_message_and_publish, passing the optional block along as a Proc.
#
# Returns self.
def configure(props = {}, core_props = {}, &block)
  create_message_and_publish(:configure, props, core_props, block)
  self
end
|
- (Object) create(res_type, config_props = {}, core_props = {}, &block)
Request the creation of a new resource. Returns itself
41
42
43
44
45
46
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 41
# Request the creation of a new resource.
#
# res_type     - used as the :type property unless +config_props+ already
#                carries a truthy :type.
# config_props - properties for the :create message. The caller's hash is
#                not modified: the default :type is added to a copy, so the
#                same hash can be reused for resources of different types.
# core_props   - passed through to create_message_and_publish.
# block        - optional callback, passed along as a Proc.
#
# Returns self.
def create(res_type, config_props = {}, core_props = {}, &block)
  config_props = config_props.merge(type: res_type) unless config_props[:type]
  debug "Create resource of type '#{res_type}'"
  create_message_and_publish(:create, config_props, core_props, block)
  self
end
|
- (Object) create_message_and_publish(type, props = {}, core_props = {}, block = nil)
Only used for create, configure and request
77
78
79
80
81
82
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 77
# Builds an OmfCommon::Message of the given +type+ and publishes it with
# routing key "o.op". Only used for create, configure and request.
#
# core_props[:src] is filled in with the local comm address when it is not
# already set (this writes into the hash that was passed in).
#
# Returns whatever publish returns.
def create_message_and_publish(type, props = {}, core_props = {}, block = nil)
  debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}: #{core_props.inspect}"
  core_props[:src] = OmfCommon.comm.local_address unless core_props[:src]
  message = OmfCommon::Message.create(type, props, core_props)
  publish(message, { routing_key: "o.op" }, &block)
end
|
- (Boolean) error?
Used to detect message publishing errors: if the callback is indeed yielded a
Topic object, no publishing error occurred, so this always returns false.
133
134
135
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 133
# Publishing-error predicate; a Topic always answers false.
def error?
  false
end
|
59
60
61
62
63
64
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 59
# Publishes an :inform message with routing key "o.info".
#
# type       - sent as the :itype core property.
# props      - message properties.
# core_props - core properties; :src is filled in with the local comm
#              address when not already set (written into the passed hash),
#              while :itype is added to a merged copy only.
#
# Returns self.
def inform(type, props = {}, core_props = {}, &block)
  core_props[:src] = OmfCommon.comm.local_address unless core_props[:src]
  inform_props = core_props.merge(itype: type)
  message = OmfCommon::Message.create(:inform, props, inform_props)
  publish(message, { routing_key: "o.info" }, &block)
  self
end
|
- (Object) on_incoming_message(msg)
Process a message received from this topic.
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 174
# Process a message received from this topic.
#
# The message is delivered to every handler registered for its operation
# and for :message. For :inform messages with an itype, handlers registered
# under the itype itself are included as well, plus :create_succeeded for
# "creation_ok" and :inform_status for "status". Finally, a reply callback
# stored under the message's cid (as a String), if any, is invoked and its
# :last_used timestamp refreshed.
def on_incoming_message(msg)
  type = msg.operation
  debug "(#{id}) Deliver message '#{type}': #{msg.inspect}"
  htypes = [type, :message]
  if type == :inform
    if (itype = msg.itype(:ruby))
      case itype
      when "creation_ok"
        htypes << :create_succeeded
      when 'status'
        htypes << :inform_status
      end
      htypes << itype.to_sym
    end
  end

  debug "(#{id}) Message type '#{htypes.inspect}' (#{msg.class}:#{msg.cid})"
  # Handler types without registrations contribute nothing.
  hs = htypes.flat_map { |ht| (@handlers[ht] || {}).values }
  debug "(#{id}) Distributing message to '#{hs.inspect}'"
  hs.each do |block|
    block.call msg
  end

  if (cbk = @context2cbk[msg.cid.to_s])
    debug "(#{id}) Distributing message to '#{cbk.inspect}'"
    cbk[:last_used] = Time.now
    cbk[:block].call(msg)
  end
end
|
114
115
116
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 114
# Registers +message_block+ in the :inform handler group via
# add_message_handler, using +key+ (may be nil) to identify it.
#
# Returns whatever add_message_handler returns.
def on_inform(key = nil, &message_block)
  add_message_handler(:inform, key, &message_block)
end
|
- (Object) on_message(key = nil, &message_block)
110
111
112
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 110
# Registers +message_block+ in the :message handler group via
# add_message_handler, using +key+ (may be nil) to identify it.
#
# Returns whatever add_message_handler returns.
def on_message(key = nil, &message_block)
  add_message_handler(:message, key, &message_block)
end
|
- (Object) on_subscribed(&block)
128
129
130
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 128
# Always raises NotImplementedError in this class; the block is not used.
# NOTE(review): presumably overridden by transport-specific subclasses.
def on_subscribed(&block)
  raise NotImplementedError
end
|
- (Object) publish(msg, opts = {}, &block)
84
85
86
87
88
89
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 84
# Publishes +msg+ by handing it, +opts+ and the optional block (as a Proc)
# to _send_message.
#
# msg  - must be an OmfCommon::Message.
# opts - publishing options; a missing :routing_key is logged as an error
#        but does not stop the message from being passed on.
#
# Raises RuntimeError when +msg+ is not an OmfCommon::Message.
# Returns whatever _send_message returns.
def publish(msg, opts = {}, &block)
  # Log only: a missing routing key does not abort publishing.
  error "!!!" if opts[:routing_key].nil?
  raise "Expected message but got '#{msg.class}'" unless msg.is_a?(OmfCommon::Message)

  _send_message(msg, opts, block)
end
|
- (Object) release(resource, core_props = {}, &block)
66
67
68
69
70
71
72
73
74
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 66
# Publishes a :release message for +resource+ with routing key "o.op".
#
# resource   - must be an instance of this topic's own class; its id is
#              sent as the :res_id core property.
# core_props - core properties; :src is filled in with the local comm
#              address when not already set (written into the passed hash),
#              while :res_id is added to a merged copy only.
#
# Raises ArgumentError when +resource+ is of a different class.
# Returns self.
def release(resource, core_props = {}, &block)
  raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'" unless resource.is_a?(self.class)

  core_props[:src] = OmfCommon.comm.local_address unless core_props[:src]
  release_props = core_props.merge(res_id: resource.id)
  message = OmfCommon::Message.create(:release, {}, release_props)
  publish(message, { routing_key: "o.op" }, &block)
  self
end
|
- (Object) request(select = [], core_props = {}, &block)
53
54
55
56
57
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 53
# Publishes a :request message via create_message_and_publish, with
# +select+ passed in the properties position and the optional block passed
# along as a Proc.
#
# Returns self.
def request(select = [], core_props = {}, &block)
  create_message_and_publish(:request, select, core_props, block)
  self
end
|
- (Object) unsubscribe(key)
Remove all registered callbacks for 'key'. Will also unsubscribe
from the underlying comms layer if no callbacks remain.
121
122
123
124
125
126
|
# File 'omf_common/lib/omf_common/comm/topic.rb', line 121
# Clears every handler registered on this topic and removes the registry
# entries whose name equals this topic's id or address (both as Symbols).
#
# NOTE(review): +key+ is not used by this implementation; all handlers are
# dropped regardless of the key they were registered under.
# `address` is evaluated per registry entry, and only for entries whose
# name does not already match the id.
#
# Returns the class-level registry hash.
def unsubscribe(key)
  @lock.synchronize do
    @handlers.clear
    @@name2inst.delete_if do |registered_name, _topic|
      registered_name == id.to_sym || registered_name == address.to_sym
    end
  end
end
|