Class: OmfCommon::Comm::AMQP::Topic
- Inherits:
-
Topic
- Object
- Topic
- OmfCommon::Comm::AMQP::Topic
show all
- Defined in:
- omf_common/lib/omf_common/comm/amqp/amqp_topic.rb
Instance Attribute Summary
Attributes inherited from Topic
#id, #routing_key
Instance Method Summary
(collapse)
Methods inherited from Topic
[], #add_message_handler, #after, #configure, #create, create, #create_message_and_publish, #error?, #inform, #on_incoming_message, #on_inform, #on_message, #publish, #release, #request
Constructor Details
- (Topic) initialize(id, opts = {})
Returns a new instance of Topic
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 48
# Build an AMQP-backed topic and immediately start wiring it to the broker.
#
# @param id [Object] topic identifier, forwarded to the superclass constructor
# @param opts [Hash] options; :communicator is mandatory and is removed from
#   the hash before the superclass sees it, :address and :routing_key are read
#   afterwards
# @raise [RuntimeError] when opts carries no (truthy) :communicator
def initialize(id, opts = {})
  # Pull the communicator out of opts first so the implicit-argument `super`
  # below receives the hash without it.
  @communicator = opts.delete(:communicator)
  raise "Missing :communicator option" unless @communicator

  super

  @address = opts[:address]
  @lock = Monitor.new
  # Subscription state; flipped by _init_amqp once the queue is set up.
  @subscribed = false
  @on_subscribed_handlers = []
  @routing_key = opts[:routing_key] || "o.*"

  _init_amqp
end
|
Instance Method Details
- (Object) _init_amqp
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 63
# Set up the AMQP side of this topic.
#
# Declares a topic exchange named after `id` on the communicator's channel,
# declares an exclusive, auto-delete queue with an empty name, binds it to the
# exchange using @routing_key and subscribes to it. Every delivered payload is
# parsed with Message.parse (using the delivery's content type) and the
# resulting message is handed to on_incoming_message.
#
# Once the subscription has been requested, the topic is flagged as subscribed
# and every handler queued through on_subscribed is scheduled with after(0).
def _init_amqp()
  channel = @communicator.channel
  @exchange = channel.topic(id, :auto_delete => true)
  channel.queue("", :exclusive => true, :auto_delete => true) do |queue|
    queue.bind(@exchange, routing_key: @routing_key)

    # The first block argument carries the delivery metadata; its
    # content_type tells Message.parse how to decode the payload.
    queue.subscribe do |headers, payload|
      debug "Received message on #{@address} | #{@routing_key}"
      # The measurement point extracts the 36-character message id straight
      # from the raw payload text, before parsing.
      MPReceived.inject(Time.now.to_f, @address, payload.to_s[/mid\":\"(.{36})/, 1]) if OmfCommon::Measure.enabled?
      Message.parse(payload, headers.content_type) do |msg|
        on_incoming_message(msg)
      end
    end

    debug "Subscribed to '#@id'"
    @lock.synchronize do
      @subscribed = true
      @on_subscribed_handlers.each do |block|
        after(0, &block)
      end
      # Handlers are no longer needed: on_subscribed schedules blocks
      # directly once @subscribed is true.
      @on_subscribed_handlers = nil
    end
  end
end
|
- (Object) _send_message(msg, opts = {}, block = nil)
88
89
90
91
92
93
94
95
96
97
98
|
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 88
# Marshal `msg` and publish it on this topic's exchange.
#
# The superclass implementation runs first with the same arguments. If no
# exchange is available the message is dropped with a warning.
#
# @param msg [Object] message; must respond to marshall, mid and inspect
# @param opts [Hash] publish options; only :routing_key is read here
# @param block [Proc, nil] not used here; passed on to the superclass
def _send_message(msg, opts = {}, block = nil)
  super

  content_type, content = msg.marshall(self)
  debug "(#{id}) Send message (#{content_type}) #{msg.inspect} TO #{opts[:routing_key]}"

  # Without an exchange there is nowhere to publish to.
  return warn("Unavailable AMQP channel. Dropping message '#{msg}'") unless @exchange

  @exchange.publish(content, content_type: content_type, message_id: msg.mid, routing_key: opts[:routing_key])
  MPPublished.inject(Time.now.to_f, @address, msg.mid) if OmfCommon::Measure.enabled?
end
|
- (Object) address
17
18
19
|
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 17
# Address of this topic, as given through the :address option at construction.
#
# @return [Object, nil] the stored address, or nil when none was set
def address
  @address
end
|
- (Object) on_subscribed(&block)
Call 'block' once the topic is subscribed to the underlying messaging
infrastructure. If the topic is already subscribed, the block is scheduled immediately.
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 24
# Register `block` to run once this topic is subscribed.
#
# If the topic is already subscribed the block is scheduled right away with
# after(0); otherwise it is queued until the subscription is in place.
# Calling without a block is a no-op.
#
# @yield invoked (through after) once the topic is subscribed
# @return [Object, nil] whatever after returns when scheduled now, else nil
def on_subscribed(&block)
  return if block.nil?

  # Decide under the lock, but schedule outside of it.
  already_subscribed = @lock.synchronize do
    @on_subscribed_handlers << block unless @subscribed
    @subscribed
  end

  after(0, &block) if already_subscribed
end
|
- (Object) to_s
13
14
15
|
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 13
def to_s
@address
end
|
- (Object) unsubscribe(key)
40
41
42
43
|
# File 'omf_common/lib/omf_common/comm/amqp/amqp_topic.rb', line 40
# Unsubscribe using the given key; all work is delegated to the superclass.
#
# @param key [Object] passed unchanged to the superclass implementation
# @return [Object] whatever the superclass implementation returns
def unsubscribe(key)
  super(key)
end
|