Class: OmfCommon::Comm
- Inherits:
-
Object
- Object
- OmfCommon::Comm
- Defined in:
- omf_common/lib/omf_common/comm.rb,
omf_common/lib/omf_common/comm/topic.rb,
omf_common/lib/omf_common/comm/xmpp/topic.rb,
omf_common/lib/omf_common/comm/amqp/amqp_mp.rb,
omf_common/lib/omf_common/comm/amqp/amqp_topic.rb,
omf_common/lib/omf_common/comm/local/local_topic.rb,
omf_common/lib/omf_common/comm/xmpp/communicator.rb,
omf_common/lib/omf_common/comm/amqp/amqp_communicator.rb,
omf_common/lib/omf_common/comm/local/local_communicator.rb
Overview
PubSub communication class, can be extended with different implementations
Direct Known Subclasses
Defined Under Namespace
Classes: AMQP, Local, Topic, XMPP
Constant Summary
- @@providers =
{ xmpp: { require: 'omf_common/comm/xmpp/communicator', constructor: 'OmfCommon::Comm::XMPP::Communicator', message_provider: { type: :xml } }, amqp: { require: 'omf_common/comm/amqp/amqp_communicator', constructor: 'OmfCommon::Comm::AMQP::Communicator', message_provider: { type: :json } }, local: { require: 'omf_common/comm/local/local_communicator', constructor: 'OmfCommon::Comm::Local::Communicator', message_provider: { type: :json } } }
- @@instance =
nil
Class Method Summary (collapse)
-
+ (Object) init(opts)
opts: :type - pre installed comms provider :provider - custom provider (opts) :require - gem to load first (opts) :constructor - Class implementing provider.
- + (Object) instance
Instance Method Summary (collapse)
-
- (Hash) conn_info
Returning connection information.
-
- (Object) create_topic(topic, opts = {})
Create a new pubsub topic with additional configuration.
-
- (Object) delete_topic(topic, &block)
Delete a pubsub topic.
-
- (Object) disconnect(opts = {})
Shut down comms layer.
-
- (Object) init(opts = {})
Initialize comms layer.
-
- (Comm) initialize(opts = {})
constructor
private
A new instance of Comm.
-
- (Object) local_address
Return the address used for all 'generic' messages not specifically being sent from a resource.
- - (Object) local_topic
- - (Object) on_connected(&block)
-
- (Object) on_interrupted(*args, &block)
TODO should expand this to on_signal(:INT).
-
- (Object) options
Return the options used to initiate this communicator.
-
- (Object) string_to_topic_address(a_string)
Take a string and use it to generate a valid topic address for this type of communicator Must be implemented by subclasses.
-
- (Object) subscribe(topic_name, opts = {}, &block)
Subscribe to a pubsub topic.
Constructor Details
- (Comm) initialize(opts = {}) (private)
Returns a new instance of Comm
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'omf_common/lib/omf_common/comm.rb', line 173 def initialize(opts = {}) @opts = opts unless local_address = opts[:local_address] hostname = nil begin hostname = Socket.gethostbyname(Socket.gethostname)[0] rescue hostname = (`hostname` || 'unknown').strip end local_address = "#{hostname}-#{Process.pid}" end on_connected do @local_topic = create_topic(local_address.gsub('.', '-')) end end |
Class Method Details
+ (Object) init(opts)
opts:
:type - pre installed comms provider :provider - custom provider (opts) :require - gem to load first (opts) :constructor - Class implementing provider
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'omf_common/lib/omf_common/comm.rb', line 44 def self.init(opts) unless @@instance unless provider = opts[:provider] unless type = opts[:type] if url = opts[:url] type = url.split(':')[0].to_sym end end provider = @@providers[type] end unless provider raise ArgumentError, "Missing Comm provider declaration. Either define 'type', 'provider', or 'url'" end require provider[:require] if provider[:require] if class_name = provider[:constructor] provider_class = class_name.split('::').inject(Object) {|c,n| c.const_get(n) } inst = provider_class.new(opts) else raise ArgumentError, "Missing communicator creation info - :constructor" end @@instance = inst mopts = provider[:message_provider] mopts[:authenticate] = opts[:auth] Message.init(mopts) if aopts = opts[:auth] require 'omf_common/auth' OmfCommon::Auth.init(aopts) end inst.init(opts) end end |
+ (Object) instance
80 81 82 |
# File 'omf_common/lib/omf_common/comm.rb', line 80 def self.instance @@instance end |
Instance Method Details
- (Hash) conn_info
Returning connection information
130 131 132 |
# File 'omf_common/lib/omf_common/comm.rb', line 130 def conn_info { proto: nil, user: nil, domain: nil } end |
- (Object) create_topic(topic, opts = {})
Create a new pubsub topic with additional configuration
116 117 118 |
# File 'omf_common/lib/omf_common/comm.rb', line 116 def create_topic(topic, opts = {}) raise NotImplementedError end |
- (Object) delete_topic(topic, &block)
Delete a pubsub topic
123 124 125 |
# File 'omf_common/lib/omf_common/comm.rb', line 123 def delete_topic(topic, &block) raise NotImplementedError end |
- (Object) disconnect(opts = {})
Shut down comms layer
101 102 103 |
# File 'omf_common/lib/omf_common/comm.rb', line 101 def disconnect(opts = {}) raise NotImplementedError end |
- (Object) init(opts = {})
Initialize comms layer
86 87 |
# File 'omf_common/lib/omf_common/comm.rb', line 86 def init(opts = {}) end |
- (Object) local_address
Return the address used for all 'generic' messages not specifically being sent from a resource
92 93 94 |
# File 'omf_common/lib/omf_common/comm.rb', line 92 def local_address() @local_topic.address end |
- (Object) local_topic
96 97 98 |
# File 'omf_common/lib/omf_common/comm.rb', line 96 def local_topic() @local_topic end |
- (Object) on_connected(&block)
105 106 107 |
# File 'omf_common/lib/omf_common/comm.rb', line 105 def on_connected(&block) raise NotImplementedError end |
- (Object) on_interrupted(*args, &block)
TODO should expand this to on_signal(:INT)
110 111 |
# File 'omf_common/lib/omf_common/comm.rb', line 110 def on_interrupted(*args, &block) end |
- (Object) options
Return the options used to initiate this communicator.
168 169 170 |
# File 'omf_common/lib/omf_common/comm.rb', line 168 def () @opts end |
- (Object) string_to_topic_address(a_string)
Take a string and use it to generate a valid topic address for this type of communicator Must be implemented by subclasses
This may be used when we construct an FRCP Configure message, which requests some resources to subscribe to a topic, which has not yet been created at the time of this message's construction, but which will be created before this message is published. (an example of such case can be found in OMF EC Group handling code)
141 142 143 |
# File 'omf_common/lib/omf_common/comm.rb', line 141 def string_to_topic_address(a_string) raise NotImplementedError end |
- (Object) subscribe(topic_name, opts = {}, &block)
Subscribe to a pubsub topic
151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'omf_common/lib/omf_common/comm.rb', line 151 def subscribe(topic_name, opts = {}, &block) tna = (topic_name.is_a? Array) ? topic_name : [topic_name] ta = tna.collect do |tn| t = create_topic(tn.to_s, opts) if block t.on_subscribed do block.call(t) end end t end ta[0] end |