Class: OmfCommon::Comm::AMQP::Topic

Inherits:
Topic
  • Object
show all
Defined in:
omf_common/lib/omf_common/comm/amqp/amqp_topic.rb

Instance Attribute Summary

Attributes inherited from Topic

#id, #routing_key

Instance Method Summary (collapse)

Methods inherited from Topic

[], #add_message_handler, #after, #configure, #create, create, #create_message_and_publish, #error?, #inform, #on_incoming_message, #on_inform, #on_message, #publish, #release, #request

Constructor Details

- (Topic) initialize(id, opts = {}) (private)

Returns a new instance of Topic



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 48

def initialize(id, opts = {})
  unless @communicator = opts.delete(:communicator)
    raise "Missing :communicator option"
  end
  super
  @address = opts[:address]
  @lock = Monitor.new
  @subscribed = false
  @on_subscribed_handlers = []
  # Monitor o.op & o.info by default
  @routing_key = opts[:routing_key] || "o.*"

  _init_amqp
end

Instance Method Details

- (Object) _init_amqp (private)



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 63

def _init_amqp()
  channel = @communicator.channel
  @exchange = channel.topic(id, :auto_delete => true)
  channel.queue("", :exclusive => true, :auto_delete => true) do |queue|
    queue.bind(@exchange, routing_key: @routing_key)

    queue.subscribe do |headers, payload|
      debug "Received message on #{@address} | #{@routing_key}"
      MPReceived.inject(Time.now.to_f, @address, payload.to_s[/mid\":\"(.{36})/, 1]) if OmfCommon::Measure.enabled?
      Message.parse(payload, headers.content_type) do |msg|
        on_incoming_message(msg)
      end
    end
    debug "Subscribed to '#@id'"
    # Call all accumulated on_subscribed handlers
    @lock.synchronize do
      @subscribed = true
      @on_subscribed_handlers.each do |block|
        after(0, &block)
      end
      @on_subscribed_handlers = nil
    end
  end
end

- (Object) _send_message(msg, opts = {}, block = nil) (private)



88
89
90
91
92
93
94
95
96
97
98
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 88

def _send_message(msg, opts = {}, block = nil)
  super
  content_type, content = msg.marshall(self)
  debug "(#{id}) Send message (#{content_type}) #{msg.inspect} TO #{opts[:routing_key]}"
  if @exchange
    @exchange.publish(content, content_type: content_type, message_id: msg.mid, routing_key: opts[:routing_key])
    MPPublished.inject(Time.now.to_f, @address, msg.mid) if OmfCommon::Measure.enabled?
  else
    warn "Unavailable AMQP channel. Dropping message '#{msg}'"
  end
end

- (Object) address



17
18
19
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 17

def address
  @address
end

- (Object) on_subscribed(&block)

Call 'block' when topic is subscribed to underlying messaging infrastructure.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 24

def on_subscribed(&block)
  return unless block

  call_now = false
  @lock.synchronize do
    if @subscribed
      call_now = true
    else
      @on_subscribed_handlers << block
    end
  end
  if call_now
    after(0, &block)
  end
end

- (Object) to_s



13
14
15
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 13

def to_s
  @address
end

- (Object) unsubscribe(key)



40
41
42
43
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 40

def unsubscribe(key)
  super
  #@exchange.delete
end