Class: OmfCommon::Comm::Topic

Inherits:
Object
  • Object
show all
Defined in:
omf_common/lib/omf_common/comm/topic.rb

Direct Known Subclasses

AMQP::Topic, Local::Topic, XMPP::Topic

Constant Summary

@@name2inst =
{}
@@lock =
Monitor.new

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Topic) initialize(id, opts = {}) (private)

Returns a new instance of Topic



150
151
152
153
154
155
156
# File 'omf_common/lib/omf_common/comm/topic.rb', line 150

def initialize(id, opts = {})
  @id = id
  #@address = opts[:address]
  @handlers = {}
  @lock = Monitor.new
  @context2cbk = {}
end

Instance Attribute Details

- (Object) id (readonly)

Returns the value of attribute id



37
38
39
# File 'omf_common/lib/omf_common/comm/topic.rb', line 37

def id
  @id
end

- (Object) routing_key (readonly)

Returns the value of attribute routing_key



37
38
39
# File 'omf_common/lib/omf_common/comm/topic.rb', line 37

def routing_key
  @routing_key
end

Class Method Details

+ (Object) [](name)



33
34
35
# File 'omf_common/lib/omf_common/comm/topic.rb', line 33

def self.[](name)
  @@name2inst[name]
end

+ (Object) create(name, opts = {}, &block)



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'omf_common/lib/omf_common/comm/topic.rb', line 17

def self.create(name, opts = {}, &block)
  # Force string conversion as 'name' can be an ExperimentProperty
  name = name.to_s.to_sym
  @@lock.synchronize do
    unless @@name2inst[name]
      debug "New topic: #{name} | #{opts[:routing_key]}"
      #opts[:address] ||= address_for(name)
      @@name2inst[name] = self.new(name, opts, &block)
    else
      debug "Existing topic: #{name} | #{@@name2inst[name].routing_key}"
      block.call(@@name2inst[name]) if block
    end
    @@name2inst[name]
  end
end

Instance Method Details

- (Object) _send_message(msg, opts = {}, block = nil) (private)

_send_message will also register callbacks for reply messages by default



160
161
162
163
164
165
166
167
168
# File 'omf_common/lib/omf_common/comm/topic.rb', line 160

def _send_message(msg, opts = {}, block = nil)
  if (block)
    # register callback for responses to 'mid'
    debug "(#{id}) register callback for responses to 'mid: #{msg.mid}'"
    @lock.synchronize do
      @context2cbk[msg.mid.to_s] = { block: block, created_at: Time.now }
    end
  end
end

- (Object) add_message_handler(handler_name, key, &message_block) (private)

Raises:

  • (ArgumentError)


205
206
207
208
209
210
211
212
213
# File 'omf_common/lib/omf_common/comm/topic.rb', line 205

def add_message_handler(handler_name, key, &message_block)
  raise ArgumentError, 'Missing message callback' if message_block.nil?
  debug "(#{id}) register handler for '#{handler_name}'"
  @lock.synchronize do
    key ||= OpenSSL::Digest::SHA1.new(message_block.source_location.to_s).to_s
    (@handlers[handler_name] ||= {})[key] = message_block
  end
  self
end

- (Object) address

Raises:

  • (NotImplementedError)


137
138
139
# File 'omf_common/lib/omf_common/comm/topic.rb', line 137

def address
  raise NotImplementedError
end

- (Object) after(delay_sec, &block)



141
142
143
144
145
146
# File 'omf_common/lib/omf_common/comm/topic.rb', line 141

def after(delay_sec, &block)
  return unless block
  OmfCommon.eventloop.after(delay_sec) do
    block.arity == 1 ? block.call(self) : block.call
  end
end

- (Object) configure(props = {}, core_props = {}, &block)



48
49
50
51
# File 'omf_common/lib/omf_common/comm/topic.rb', line 48

def configure(props = {}, core_props = {}, &block)
  create_message_and_publish(:configure, props, core_props, block)
  self
end

- (Object) create(res_type, config_props = {}, core_props = {}, &block)

Request the creation of a new resource. Returns itself



41
42
43
44
45
46
# File 'omf_common/lib/omf_common/comm/topic.rb', line 41

def create(res_type, config_props = {}, core_props = {}, &block)
  config_props[:type] ||= res_type
  debug "Create resource of type '#{res_type}'"
  create_message_and_publish(:create, config_props, core_props, block)
  self
end

- (Object) create_message_and_publish(type, props = {}, core_props = {}, block = nil)

Only used for create, configure and request



77
78
79
80
81
82
# File 'omf_common/lib/omf_common/comm/topic.rb', line 77

def create_message_and_publish(type, props = {}, core_props = {}, block = nil)
  debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}: #{core_props.inspect}"
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(type, props, core_props)
  publish(msg, { routing_key: "o.op" }, &block)
end

- (Boolean) error?

For detecting message publishing error, means if callback indeed yield a Topic object, there is no publishing error, thus always false

Returns:

  • (Boolean)


133
134
135
# File 'omf_common/lib/omf_common/comm/topic.rb', line 133

def error?
  false
end

- (Object) inform(type, props = {}, core_props = {}, &block)



59
60
61
62
63
64
# File 'omf_common/lib/omf_common/comm/topic.rb', line 59

def inform(type, props = {}, core_props = {}, &block)
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:inform, props, core_props.merge(itype: type))
  publish(msg, { routing_key: "o.info" }, &block)
  self
end

- (Object) on_incoming_message(msg) (private)

Process a message received from this topic.

Parameters:



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'omf_common/lib/omf_common/comm/topic.rb', line 174

def on_incoming_message(msg)
  type = msg.operation
  debug "(#{id}) Deliver message '#{type}': #{msg.inspect}"
  htypes = [type, :message]
  if type == :inform
    # TODO keep converting itype is painful, need to solve this.
    if (it = msg.itype(:ruby)) # format itype as lower case string
      case it
      when "creation_ok"
        htypes << :create_succeeded
      when 'status'
        htypes << :inform_status
      end

      htypes << it.to_sym
    end
  end

  debug "(#{id}) Message type '#{htypes.inspect}' (#{msg.class}:#{msg.cid})"
  hs = htypes.map { |ht| (@handlers[ht] || {}).values }.compact.flatten
  debug "(#{id}) Distributing message to '#{hs.inspect}'"
  hs.each do |block|
    block.call msg
  end
  if cbk = @context2cbk[msg.cid.to_s]
    debug "(#{id}) Distributing message to '#{cbk.inspect}'"
    cbk[:last_used] = Time.now
    cbk[:block].call(msg)
  end
end

- (Object) on_inform(key = nil, &message_block)



114
115
116
# File 'omf_common/lib/omf_common/comm/topic.rb', line 114

def on_inform(key = nil, &message_block)
  add_message_handler(:inform, key, &message_block)
end

- (Object) on_message(key = nil, &message_block)



110
111
112
# File 'omf_common/lib/omf_common/comm/topic.rb', line 110

def on_message(key = nil, &message_block)
  add_message_handler(:message, key, &message_block)
end

- (Object) on_subscribed(&block)

Raises:

  • (NotImplementedError)


128
129
130
# File 'omf_common/lib/omf_common/comm/topic.rb', line 128

def on_subscribed(&block)
  raise NotImplementedError
end

- (Object) publish(msg, opts = {}, &block)



84
85
86
87
88
89
# File 'omf_common/lib/omf_common/comm/topic.rb', line 84

def publish(msg, opts = {}, &block)
  error "!!!" if opts[:routing_key].nil?

  raise "Expected message but got '#{msg.class}" unless msg.is_a?(OmfCommon::Message)
  _send_message(msg, opts, block)
end

- (Object) release(resource, core_props = {}, &block)



66
67
68
69
70
71
72
73
74
# File 'omf_common/lib/omf_common/comm/topic.rb', line 66

def release(resource, core_props = {}, &block)
  unless resource.is_a? self.class
    raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'"
  end
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:release, {}, core_props.merge(res_id: resource.id))
  publish(msg, { routing_key: "o.op" }, &block)
  self
end

- (Object) request(select = [], core_props = {}, &block)



53
54
55
56
57
# File 'omf_common/lib/omf_common/comm/topic.rb', line 53

def request(select = [], core_props = {}, &block)
  # TODO: What are the parameters to the request method really?
  create_message_and_publish(:request, select, core_props, block)
  self
end

- (Object) unsubscribe(key)

Remove all registered callbacks for 'key'. Will also unsubscribe from the underlying comms layer if no callbacks remain.



121
122
123
124
125
126
# File 'omf_common/lib/omf_common/comm/topic.rb', line 121

def unsubscribe(key)
  @lock.synchronize do
    @handlers.clear
    @@name2inst.delete_if { |k, v| k == id.to_sym || k == address.to_sym}
  end
end