Class: OmfCommon::Comm::XMPP::Communicator

Inherits:
OmfCommon::Comm show all
Includes:
Blather::DSL
Defined in:
omf_common/lib/omf_common/comm/xmpp/communicator.rb

Constant Summary

# Prefix that default_host joins with the JID domain to form the default pubsub host name.
HOST_PREFIX =
'pubsub'
# Delay handed to OmfCommon.el.after before a reconnection attempt (presumably seconds — confirm against OmfCommon.el).
RETRY_INTERVAL =
180
# Period handed to OmfCommon.el.every for the keep-alive ping check (presumably seconds — confirm against OmfCommon.el).
PING_INTERVAL =
1800
# Data form (type :submit) that _create passes to pubsub.create as the node configuration.
PUBSUB_CONFIGURE =
Blather::Stanza::X.new({
  :type => :submit,
  :fields => [
    # Hidden field identifying the form as a pubsub node_config form.
    { :var => "FORM_TYPE", :type => 'hidden', :value => "http://jabber.org/protocol/pubsub#node_config" },
    # The remaining fields are node options; see XEP-0060 for their exact meaning.
    { :var => "pubsub#persist_items", :value => "0" },
    { :var => "pubsub#purge_offline", :value => "1" },
    { :var => "pubsub#send_last_published_item", :value => "never" },
    { :var => "pubsub#notify_retract",  :value => "0" },
    { :var => "pubsub#publish_model", :value => "open" }]
})

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Methods inherited from OmfCommon::Comm

init, instance, #local_address, #local_topic, #options

Constructor Details

- (Communicator) initialize(opts = {}) (private)

Returns a new instance of Communicator



300
301
302
303
304
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 300

# Build a communicator with an empty published-message list and empty callback
# registries, then delegate to the superclass constructor.
#
# @param opts [Hash] forwarded unchanged to the superclass by the bare +super+
def initialize(opts = {})
  self.published_messages = []
  # Callback lists appended to by on_connected / on_interrupted.
  # The :interpreted key holds the interrupt (INT/TERM) callbacks.
  @cbks = {connected: [], interpreted: []}
  super
end

Instance Attribute Details

- (Object) normal_shutdown_mode

Returns the value of attribute normal_shutdown_mode



52
53
54
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 52

# Reader for the shutdown flag: init resets it to false and disconnect sets it
# to true, so the disconnected handler can tell a requested shutdown from a
# lost connection.
#
# @return [Boolean, nil] current value of @normal_shutdown_mode
def normal_shutdown_mode
  @normal_shutdown_mode
end

- (Object) published_messages

Returns the value of attribute published_messages



52
53
54
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 52

# Reader for the list that publish appends a SHA1 digest to each time a
# publish callback fires.
#
# @return [Array] current value of @published_messages
def published_messages
  @published_messages
end

- (Object) retry_counter

Returns the value of attribute retry_counter



52
53
54
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 52

# Reader for the retry counter, which init resets to 0.
#
# @return [Integer, nil] current value of @retry_counter
def retry_counter
  @retry_counter
end

Instance Method Details

- (Object) _create(topic, pubsub_host = default_host, &block)



224
225
226
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 224

# Ask the pubsub service to create a node, configured with PUBSUB_CONFIGURE.
#
# @param topic [String] node name
# @param pubsub_host [String] pubsub service host (defaults to default_host)
# @yield [stanza] the service response, after it has been logged
def _create(topic, pubsub_host = default_host, &block)
  logged_callback = callback_logging(__method__, topic, &block)
  pubsub.create(topic, pubsub_host, PUBSUB_CONFIGURE, &logged_callback)
end

- (Object) _subscribe(topic, pubsub_host = default_host, &block)



220
221
222
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 220

# Ask the pubsub service to subscribe this client to a node.
#
# @param topic [String] node name
# @param pubsub_host [String] pubsub service host (defaults to default_host)
# @yield [stanza] the service response, after it has been logged
def _subscribe(topic, pubsub_host = default_host, &block)
  # The second argument (subscriber JID) is left nil, exactly as before.
  pubsub.subscribe(topic, nil, pubsub_host, &callback_logging(__method__, topic, &block))
end

- (Object) _unsubscribe_one(topic_id, pubsub_host = default_host)

Un-subscribe one single topic by topic address



241
242
243
244
245
246
247
248
249
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 241

# Cancel every subscription this client holds on one node.
#
# Fetches the current subscription list from the pubsub host and sends an
# unsubscribe request for each subscribed entry whose node equals topic_id.to_s.
#
# @param topic_id [#to_s] node to leave
# @param pubsub_host [String] pubsub service host (defaults to default_host)
def _unsubscribe_one(topic_id, pubsub_host = default_host)
  pubsub.subscriptions(pubsub_host) do |m|
    (m[:subscribed] || []).each do |s|
      next unless s[:node] == topic_id.to_s

      logged_callback = callback_logging(__method__, s[:node], s[:subid])
      pubsub.unsubscribe(s[:node], nil, s[:subid], pubsub_host, &logged_callback)
    end
  end
end

- (Object) affiliations(pubsub_host = default_host, &block)



251
252
253
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 251

# Request this client's affiliations from the pubsub service.
#
# @param pubsub_host [String] pubsub service host (defaults to default_host)
# @yield [stanza] the service response, after it has been logged
def affiliations(pubsub_host = default_host, &block)
  logged_callback = callback_logging(__method__, &block)
  pubsub.affiliations(pubsub_host, &logged_callback)
end

- (Object) callback_logging(*args, &block) (private)

Wrap the given block in a new proc that automatically logs error and result stanzas before calling the block



307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 307

# Wrap a callback in a proc that logs the outcome of a stanza before handing
# the stanza on to the callback.
#
# Error stanzas are imported as Blather::StanzaError: :unexpected_request is
# logged at debug level, :conflict is not logged, anything else is logged as a
# warning. Result stanzas produce a "<label> SUCCEED" debug line.
#
# @param args [Array] parts of the log label, joined with " >> "
#   ("OPERATION" when empty)
# @yield [stanza] optional callback invoked after logging
# @return [Proc] the wrapping proc
def callback_logging(*args, &block)
  label = args.empty? ? "OPERATION" : args.join(" >> ")

  proc do |stanza|
    if stanza.respond_to?(:error?) && stanza.error?
      e_stanza = Blather::StanzaError.import(stanza)
      case e_stanza.name
      when :unexpected_request
        logger.debug e_stanza
      when :conflict
        # Conflicts are deliberately not logged.
      else
        logger.warn "#{e_stanza} Original: #{e_stanza.original}"
      end
    end

    logger.debug "#{label} SUCCEED" if stanza.respond_to?(:result?) && stanza.result?
    block.call(stanza) if block
  end
end

- (Object) conn_info



69
70
71
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 69

# Describe the current connection.
#
# @return [Hash] +:proto+ (always :xmpp), +:user+ (JID node) and +:domain+ (JID domain)
def conn_info
  own_jid = jid
  { proto: :xmpp, user: own_jid.node, domain: own_jid.domain }
end

- (Object) connect(username, password, server)

Run the XMPP client to connect to the server; on a connection error, log a warning and schedule another attempt after RETRY_INTERVAL



172
173
174
175
176
177
178
179
180
181
182
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 172

# Run the XMPP client; if the connection attempt fails with one of the known
# connection errors, log a warning and schedule another attempt after
# RETRY_INTERVAL.
#
# The credentials are not used for the connection itself here (the client is
# configured beforehand); they are only carried along for the retry call and
# the log message.
#
# @param username [String]
# @param password [String]
# @param server [String] shown in the log line
def connect(username, password, server)
  info "Connecting to '#{server}' ..."
  begin
    client.run
  rescue ::EventMachine::ConnectionError,
         Blather::Stream::ConnectionTimeout,
         Blather::Stream::NoConnection,
         Blather::Stream::ConnectionFailed => e
    warn "[#{e.class}] #{e}, try again..."
    OmfCommon.el.after(RETRY_INTERVAL) { connect(username, password, server) }
  end
end

- (Object) create_topic(topic, opts = {})

Create a new pubsub topic with additional configuration

Parameters:

  • topic (String)

    Pubsub topic name



198
199
200
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 198

# Create a pubsub topic by delegating to OmfCommon::Comm::XMPP::Topic.create.
#
# @param topic [String] pubsub topic name
# @param opts [Hash] accepted for interface compatibility; not read by this method
# @return [Object] whatever Topic.create returns
def create_topic(topic, opts = {})
  OmfCommon::Comm::XMPP::Topic.create(topic)
end

- (Object) default_host (private)



325
326
327
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 325

# Pubsub host used when a caller does not name one.
#
# @return [String] the explicitly configured @pubsub_host when set, otherwise
#   HOST_PREFIX joined to the JID domain with a dot
def default_host
  return @pubsub_host if @pubsub_host

  "#{HOST_PREFIX}.#{jid.domain}"
end

- (Object) delete_topic(topic, pubsub_host = default_host, &block)

Delete a pubsub topic

Parameters:

  • topic (String)

    Pubsub topic name



205
206
207
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 205

# Ask the pubsub service to delete a node.
#
# @param topic [String] pubsub topic name
# @param pubsub_host [String] pubsub service host (defaults to default_host)
# @yield [stanza] the service response, after it has been logged
def delete_topic(topic, pubsub_host = default_host, &block)
  logged_callback = callback_logging(__method__, topic, &block)
  pubsub.delete(topic, pubsub_host, &logged_callback)
end

- (Object) disconnect(opts = {})

Shut down XMPP connection



185
186
187
188
189
190
191
192
193
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 185

# Shut down the XMPP connection on request.
#
# Flags the shutdown as intentional first, so the disconnected handler does
# not schedule a reconnection, then shuts the client down and records a
# 'disconnect' measurement when measurement is enabled.
#
# @param opts [Hash] accepted for interface compatibility; not read by this method
def disconnect(opts = {})
  # NOTE Do not clean up
  @lock.synchronize { @normal_shutdown_mode = true }

  info "Disconnecting..."
  shutdown

  if OmfCommon::Measure.enabled?
    OmfCommon::DSL::Xmpp::MPConnection.inject(Time.now.to_f, jid, 'disconnect')
  end
end

- (Object) init(opts = {})

Set up XMPP options and start the Eventmachine, connect to XMPP server

Raises:

  • (ArgumentError)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 88

# Configure the XMPP client from +opts+, start connecting, and register the
# connection-lifecycle and signal handlers.
#
# Credentials come from +:url+ when given (user, password and host of the
# URL), otherwise from +:username+, +:password+ and +:server+. A missing user
# name or password defaults to "<hostname>-<pid>". The user name is
# lower-cased before it is used.
#
# @param opts [Hash] connection options; also forwarded to the superclass by
#   the bare +super+ at the end
# @option opts [String] :url credentials and server in URL form
# @option opts [String] :username
# @option opts [String] :password
# @option opts [String] :server
# @option opts [String] :pubsub_domain pubsub host stored in @pubsub_host
# @raise [ArgumentError] if no server can be determined
def init(opts = {})
  @lock = Monitor.new

  @pubsub_host = opts[:pubsub_domain]
  if opts[:url]
    url = URI(opts[:url])
    username, password, server = url.user, url.password, url.host
  else
    username, password, server = opts[:username], opts[:password], opts[:server]
  end

  # Remember whether BOTH credentials are generated: historically they then
  # shared one string, so lower-casing the user name lower-cased the password
  # as well. That pairing is kept so generated accounts keep authenticating.
  generated_credentials = username.nil? && password.nil?

  random_name = "#{Socket.gethostname}-#{Process.pid}"
  username ||= random_name
  password ||= random_name

  raise ArgumentError, "Username cannot be nil when connect to XMPP" if username.nil?
  raise ArgumentError, "Password cannot be nil when connect to XMPP" if password.nil?
  raise ArgumentError, "Server cannot be nil when connect to XMPP" if server.nil?

  @retry_counter = 0
  @normal_shutdown_mode = false

  # Non-mutating downcase: the caller's option string must not be modified,
  # and a frozen string must not make initialisation fail.
  username = username.downcase
  password = username.dup if generated_credentials

  # NOTE: this local deliberately keeps the name +jid+; the blocks below that
  # mention +jid+ therefore see this String.
  jid = "#{username}@#{server}"
  client.setup(jid, password)
  connect(username, password, server)

  when_ready do
    if @not_initial_connection
      info "Reconnected"
    else
      info "Connected"
      OmfCommon::DSL::Xmpp::MPConnection.inject(Time.now.to_f, jid, 'connected') if OmfCommon::Measure.enabled?
      @cbks[:connected].each { |cbk| cbk.call(self) }
      # It will be reconnection after this
      @lock.synchronize do
        @not_initial_connection = true
      end
    end

    # Keep-alive: every PING_INTERVAL, send a ping if the previous one was
    # answered (@pong is set back to true by ping_alive's response handler);
    # otherwise treat the connection as lost and connect again.
    @lock.synchronize do
      @pong = true
      @ping_alive_timer = OmfCommon.el.every(PING_INTERVAL) do
        if @pong
          @lock.synchronize do
            @pong = false # Reset @pong
          end
          ping_alive
        else
          warn "No PONG. No connection..."
          @lock.synchronize do
            @ping_alive_timer.cancel
          end
          connect(username, password, server)
        end
      end
    end
  end

  disconnected do
    @lock.synchronize do
      @pong = false # Reset @pong
      @ping_alive_timer && @ping_alive_timer.cancel
    end

    if normal_shutdown_mode
      shutdown
    else
      warn "Disconnected... Last known state: #{client.state}"
      # Retry immediately when the client never got past initialisation.
      retry_interval = client.state == :initializing ? 0 : RETRY_INTERVAL
      OmfCommon.el.after(retry_interval) do
        connect(username, password, server)
      end
    end
  end

  # INT and TERM share one handler: run the registered interrupt callbacks,
  # or disconnect when none are registered.
  interrupt_handler = proc do
    if @cbks[:interpreted].empty?
      disconnect
    else
      @cbks[:interpreted].each { |cbk| cbk.call(self) }
    end
  end
  trap(:INT, &interrupt_handler)
  trap(:TERM, &interrupt_handler)

  super
end

- (Object) on_connected(&block)



82
83
84
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 82

# Register a callback for the initial connection; init's ready handler calls
# every registered callback with the communicator.
#
# @yield [communicator]
# @return [Array] the list of connected callbacks
def on_connected(&block)
  @cbks[:connected].push(block)
end

- (Object) on_interrupted(&block)

Capture system :INT & :TERM signal



78
79
80
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 78

# Register a callback for the :INT and :TERM signals. Callbacks are kept
# under the :interpreted key, which is the list the signal traps read.
#
# @yield [communicator]
# @return [Array] the list of interrupt callbacks
def on_interrupted(&block)
  @cbks[:interpreted].push(block)
end

- (Object) ping_alive (private)



329
330
331
332
333
334
335
336
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 329

# Send a ping IQ to the JID's domain; when a response arrives, log it and
# mark the connection as alive by setting @pong.
def ping_alive
  ping = Blather::Stanza::Iq::Ping.new(:get, jid.domain)

  client.write_with_handler(ping) do |response|
    info response
    @lock.synchronize { @pong = true }
  end
end

- (Object) publish(topic, message, pubsub_host = default_host, &block)

Publish to a pubsub topic

Parameters:

Raises:

  • (StandardError)


259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 259

# Publish a message to a pubsub topic.
#
# A message that responds to +valid?+ is validated first. A message that is
# not already a String is marshalled, and element 1 of the marshall result is
# what gets published. A plain String without +valid?+ is published as is.
#
# @param topic [String] pubsub topic name
# @param message [String, #valid?, #marshall] message to publish
# @param pubsub_host [String] pubsub service host (defaults to default_host)
# @yield [stanza] the service response, after it has been logged
# @return [nil] in particular when marshalling produced no payload, in which
#   case nothing is published
# @raise [StandardError] if the message is invalid, or is neither a String
#   nor an object that can be validated
def publish(topic, message, pubsub_host = default_host, &block)
  # Only ask for validity when the message can answer; a plain String is
  # accepted below and must not fail here for lacking #valid?.
  message_ok = message.respond_to?(:valid?) ? message.valid? : message.kind_of?(String)
  raise StandardError, "Invalid message" unless message_ok

  message = message.marshall[1] unless message.kind_of? String
  if message.nil?
    debug "Cannot publish empty message, using authentication and not providing a proper cert?"
    return nil
  end

  new_block = proc do |stanza|
    # NOTE(review): this list only ever grows in the code visible here —
    # confirm whether anything still consumes or trims it.
    published_messages << OpenSSL::Digest::SHA1.new(message.to_s)
    block.call(stanza) if block
  end

  pubsub.publish(topic, message, pubsub_host, &callback_logging(__method__, topic, &new_block))
  OmfCommon::DSL::Xmpp::MPPublished.inject(Time.now.to_f, jid, topic, message.to_s[/mid="(.{36})/, 1]) if OmfCommon::Measure.enabled?
end

- (Object) string_to_topic_address(a_string)



73
74
75
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 73

# Build the xmpp:// address of a topic on this client's domain.
#
# @param a_string [#to_s] topic name
# @return [String] "xmpp://<a_string>@<jid domain>"
def string_to_topic_address(a_string)
  format('xmpp://%s@%s', a_string, jid.domain)
end

- (Object) subscribe(topic, opts = {}, &block)

Subscribe to a pubsub topic

Parameters:

  • topic (String)

    Pubsub topic name

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :create_if_non_existent (Boolean)

    create the topic if non-existent, use this option with caution



214
215
216
217
218
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 214

# Subscribe to a pubsub topic by delegating to
# OmfCommon::Comm::XMPP::Topic.create, then record a 'join' measurement when
# measurement is enabled.
#
# @param topic [String, Array] topic name; for an Array only the first
#   element is used
# @param opts [Hash] accepted for interface compatibility; not read by this method
# @yield forwarded to Topic.create
# @return [Object, nil] the measurement result, or nil when measurement is disabled
def subscribe(topic, opts = {}, &block)
  topic = topic.first if Array === topic
  OmfCommon::Comm::XMPP::Topic.create(topic, &block)
  return nil unless OmfCommon::Measure.enabled?

  OmfCommon::DSL::Xmpp::MPSubscription.inject(Time.now.to_f, jid, 'join', topic)
end

- (Object) topic_event(additional_guard = nil, &block)

Event callback for pubsub topic event(item published)



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 279

# Register a handler for pubsub topic events (item published).
#
# An event reaches the handler only when it is not delayed, carries items,
# and its first item has a payload; +additional_guard+, when given, must
# accept the event as well. When measurement is enabled, a received
# measurement is recorded before the handler runs.
#
# @param additional_guard [#call, nil] extra predicate applied to the event
# @yield [stanza] the accepted event
def topic_event(additional_guard = nil, &block)
  guard_block = proc do |event|
    # Filtering out self-published items is currently disabled:
    #   !published_messages.include?(OpenSSL::Digest::SHA1.new(event.items.first.payload))
    deliverable = !event.delayed? && event.items? && !event.items.first.payload.nil?

    additional_guard ? (deliverable && additional_guard.call(event)) : deliverable
  end

  measured_block = proc do |stanza|
    if OmfCommon::Measure.enabled?
      OmfCommon::DSL::Xmpp::MPReceived.inject(Time.now.to_f, jid, stanza.node, stanza.to_s[/mid="(.{36})/, 1])
    end
    block.call(stanza) if block
  end

  pubsub_event(guard_block, &callback_logging(__method__, &measured_block))
end

- (Object) unsubscribe(pubsub_host = default_host)

Un-subscribe all existing subscriptions from all pubsub topics



230
231
232
233
234
235
236
237
# File 'omf_common/lib/omf_common/comm/xmpp/communicator.rb', line 230

# Cancel every subscription this client holds on the given pubsub host.
#
# Fetches the subscription list and, for each subscribed entry, sends an
# unsubscribe request and records a 'leave' measurement when measurement is
# enabled.
#
# @param pubsub_host [String] pubsub service host (defaults to default_host)
def unsubscribe(pubsub_host = default_host)
  pubsub.subscriptions(pubsub_host) do |m|
    (m[:subscribed] || []).each do |s|
      node, subid = s[:node], s[:subid]
      logged_callback = callback_logging(__method__, node, subid)
      pubsub.unsubscribe(node, nil, subid, pubsub_host, &logged_callback)

      if OmfCommon::Measure.enabled?
        OmfCommon::DSL::Xmpp::MPSubscription.inject(Time.now.to_f, jid, 'leave', s[:node])
      end
    end
  end
end