Class: OmfCommon::Comm::XMPP::Topic

Inherits:
Topic
  • Object
show all
Defined in:
omf_common/lib/omf_common/comm/xmpp/topic.rb

Instance Attribute Summary

Attributes inherited from Topic

#id, #routing_key

Instance Method Summary (collapse)

Methods inherited from Topic

[], #add_message_handler, #after, #configure, #create, create, #create_message_and_publish, #error?, #inform, #on_incoming_message, #on_inform, #on_message, #publish, #release, #request

Constructor Details

- (Topic) initialize(id, opts = {}, &block) (private)

Returns a new instance of Topic



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 40

# Builds an XMPP-backed topic: normalises the topic id, runs the parent
# initializer, asks the communicator to create the pubsub node, subscribes to
# it, and registers a handler for incoming pubsub events on this node.
#
# @param id [#to_s] topic identifier; may be a bare name, "name@domain", and/or
#   carry an "xmpp://" prefix
# @param opts [Hash] not read in this method; handed to the parent initializer
#   by the bare +super+ below
# @param block [Proc] optional callback; called with the error stanza when
#   creation or subscription fails, or with this topic once subscribed
def initialize(id, opts = {}, &block)
  # "name@domain" is split into the topic name and its pubsub domain; only the
  # first two "@"-separated pieces are kept. Without an "@" the domain is nil.
  id, @pubsub_domain = id.to_s.split("@")
  # Drop a leading "xmpp://" scheme so only the bare topic name remains.
  if id =~ /^xmpp:\/\/(.+)$/
    id = $1
  end
  # No domain in the id: fall back to the domain of the communicator's JID.
  @pubsub_domain ||= OmfCommon.comm.jid.domain

  # Bare super forwards the *current* values of id, opts and block, so the
  # parent initializer receives the stripped topic name, not the raw argument.
  super

  # NOTE: the "subscrided" spelling is also used by #on_subscribed; the two
  # must stay in sync.
  @on_subscrided_handlers = []

  # Callback for the subscribe request: report an error stanza to the caller's
  # block, or hand over this topic and then run every registered handler.
  topic_block = proc do |stanza|
    if stanza.error?
      block.call(stanza) if block
    else
      block.call(self) if block

      # @lock is not assigned in this method -- presumably set up by the parent
      # initializer; TODO confirm.
      @lock.synchronize do
        @on_subscrided_handlers.each do |handler|
          handler.call
        end
      end
    end
  end

  # Create xmpp pubsub topic, then subscribe to it
  #
  OmfCommon.comm._create(id.to_s, pubsub_domain_addr) do |stanza|
    if stanza.error?
      e_stanza = Blather::StanzaError.import(stanza)
      if e_stanza.name == :conflict
        # Topic exists, just subscribe to it.
        # NOTE(review): `OmfCommon.comm.(...)` is the `.()` shorthand and invokes
        # `call` on the communicator. Given the comment above, a subscribe
        # method name may have been lost here -- confirm against the upstream
        # file.
        OmfCommon.comm.(id.to_s, pubsub_domain_addr, &topic_block)
      else
        # Any other creation error is passed straight to the caller's block.
        block.call(stanza) if block
      end
    else
      # NOTE(review): same `.()` call form as in the conflict branch above.
      OmfCommon.comm.(id.to_s, pubsub_domain_addr, &topic_block)
    end
  end

  # Parses the payload of the first item of an event (only the first item is
  # looked at) and passes the parsed message to on_incoming_message.
  event_block = proc do |event|
    OmfCommon::Message.parse(event.items.first.payload) do |parsed_msg|
      on_incoming_message(parsed_msg)
    end
  end

  # default_guard restricts the handler to events whose node matches this
  # topic's id.
  OmfCommon.comm.topic_event(default_guard, &event_block)
end

Instance Method Details

- (Object) _send_message(msg, opts = {}, block) (private)



90
91
92
93
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 90

# Runs the parent's _send_message with the same arguments, then publishes the
# message to this topic's node on the pubsub service address.
#
# @param msg the message to publish
# @param opts [Hash] not read here; forwarded to the parent by bare +super+
# @param block a regular positional parameter (not a +&block+); not read here,
#   forwarded to the parent by bare +super+
def _send_message(msg, opts = {}, block)
  super
  OmfCommon.comm.publish(id, msg, pubsub_domain_addr)
end

- (Object) address

# Removes the on-message callback entry registered under +id+.
#
# The id is normalised to a String once and every stored key is compared by
# its String form, so Symbol and String ids address the same entry. Previously
# the presence check used the raw id while the removal compared against
# +id.to_s+, so a mismatch between the two forms silently removed nothing.
#
# @param id [#to_s] identifier of the callback entry to remove
# @return [Hash, nil] the callback collection if an entry was removed, nil if
#   nothing matched
def delete_on_message_cbk_by_id(id)
  key = id.to_s

  @lock.synchronize do
    # reject! returns nil when nothing was removed, which keeps the falsy
    # "nothing to delete" result callers got before.
    @on_message_cbks.reject! { |cbk_id, _cbk| cbk_id.to_s == key }
  end
end


16
17
18
19
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 16

# Full XMPP address of this topic, in the form "xmpp://<id>@<pubsub domain>".
#
# The domain is the one stored in @pubsub_domain, not necessarily the domain
# of the communicator's own JID.
#
# @return [String]
def address
  topic_name = id.to_s
  "xmpp://#{topic_name}@#{@pubsub_domain}"
end

- (Object) default_guard (private)



99
100
101
102
103
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 99

# Builds the default event filter for this topic.
#
# @return [Proc] a proc taking an event and returning whether the event's
#   node equals this topic's id (compared as a String, evaluated on each call)
def default_guard
  proc { |incoming| incoming.node == id.to_s }
end

- (Object) on_subscribed(&block)



21
22
23
24
25
26
27
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 21

# Registers a handler to be run once the topic has been subscribed.
#
# Does nothing when called without a block. The handler is appended to
# @on_subscrided_handlers while holding @lock.
#
# @yield the handler to store; it is called without arguments
# @return [Array<Proc>, nil] the handler list, or nil when no block was given
def on_subscribed(&block)
  return if block.nil?

  @lock.synchronize { @on_subscrided_handlers.push(block) }
end

- (Object) pubsub_domain_addr (private)



36
37
38
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 36

# Address of the pubsub service for this topic: the stored pubsub domain
# prefixed with "pubsub.".
#
# @return [String]
def pubsub_domain_addr
  domain = @pubsub_domain
  "pubsub.#{domain}"
end

- (Object) unsubscribe(key)



29
30
31
32
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 29

# Runs the parent's unsubscribe with the same argument, then asks the
# communicator to drop its subscription for this topic's id.
#
# @param key not read here; forwarded to the parent by bare +super+
def unsubscribe(key)
  super
  OmfCommon.comm._unsubscribe_one(id)
end

- (Boolean) valid_guard?(guard_proc) (private)

Returns:

  • (Boolean)


95
96
97
# File 'omf_common/lib/omf_common/comm/xmpp/topic.rb', line 95

# Tells whether +guard_proc+ can be used as an event guard.
#
# Only objects whose class is exactly Proc qualify (procs and lambdas alike).
# Always returns a strict Boolean: a nil guard yields false rather than nil,
# matching the documented return type.
#
# @param guard_proc [Object, nil] candidate guard
# @return [Boolean]
def valid_guard?(guard_proc)
  guard_proc.instance_of?(Proc)
end