Class: OmfCommon::Comm::XMPP::Topic
- Inherits:
-
Topic
- Object
- Topic
- OmfCommon::Comm::XMPP::Topic
show all
- Defined in:
- omf_common/lib/omf_common/comm/xmpp/topic.rb
Instance Attribute Summary
Attributes inherited from Topic
#id, #routing_key
Instance Method Summary
(collapse)
Methods inherited from Topic
[], #add_message_handler, #after, #configure, #create, create, #create_message_and_publish, #error?, #inform, #on_incoming_message, #on_inform, #on_message, #publish, #release, #request
Constructor Details
- (Topic) initialize(id, opts = {}, &block)
Returns a new instance of Topic
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 40
# Builds an XMPP-backed topic: normalises the given id, asks the
# communicator to create the pubsub node, subscribes to it, and registers a
# listener for incoming pubsub events.
#
# @param id [#to_s] topic identifier; may be of the form "name@domain" and
#   may carry a leading "xmpp://" scheme, both of which are stripped below
# @param opts [Hash] forwarded untouched to the parent constructor
# @yield [result] called with the error stanza when creation/subscription
#   fails, or with this topic instance once subscription succeeds
def initialize(id, opts = {}, &block)
# Everything after the first "@" (up to the next "@", if any) is taken as the
# pubsub domain; the part before it stays as the topic id.
id, @pubsub_domain = id.to_s.split("@")
# Drop a leading "xmpp://" scheme from the topic id.
if id =~ /^xmpp:\/\/(.+)$/
id = $1
end
# No domain in the id: fall back to the domain of the communicator's JID.
@pubsub_domain ||= OmfCommon.comm.jid.domain
# Bare `super` forwards the *current* values of id, opts and block, i.e. the
# parent sees the normalised id computed above.
# NOTE(review): @lock is used below but never assigned in this method -
# presumably the parent constructor creates it; confirm.
super
# Callbacks registered via on_subscribed (instance variable name spelled as in
# the rest of the class).
@on_subscrided_handlers = []
# Handler for the subscribe response: errors go to the caller's block as-is;
# on success the block receives this topic and every on_subscribed handler is
# invoked while holding @lock.
topic_block = proc do |stanza|
if stanza.error?
block.call(stanza) if block
else
block.call(self) if block
@lock.synchronize do
@on_subscrided_handlers.each do |handler|
handler.call
end
end
end
end
# Try to create the node, then subscribe to it.
OmfCommon.comm._create(id.to_s, pubsub_domain_addr) do |stanza|
if stanza.error?
e_stanza = Blather::StanzaError.import(stanza)
# NOTE(review): :conflict presumably means the node already exists, which
# is why subscription still proceeds - confirm against the XMPP server.
if e_stanza.name == :conflict
OmfCommon.comm._subscribe(id.to_s, pubsub_domain_addr, &topic_block)
else
# Any other creation error is reported to the caller's block.
block.call(stanza) if block
end
else
OmfCommon.comm._subscribe(id.to_s, pubsub_domain_addr, &topic_block)
end
end
# Parses the payload of the first item of each event and hands the parsed
# message to on_incoming_message.
event_block = proc do |event|
OmfCommon::Message.parse(event.items.first.payload) do |parsed_msg|
on_incoming_message(parsed_msg)
end
end
# Register the listener, filtered by default_guard.
OmfCommon.comm.topic_event(default_guard, &event_block)
end
|
Instance Method Details
- (Object) _send_message(msg, opts = {}, block)
90
91
92
93
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 90
# Runs the parent implementation with the same arguments, then publishes the
# message on this topic through the communicator.
#
# @param msg the message to publish
# @param opts [Hash] options, forwarded to the parent implementation only
# @param block callback, forwarded to the parent implementation only
def _send_message(msg, opts = {}, block)
  # Bare `super` passes msg, opts and block through unchanged.
  super
  communicator = OmfCommon.comm
  communicator.publish(id, msg, pubsub_domain_addr)
end
|
- (Object) address
# Removes the on-message callbacks registered under the given id.
#
# The lookup uses +id+ as given, while removal compares each key against
# +id.to_s+, so nothing is removed unless both agree.
# NOTE(review): presumably @on_message_cbks is a Hash-like container keyed
# by string ids - confirm against where it is populated.
#
# @param id identifier the callbacks were registered under
# @return the container when something was removed, nil when the removal
#   matched no key, or the falsy lookup result when nothing is registered
def delete_on_message_cbk_by_id(id)
  @lock.synchronize do
    registered = @on_message_cbks[id]
    if registered
      wanted_key = id.to_s
      @on_message_cbks.reject! { |key, _callbacks| key == wanted_key }
    else
      registered
    end
  end
end
16
17
18
19
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 16
# Builds the address string of this topic from its id and pubsub domain.
#
# @return [String] "xmpp://<id>@<pubsub domain>"
def address
  topic_name = id.to_s
  "xmpp://#{topic_name}@#{@pubsub_domain}"
end
|
- (Object) default_guard
99
100
101
102
103
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 99
# Builds the default event filter for this topic.
#
# @return [Proc] a proc that takes an event and is true when the event's
#   node equals this topic's id (compared as a string)
def default_guard
  # id is read when the proc is called, not when it is built.
  proc { |event| event.node == id.to_s }
end
|
- (Object) on_subscribed(&block)
21
22
23
24
25
26
27
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 21
# Registers a callback in the list of subscription handlers.
# Does nothing when called without a block.
#
# @yield the handler to store; the constructor invokes stored handlers
#   without arguments
# @return [Array, nil] the handler list after appending, or nil when no
#   block was given
def on_subscribed(&block)
  return if block.nil?

  # Append while holding @lock.
  @lock.synchronize { @on_subscrided_handlers << block }
end
|
- (Object) pubsub_domain_addr
36
37
38
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 36
# Builds the pubsub service address by prefixing the stored domain with
# "pubsub.".
#
# @return [String] "pubsub.<pubsub domain>"
def pubsub_domain_addr
  domain = @pubsub_domain
  "pubsub.#{domain}"
end
|
- (Object) unsubscribe(key)
29
30
31
32
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 29
# Runs the parent implementation with the same key, then asks the
# communicator to unsubscribe from this topic's id.
#
# @param key forwarded to the parent implementation
def unsubscribe(key)
  # Bare `super` passes key through unchanged.
  super
  communicator = OmfCommon.comm
  communicator._unsubscribe_one(id)
end
|
- (Boolean) valid_guard?(guard_proc)
95
96
97
|
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 95
# Checks whether the given guard is usable, i.e. present and exactly a Proc
# (lambdas count; instances of Proc subclasses do not).
#
# @param guard_proc the candidate guard
# @return [Boolean, nil] true/false for a present value; the falsy argument
#   itself (nil or false) when none was given
def valid_guard?(guard_proc)
  return guard_proc unless guard_proc

  guard_proc.instance_of?(Proc)
end
|