Class: OmfCommon::Comm::AMQP::Communicator
- Inherits:
-
OmfCommon::Comm
- Object
- OmfCommon::Comm
- OmfCommon::Comm::AMQP::Communicator
- Defined in:
- omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb
Instance Attribute Summary (collapse)
-
- (Object) channel
readonly
end.
Instance Method Summary (collapse)
- - (Object) _connect private
- - (Object) _reconnect(conn) private
- - (Object) broadcast_file(file_path, topic_name = nil, opts = {}, &block)
- - (Object) conn_info
-
- (Object) create_topic(topic, opts = {})
Create a new pubsub topic with additional configuration.
-
- (Object) delete_topic(topic, &block)
Delete a pubsub topic.
-
- (Object) disconnect(opts = {})
Shut down comms layer.
-
- (Object) init(opts = {})
Initialize comms layer.
-
- (Communicator) initialize(opts = {})
constructor
private
A new instance of Communicator.
-
- (Object) on_connected(&block)
TODO: Should be thread safe and check if already connected.
-
- (Object) on_reconnect(key, &block)
Register callbacks to be called when the underlying AMQP layer needs to reconnect to the AMQP server.
- - (Object) receive_file(topic_url, file_path = nil, opts = {}, &block)
- - (Object) string_to_topic_address(a_string)
Methods inherited from OmfCommon::Comm
init, instance, #local_address, #local_topic, #on_interrupted, #options, #subscribe
Constructor Details
- (Communicator) initialize(opts = {}) (private)
Returns a new instance of Communicator
131 132 133 134 135 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 131
#
# Prepare the callback registries, then hand over to the parent constructor.
#
# @param opts [Hash] passed on unchanged to the superclass by the bare `super`
def initialize(opts = {})
  @on_connected_procs = [] # blocks appended by #on_connected, run in #_connect
  @on_reconnect = {}       # key => block, maintained by #on_reconnect
  super
end
Instance Attribute Details
- (Object) channel (readonly)
end
30 31 32 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 30
#
# Read-only accessor for the channel stored in @channel.
#
# @return [Object, nil] whatever is currently held in @channel
def channel
  @channel
end
Instance Method Details
- (Object) _connect (private)
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 137
#
# Open the AMQP session for @url/@opts and install the connection-lifecycle
# handlers (TCP loss, TCP connect failure, skipped heartbeats, recovery).
#
# Inside the connect block a channel is created and stored in @channel, and
# every block registered through #on_connected is invoked (with this
# communicator as argument when the block's arity is 1).
#
# If setting up the session raises, a retry is scheduled on the event loop
# after @opts[:reconnect_delay] seconds.
#
# @return [Boolean] true when the session was set up without raising,
#   false when an error was rescued and a retry was scheduled
def _connect()
  # Time of the most recent "Lost connectivity" warning; nil when none is
  # outstanding. Shared by the connection-loss and recovery handlers below.
  last_reported_timestamp = nil

  @session = ::AMQP.connect(@url, @opts) do |connection|
    connection.on_tcp_connection_loss do |conn, _settings|
      now = Time.now
      # Emit the warning at most once every 60 seconds while the link is down.
      if last_reported_timestamp.nil? || (now - last_reported_timestamp) > 60
        warn "Lost connectivity. Trying to reconnect..."
        last_reported_timestamp = now
      end
      _reconnect(conn)
    end

    @channel = ::AMQP::Channel.new(connection, auto_recovery: true, prefetch: 10)
    @on_connected_procs.each do |callback|
      callback.arity == 1 ? callback.call(self) : callback.call
    end

    OmfCommon.eventloop.on_stop do
      connection.close
    end
  end

  rec_delay = @opts[:reconnect_delay]
  @session.on_tcp_connection_failure do
    warn "Cannot connect to AMQP server '#{@url}'. Attempt to retry in #{rec_delay} sec"
    @session = nil
    OmfCommon.eventloop.after(rec_delay) do
      info 'Retrying'
      _connect
    end
  end
  # @session.on_tcp_connection_loss do
  #   _reconnect "Appear to have lost tcp connection. Attempt to reconnect in #{rec_delay} sec"
  # end
  @session.on_skipped_heartbeats do
    info '... on_skipped_heartbeats!'
    #_reconnect "Appear to have lost heartbeat. Attempt to reconnect in #{rec_delay} sec"
  end
  @session.on_recovery do
    info 'Recovered!'
    # Allow the next connection loss to be reported immediately.
    last_reported_timestamp = nil
    # Iterate over a snapshot of the registered callbacks.
    @on_reconnect.values.each do |block|
      block.call()
    end
  end
  true
rescue StandardError => ex
  # StandardError rather than Exception: interrupts and exit requests must
  # not be swallowed and turned into another retry.
  delay = @opts[:reconnect_delay]
  warn "Connecting AMQP failed, will retry in #{delay} (#{ex})"
  OmfCommon.eventloop.after(delay) do
    if _connect
      info 'Reconnection succeeded'
    end
  end
  false
end
- (Object) _reconnect(conn) (private)
196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 196
#
# Ask the given connection to reconnect; if that call raises, log a warning
# and schedule another attempt on the event loop after
# @opts[:reconnect_delay] seconds.
#
# @param conn [#reconnect] connection object handed to the
#   on_tcp_connection_loss handler
# @return [Object] the result of conn.reconnect, or of scheduling the retry
def _reconnect(conn)
  conn.reconnect(false, 2)
rescue StandardError => ex
  # StandardError rather than Exception: interrupts and exit requests must
  # not be swallowed and turned into another retry.
  delay = @opts[:reconnect_delay]
  warn "Reconnect AMQP failed, will retry in #{delay} (#{ex})"
  OmfCommon.eventloop.after(delay) do
    info 'Reconnecting'
    _reconnect(conn)
  end
end
- (Object) broadcast_file(file_path, topic_name = nil, opts = {}, &block)
114 115 116 117 118 119 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 114
#
# Start broadcasting a file over the current channel.
#
# @param file_path [String] path handed to the FileBroadcaster
# @param topic_name [String, nil] topic to broadcast on; a random UUID is
#   generated when nil
# @param opts [Hash] passed through to the FileBroadcaster
# @param block passed through to the FileBroadcaster
# @return [String] "bdcst:" followed by the address prefix and topic name
def broadcast_file(file_path, topic_name = nil, opts = {}, &block)
  topic_name ||= SecureRandom.uuid
  # Loaded lazily so the file-transfer code is only pulled in when needed.
  require 'omf_common/comm/amqp/amqp_file_transfer'
  OmfCommon::Comm::AMQP::FileBroadcaster.new(file_path, @channel, topic_name, opts, &block)
  "bdcst:#{@address_prefix + topic_name}"
end
- (Object) conn_info
51 52 53 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 51
#
# Describe the connection using the values held in ::AMQP.settings.
#
# @return [Hash] { proto: :amqp, user: <settings[:user]>, domain: <settings[:host]> }
def conn_info
  settings = ::AMQP.settings
  { proto: :amqp, user: settings[:user], domain: settings[:host] }
end
- (Object) create_topic(topic, opts = {})
Create a new pubsub topic with additional configuration
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 84
#
# Create a new pubsub topic with additional configuration.
#
# The topic may be given as a bare name or as an absolute address starting
# with 'amqp:'. An absolute address must start with this communicator's
# address prefix; the prefix is stripped to obtain the topic name.
#
# @param topic [#to_s] topic name or absolute 'amqp:' address
# @param opts [Hash] extra options; copied, never mutated. :communicator and
#   :address are set on the copy before it is passed to Topic.create
# @return [Object] result of OmfCommon::Comm::AMQP::Topic.create
# @raise [RuntimeError] if topic is nil/empty, or is an absolute address
#   outside this communicator's address prefix
def create_topic(topic, opts = {})
  raise "Topic can't be nil or empty" if topic.nil? || topic.to_s.empty?

  opts = opts.dup # keep the caller's hash untouched
  opts[:communicator] = self
  topic = topic.to_s
  if topic.start_with? 'amqp:'
    # absolute address
    unless topic.start_with? @address_prefix
      raise "Cannot subscribe to a topic from different domain (#{topic}) - #{@address_prefix}"
    end
    opts[:address] = topic
    topic = topic.split(@address_prefix).last
  else
    opts[:address] = @address_prefix + topic
  end
  OmfCommon::Comm::AMQP::Topic.create(topic, opts)
end
- (Object) delete_topic(topic, &block)
Delete a pubsub topic
105 106 107 108 109 110 111 112 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 105
#
# Delete a pubsub topic: look the topic up and release it, or log a warning
# when no such topic is known.
#
# @param topic [Object] identifier passed to Topic.find
# @param block accepted but not used by this method
# @return [Object] result of the topic's #release, or of the warning call
def delete_topic(topic, &block)
  # FIXME CommProvider?
  # NOTE(review): #create_topic uses OmfCommon::Comm::AMQP::Topic; the
  # CommProvider namespace below looks stale — confirm before changing.
  if (t = OmfCommon::CommProvider::AMQP::Topic.find(topic))
    t.release
  else
    warn "Attempt to delete unknown topic '#{topic}'"
  end
end
- (Object) disconnect(opts = {})
Shut down comms layer
60 61 62 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 60
#
# Shut down comms layer.
#
# As written, this only logs a message; nothing is closed here.
#
# @param opts [Hash] accepted but not used by this method
# @return [Object] whatever the `info` logging call returns
def disconnect(opts = {})
  info "Disconnecting..."
end
- (Object) init(opts = {})
Initialize comms layer
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 34
#
# Initialize comms layer: merge the caller's options over the defaults,
# extract the mandatory :url, derive the topic address prefix, connect, and
# finally defer to the superclass implementation.
#
# @param opts [Hash] connection options; must contain :url. Remaining keys
#   are kept in @opts, which #_connect passes to ::AMQP.connect
# @raise [RuntimeError] if no :url option is given
def init(opts = {})
  @opts = {
    #:ssl (Hash) TLS (SSL) parameters to use.
    heartbeat: 20, # (Fixnum) - default: 0 Connection heartbeat, in seconds. 0 means no heartbeat. Can also be configured server-side starting with RabbitMQ 3.0.
    #:on_tcp_connection_failure (#call) - A callable object that will be run if connection to server fails
    #:on_possible_authentication_failure (#call) - A callable object that will be run if authentication fails (see Authentication failure section)
    reconnect_delay: 20 # (Fixnum) - Delay in seconds before attempting reconnect on detected failure
  }.merge(opts)

  # :url is removed from @opts so that it is not passed on as an option.
  unless (@url = @opts.delete(:url))
    raise "Missing 'url' option for AMQP layer"
  end
  @address_prefix = @url + '/frcp.'

  _connect()
  super
end
- (Object) on_connected(&block)
TODO: Should be thread safe and check if already connected
65 66 67 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 65
#
# Queue a block to be run by #_connect once the connection is established.
# Blocks with arity 1 are called there with the communicator as argument.
#
# TODO: Should be thread safe and check if already connected
#
# @param block the callback to queue
# @return [Array] the list of queued callbacks
def on_connected(&block)
  @on_connected_procs << block
end
- (Object) on_reconnect(key, &block)
Register callbacks to be called when the underlying AMQP layer needs to reconnect to the AMQP server. This may require some additional repairs. If 'block' is nil, the callback is removed.
73 74 75 76 77 78 79 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 73
#
# Register, replace or remove a callback that #_connect runs from its
# on_recovery handler.
#
# @param key [Object] identifies the callback; registering again under the
#   same key replaces the previous block
# @param block the callback; when omitted, the entry for key is removed
# @return [Object] the stored block, or the removed entry (nil if none)
def on_reconnect(key, &block)
  if block.nil?
    @on_reconnect.delete(key)
  else
    @on_reconnect[key] = block
  end
end
- (Object) receive_file(topic_url, file_path = nil, opts = {}, &block)
121 122 123 124 125 126 127 128 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 121
#
# Start receiving a broadcast file over the current channel.
#
# @param topic_url [String] topic to listen on; a leading address prefix of
#   this communicator is stripped
# @param file_path [String, nil] destination path; a temporary file name
#   under Dir.tmpdir is generated when nil
# @param opts [Hash] passed through to the FileReceiver
# @param block passed through to the FileReceiver
# @return [FileReceiver] the newly created receiver
def receive_file(topic_url, file_path = nil, opts = {}, &block)
  if topic_url.start_with? @address_prefix
    topic_url = topic_url[@address_prefix.length .. -1]
  end
  # Loaded lazily so the file-transfer code is only pulled in when needed.
  require 'omf_common/comm/amqp/amqp_file_transfer'
  # NOTE(review): Dir::Tmpname.make_tmpname was removed from the standard
  # library in Ruby 2.5 — confirm the Ruby versions this must run on.
  file_path ||= File.join(Dir.tmpdir, Dir::Tmpname.make_tmpname('bdcast', '.xxx'))
  FileReceiver.new(file_path, @channel, topic_url, opts, &block)
end
- (Object) string_to_topic_address(a_string)
55 56 57 |
# File 'omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb', line 55
#
# Turn a bare topic name into a full topic address by prepending the
# address prefix set up in #init.
#
# @param a_string [String] topic name
# @return [String] @address_prefix followed by a_string
def string_to_topic_address(a_string)
  @address_prefix + a_string
end