Class: OmfCommon::Comm::AMQP::Communicator

Inherits:
OmfCommon::Comm show all
Defined in:
omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Methods inherited from OmfCommon::Comm

init, instance, #local_address, #local_topic, #on_interrupted, #options, #subscribe

Constructor Details

- (Communicator) initialize(opts = {}) (private)

Returns a new instance of Communicator



131
132
133
134
135
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 131

# Prepare the callback registries, then hand over to the parent constructor.
#
# @param opts [Hash] options; passed on to the superclass constructor as-is
def initialize(opts = {})
  # Reconnect callbacks, stored per key by #on_reconnect.
  @on_reconnect = {}
  # Blocks collected by #on_connected, in registration order.
  @on_connected_procs = []
  super
end

Instance Attribute Details

- (Object) channel (readonly)

end



30
31
32
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 30

# Reader for the AMQP channel stored in @channel.
#
# @return [Object, nil] the current value of @channel; nil while nothing has
#   assigned it yet
def channel
  @channel
end

Instance Method Details

- (Object) _connect (private)



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 137

# Open the AMQP session for @url and install the connection lifecycle handlers.
#
# Inside the connect callback this creates @channel, runs every block stored
# in @on_connected_procs and registers an event-loop stop hook that closes the
# connection. Handlers for connection failure, skipped heartbeats and recovery
# are then attached to the returned session.
#
# When the setup raises a StandardError, another attempt is scheduled on the
# event loop after @opts[:reconnect_delay] seconds. Exceptions outside the
# StandardError hierarchy (Interrupt, SystemExit, NoMemoryError, ...) are
# intentionally not rescued, so a shutdown request is never converted into a
# reconnect attempt.
#
# @return [Boolean] true when the handlers were installed, false when the
#   attempt raised and a retry was scheduled
def _connect
  # Time of the last "lost connectivity" warning; used to throttle that log line.
  last_reported_timestamp = nil
  @session = ::AMQP.connect(@url, @opts) do |connection|
    connection.on_tcp_connection_loss do |conn, _settings|
      now = Time.now
      # Warn at most once per 60 seconds while the connection keeps dropping.
      if last_reported_timestamp.nil? || (now - last_reported_timestamp) > 60
        warn "Lost connectivity. Trying to reconnect..."
        last_reported_timestamp = now
      end
      _reconnect(conn)
    end
    @channel = ::AMQP::Channel.new(connection, auto_recovery: true, prefetch: 10)

    # Callbacks declaring exactly one parameter receive the communicator itself.
    @on_connected_procs.each do |callback|
      callback.arity == 1 ? callback.call(self) : callback.call
    end

    OmfCommon.eventloop.on_stop do
      connection.close
    end
  end

  rec_delay = @opts[:reconnect_delay]
  @session.on_tcp_connection_failure do
    warn "Cannot connect to AMQP server '#{@url}'. Attempt to retry in #{rec_delay} sec"
    @session = nil
    OmfCommon.eventloop.after(rec_delay) do
      info 'Retrying'
      _connect
    end
  end
  # @session.on_tcp_connection_loss do
    # _reconnect "Appear to have lost tcp connection. Attempt to reconnect in #{rec_delay} sec"
  # end
  @session.on_skipped_heartbeats do
    info '... on_skipped_heartbeats!'
    #_reconnect "Appear to have lost heartbeat. Attempt to reconnect in #{rec_delay} sec"
  end
  @session.on_recovery do
    info 'Recovered!'
    # Re-arm the throttled warning so the next outage is reported immediately.
    last_reported_timestamp = nil
    # Iterate over a snapshot so callbacks may (un)register themselves safely.
    @on_reconnect.values.each(&:call)
  end
  true
rescue StandardError => ex
  delay = @opts[:reconnect_delay]
  warn "Connecting AMQP failed, will retry in #{delay} (#{ex})"
  OmfCommon.eventloop.after(delay) do
    if _connect
      info 'Reconnection suceeded'
    end
  end
  false
end

- (Object) _reconnect(conn) (private)



196
197
198
199
200
201
202
203
204
205
206
207
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 196

# Ask the given connection to reconnect; reschedule the attempt if it raises.
#
# Only StandardError is rescued. Interrupt, SystemExit and similar exceptions
# propagate so that a shutdown request does not start another retry cycle.
#
# @param conn [Object] connection object responding to #reconnect
#   (NOTE(review): the arguments `false, 2` presumably mean "do not force,
#   retry period of 2 seconds" as in the amqp gem - confirm against its docs)
# @return [Object] whatever conn.reconnect returns, or the result of
#   scheduling the retry on the event loop when the call raised
def _reconnect(conn)
  conn.reconnect(false, 2)
rescue StandardError => ex
  delay = @opts[:reconnect_delay]
  warn "Reconnect AMQP failed, will retry in #{delay} (#{ex})"
  OmfCommon.eventloop.after(delay) do
    info 'Reconnecting'
    _reconnect(conn)
  end
end

- (Object) broadcast_file(file_path, topic_name = nil, opts = {}, &block)



114
115
116
117
118
119
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 114

# Start broadcasting a file on a topic and return the address to fetch it from.
#
# @param file_path [String] path handed to the FileBroadcaster
# @param topic_name [String, nil] topic to broadcast on; a random UUID is
#   used when none is given
# @param opts [Hash] options handed to the FileBroadcaster
# @param block handed to the FileBroadcaster
# @return [String] "bdcst:" followed by the address prefix and the topic name
def broadcast_file(file_path, topic_name = nil, opts = {}, &block)
  topic_name = SecureRandom.uuid unless topic_name
  # Loaded on demand, only when a file transfer is actually requested.
  require 'omf_common/comm/amqp/amqp_file_transfer'
  OmfCommon::Comm::AMQP::FileBroadcaster.new(file_path, @channel, topic_name, opts, &block)
  broadcast_address = @address_prefix + topic_name
  "bdcst:#{broadcast_address}"
end

- (Object) conn_info



51
52
53
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 51

# Summarise the connection parameters taken from the global AMQP settings.
#
# @return [Hash] with keys :proto (always :amqp), :user and :domain, the
#   latter two read from ::AMQP.settings[:user] and ::AMQP.settings[:host]
def conn_info
  user, host = ::AMQP.settings.values_at(:user, :host)
  { proto: :amqp, user: user, domain: host }
end

- (Object) create_topic(topic, opts = {})

Create a new pubsub topic with additional configuration

Parameters:

  • topic (String)

    Pubsub topic name



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 84

# Create a new pubsub topic with additional configuration.
#
# The topic may be given either as a bare name or as an absolute address that
# starts with this communicator's address prefix. In both cases the full
# address is stored in opts[:address] and the bare name is passed to
# Topic.create.
#
# @param topic [String, #to_s] pubsub topic name or absolute 'amqp:' address
# @param opts [Hash] extra options; the hash is copied, never modified
# @return [Object] whatever OmfCommon::Comm::AMQP::Topic.create returns
# @raise [RuntimeError] if the topic is nil or empty, if an absolute address
#   belongs to a different domain, or if an absolute address contains nothing
#   after the address prefix
def create_topic(topic, opts = {})
  raise "Topic can't be nil or empty" if topic.nil? || topic.to_s.empty?

  opts = opts.dup
  opts[:communicator] = self
  topic = topic.to_s
  if topic.start_with?('amqp:')
    # absolute address
    unless topic.start_with?(@address_prefix)
      raise "Cannot subscribe to a topic from different domain (#{topic}) - #{@address_prefix}"
    end
    opts[:address] = topic
    # Strip exactly the leading prefix. Splitting on the prefix would return
    # nil for a prefix-only address and the wrong tail if the prefix text
    # happened to occur again inside the topic name.
    topic = topic[@address_prefix.length..-1]
    raise "Topic can't be nil or empty" if topic.empty?
  else
    opts[:address] = @address_prefix + topic
  end
  OmfCommon::Comm::AMQP::Topic.create(topic, opts)
end

- (Object) delete_topic(topic, &block)

Delete a pubsub topic

Parameters:

  • topic (String)

    Pubsub topic name



105
106
107
108
109
110
111
112
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 105

# Delete a pubsub topic.
#
# Looks the topic up and releases it; logs a warning when the lookup returns
# nothing.
#
# NOTE(review): the lookup goes through OmfCommon::CommProvider::AMQP::Topic
# while #create_topic uses OmfCommon::Comm::AMQP::Topic. The original FIXME
# already questions this namespace - confirm which constant is correct.
#
# @param topic [String] pubsub topic name
# @param block currently unused
# @return [Object] the result of releasing the topic, or of the warning call
def delete_topic(topic, &block)
  # FIXME CommProvider?
  found = OmfCommon::CommProvider::AMQP::Topic.find(topic)
  if found
    found.release
  else
    # The closing quote around the topic name was missing in the message.
    warn "Attempt to delete unknown topic '#{topic}'"
  end
end

- (Object) disconnect(opts = {})

Shut down comms layer



60
61
62
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 60

# Shut down the comms layer.
#
# As written, this only emits an info-level log line; nothing is closed here.
#
# @param opts [Hash] accepted but not used by this method
# @return [Object] whatever the logging call returns
def disconnect(opts = {})
  info('Disconnecting...')
end

- (Object) init(opts = {})

Initialize comms layer



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 34

# Initialize the comms layer.
#
# Merges the caller's options over the defaults, extracts the mandatory :url,
# derives the topic address prefix from it, starts connecting and finally
# delegates to the parent implementation.
#
# @param opts [Hash] connection options; :url is required and is removed
#   from the stored option set
# @raise [RuntimeError] if no :url option is given
def init(opts = {})
  defaults = {
    # Other options noted in the original source, not set here:
    #   :ssl (Hash) TLS (SSL) parameters to use.
    #   :on_tcp_connection_failure (#call) - run if the connection to the server fails
    #   :on_possible_authentication_failure (#call) - run if authentication fails
    # (Fixnum) connection heartbeat in seconds; 0 means no heartbeat. Can also
    # be configured server-side starting with RabbitMQ 3.0.
    heartbeat: 20,
    # (Fixnum) delay in seconds before attempting to reconnect on detected failure
    reconnect_delay: 20
  }
  @opts = defaults.merge(opts)

  @url = @opts.delete(:url)
  raise "Missing 'url' option for AQMP layer" unless @url

  @address_prefix = @url + '/frcp.'
  _connect
  super
end

- (Object) on_connected(&block)

TODO: Should be thread safe and check if already connected



65
66
67
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 65

# Register a block to be run once the connection has been set up.
#
# The block is only stored here; nothing in this method checks whether a
# connection already exists.
#
# @param block the callback to remember
# @return [Array] the list of registered callbacks
def on_connected(&block)
  @on_connected_procs.push(block)
end

- (Object) on_reconnect(key, &block)

Register a callback to be called when the underlying AMQP layer has to reconnect to the AMQP server, which may require some additional repairs. Callbacks are stored per 'key'; if 'block' is nil, the callback registered under 'key' is removed.



73
74
75
76
77
78
79
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 73

# Add, replace or remove the reconnect callback stored under +key+.
#
# @param key [Object] identifier the callback is stored under
# @param block the callback; when omitted, the entry for +key+ is removed
# @return [Proc, nil] the stored block, or the removed entry (nil if none)
def on_reconnect(key, &block)
  return @on_reconnect.delete(key) unless block

  @on_reconnect[key] = block
end

- (Object) receive_file(topic_url, file_path = nil, opts = {}, &block)



121
122
123
124
125
126
127
128
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 121

# Receive a file that is being broadcast on a topic.
#
# @param topic_url [String] topic name, or an address starting with this
#   communicator's address prefix (the prefix is stripped)
# @param file_path [String, nil] where to store the file; when omitted a
#   unique path inside the system temp directory is generated
# @param opts [Hash] options handed to the FileReceiver
# @param block handed to the FileReceiver
# @return [Object] the FileReceiver instance
def receive_file(topic_url, file_path = nil, opts = {}, &block)
  if topic_url.start_with?(@address_prefix)
    topic_url = topic_url[@address_prefix.length..-1]
  end
  # Loaded on demand, only when a file transfer is actually requested.
  require 'omf_common/comm/amqp/amqp_file_transfer'
  # Dir::Tmpname.make_tmpname was removed in Ruby 2.5. Dir::Tmpname.create is
  # available in old and new versions alike; with an empty block it only
  # generates and returns a unique path under Dir.tmpdir without creating it.
  file_path ||= Dir::Tmpname.create(['bdcast', '.xxx']) { |_candidate| }
  FileReceiver.new(file_path, @channel, topic_url, opts, &block)
end

- (Object) string_to_topic_address(a_string)



55
56
57
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 55

# Build the full topic address for a topic name.
#
# @param a_string [String] topic name
# @return [String] the address prefix followed by +a_string+
def string_to_topic_address(a_string)
  prefix = @address_prefix
  prefix + a_string
end