Class: OmfEc::Group

Inherits:
Object
  • Object
show all
Extended by:
GroupExt
Includes:
MonitorMixin
Defined in:
omf_ec/lib/omf_ec/group.rb

Overview

Group instance used in an experiment script.

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Methods included from GroupExt

fwd_method_to_aliases, method_added

Constructor Details

- (Group) initialize(name, opts = {}, &block)

Returns a new instance of Group

Parameters:

  • name (String)

    name of the group

  • opts (Hash) (defaults to: {})


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'omf_ec/lib/omf_ec/group.rb', line 30

# Builds a group, gives it empty bookkeeping containers and subscribes it to
# its own topic.
#
# @param [String] name name of the group
# @param [Hash] opts accepted but not read by this constructor
# @param [Proc] block handed through to OmfEc.subscribe_and_monitor
def initialize(name, opts = {}, &block)
  super()

  # Identity: the id is "<experiment id>.<group name>"
  self.name = name
  self.id = format('%s.%s', OmfEc.experiment.id, self.name)

  # Empty holders for network interfaces, members, apps and execs
  self.net_ifs, self.members, self.app_contexts, self.execs = [], {}, [], []

  # Groups that were added to this group, and per-type resource topics
  @g_aliases = []
  @resource_topics = {}

  OmfEc.subscribe_and_monitor(id, self, &block)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

- (Object) method_missing(name, *args, &block)



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'omf_ec/lib/omf_ec/group.rb', line 215

# Resolves dynamic network-interface accessors.
#
# A method name of exactly "w<digits>" maps to the interface "wlan<digits>"
# (NetContext :type 'wlan'); exactly "e<digits>" maps to "eth<digits>"
# (NetContext :type 'net'). The matching entry of net_ifs is returned; when
# there is none, a new NetContext is created, appended to net_ifs and returned.
# Every other name is passed on to super.
#
# The patterns are anchored to the whole name. Unanchored, /e(\d+)/ would
# also capture unrelated names such as "node1" and create an "eth1" context
# instead of raising NoMethodError.
#
# @param [Symbol] name name of the missing method
# @return [OmfEc::Context::NetContext] the existing or newly created context
def method_missing(name, *args, &block)
  case name.to_s
  when /\Aw(\d+)\z/
    if_prefix = 'wlan'
    if_type = 'wlan'
  when /\Ae(\d+)\z/
    if_prefix = 'eth'
    if_type = 'net'
  else
    return super
  end

  # Digits captured by whichever pattern matched; kept as a String
  index = Regexp.last_match(1)
  if_name = "#{if_prefix}#{index}"

  net = self.net_ifs.find { |v| v.conf[:if_name] == if_name }
  if net.nil?
    net = OmfEc::Context::NetContext.new(:type => if_type, :if_name => if_name, :index => index)
    self.net_ifs << net
  end
  net
end

Instance Attribute Details

- (Object) app_contexts

Returns the value of attribute app_contexts



23
24
25
# File 'omf_ec/lib/omf_ec/group.rb', line 23

# Reader for the group's application contexts.
#
# @return [Object] current value of @app_contexts (nil when never assigned)
def app_contexts
  instance_variable_get(:@app_contexts)
end

- (Object) apps (readonly)

Holds the applications to be added to the group.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'omf_ec/lib/omf_ec/group.rb', line 19

# Group instance used in an experiment script.
#
# Keeps the group's members, network-interface contexts, application
# contexts and ad-hoc execs, and sends resource-creation requests to the
# group's topic.
class Group
  include MonitorMixin
  extend GroupExt

  attr_accessor :name, :id, :net_ifs, :members, :app_contexts, :execs
  attr_reader :topic, :g_aliases

  fwd_method_to_aliases :startApplications, :stopApplications, :startApplication

  # Create the group with empty holders and subscribe to its topic.
  #
  # @param [String] name name of the group
  # @param [Hash] opts accepted but not read by this constructor
  # @param [Proc] block handed through to OmfEc.subscribe_and_monitor
  def initialize(name, opts = {}, &block)
    super()
    self.name = name
    self.id = "#{OmfEc.experiment.id}.#{self.name}"
    # Add empty holders for members, network interfaces, and apps
    self.net_ifs = []
    self.members = {}
    self.app_contexts = []
    self.execs = []
    # To record group-to-group relationships
    @g_aliases = []

    @resource_topics = {}

    OmfEc.subscribe_and_monitor(id, self, &block)
  end

  # Topic address of the group, or of one of its child resource groups.
  #
  # @param [#to_s, nil] suffix when given, "_<suffix>" is appended to the id
  # @return the result of OmfCommon.comm.string_to_topic_address
  def address(suffix = nil)
    t_id = suffix ? "#{self.id}_#{suffix.to_s}" : self.id
    OmfCommon.comm.string_to_topic_address(t_id)
  end

  # Remember the topic this group communicates through (under the lock).
  def associate_topic(topic)
    self.synchronize do
      @topic = topic
    end
  end

  # Remember the topic of a child resource group under +name+ (under the lock).
  def associate_resource_topic(name, res_topic)
    self.synchronize do
      @resource_topics[name] = res_topic
    end
  end

  # @return the topic stored for +name+ by associate_resource_topic, or nil
  def resource_topic(name)
    @resource_topics[name]
  end

  # Add existing resources to the group
  #
  # Resources to be added could be a list of resources, groups, or the mixture of both.
  # When the first element is a Hash, the list is treated as resource hashes
  # and reduced to the 'omf_id' of every entry whose 'type' is 'node'.
  def add_resource(*names)
    names.flatten!

    # When names is array of resource hash
    if !names.empty? && names[0].kind_of?(Hash)
      names.map! { |v| v['omf_id'] if v['type'] == 'node' }.compact!
    end

    synchronize do
      # Recording membership first, used for ALL_UP event
      names.each do |name|
        if (g = OmfEc.experiment.group(name)) # resource to add is a group
          @members.merge!(g.members)
          @g_aliases << g
        else
          OmfEc.experiment.nodes << name unless OmfEc.experiment.nodes.include?(name)
          @members[name] = nil
        end
      end
    end
  end

  # Create a set of new resources and add them to the group
  #
  # @param [String] name stored in the create request as :hrn
  # @param [Hash] opts to be used to create new resources; :type is required
  # @raise [ArgumentError] when opts[:type] is nil
  # @raise [RuntimeError] when opts cannot be deep-copied through Marshal
  def create_resource(name, opts, &block)
    self.synchronize do
      raise ArgumentError, "Option :type is required for creating resource" if opts[:type].nil?

      # Make a deep copy of opts in case it contains structures of structures
      begin
        opts = Marshal.load(Marshal.dump(opts.merge(hrn: name)))
      rescue => e
        raise "#{e.message} - Could not deep copy opts: '#{opts.inspect}'"
      end

      # Take the type out of the private copy once, up front, so the callback
      # below does not mutate opts and sees the same type on every invocation.
      r_type = opts.delete(:type)

      # Naming convention of child resource group
      resource_group_name = self.address(r_type)

      OmfEc.subscribe_and_monitor(resource_group_name) do |res_group|
        associate_resource_topic(r_type.to_s, res_group)
        # Send create message to group
        @topic.create(r_type, opts.merge(membership: resource_group_name),
                      assert: OmfEc.experiment.assertion)
      end
    end
  end

  # @return [OmfEc::Context::GroupContext]
  def resources
    OmfEc::Context::GroupContext.new(group: self)
  end

  # Add a new Prototype to the NodeSet associated with this Root Path
  #
  # - name = name of the Prototype to associate with the NodeSet of this Path
  # - params = optional, a Hash with the bindings to be passed on to the
  #   Prototype instance (see Prototype.instantiate)
  #
  # Logs an error and returns nil when no prototype is registered under +name+.
  def addPrototype(name, params = nil)
    debug "Use prototype #{name}."
    p = OmfEc::Prototype[name]
    if p.nil?
      error "Unknown prototype '#{name}'"
      return
    end
    p.instantiate(self, params)
  end

  alias_method :prototype, :addPrototype

  # @return [String] "<group id>_<type>"
  def resource_group(type)
    "#{self.id}_#{type.to_s}"
  end

  # Create an application for the group and start it
  #
  # A random UUID is used as the application name. An event is defined that
  # fires once at least as many matching application resources are reported
  # as there are distinct member values; its handler sets the application's
  # state to :running.
  def exec(command)
    name = SecureRandom.uuid

    self.synchronize do
      self.execs << name
    end
    create_resource(name, type: 'application', binary_path: command)

    e_name = "#{self.name}_application_#{name}_created"

    resource_group_name = self.address("application")

    def_event e_name do |state|
      created = state.count do |v|
        v[:hrn] == name && v[:membership] && v[:membership].include?(resource_group_name)
      end
      # uniq does not need sorted input, and sorting would raise as soon as
      # the values mix nil (set by add_resource) with non-nil entries.
      created >= self.members.values.uniq.size
    end

    on_event e_name do
      resources[type: 'application', name: name].state = :running
    end
  end

  # Start ONE application by name
  def startApplication(app_name)
    if self.app_contexts.any? { |v| v.name == app_name }
      resources[type: 'application', name: app_name].state = :running
    else
      warn "No application with name '#{app_name}' defined in group #{self.name}. Nothing to start"
    end
  end

  # Start ALL applications in the group
  def startApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to start"
    else
      resources[type: 'application'].state = :running
    end
  end

  # Stop ALL applications in the group
  def stopApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to stop"
    else
      resources[type: 'application'].state = :stopped
    end
  end

  # Build an AppContext for this group, let the optional block configure it,
  # and append it to app_contexts.
  def addApplication(name, location = nil, &block)
    app_cxt = OmfEc::Context::AppContext.new(name, location, self)
    block.call(app_cxt) if block
    self.app_contexts << app_cxt
  end

  # Entry point for the dynamic interface accessors handled by method_missing.
  #
  # @example
  #   group('actor', 'node1', 'node2') do |g|
  #     g.net.w0.ip = '0.0.0.0'
  #     g.net.e0.ip = '0.0.0.1'
  #   end
  # @return [Group] self
  def net
    self.net_ifs ||= []
    self
  end

  # Resolves dynamic network-interface accessors.
  #
  # A method name of exactly "w<digits>" maps to the interface "wlan<digits>"
  # (NetContext :type 'wlan'); exactly "e<digits>" maps to "eth<digits>"
  # (NetContext :type 'net'). The matching entry of net_ifs is returned; when
  # there is none, a new NetContext is created, appended and returned.
  # Every other name is passed on to super.
  #
  # The patterns are anchored to the whole name. Unanchored, /e(\d+)/ would
  # also capture unrelated names such as "node1" and create an "eth1" context
  # instead of raising NoMethodError.
  def method_missing(name, *args, &block)
    case name.to_s
    when /\Aw(\d+)\z/
      if_prefix = 'wlan'
      if_type = 'wlan'
    when /\Ae(\d+)\z/
      if_prefix = 'eth'
      if_type = 'net'
    else
      return super
    end

    # Digits captured by whichever pattern matched; kept as a String
    index = Regexp.last_match(1)
    if_name = "#{if_prefix}#{index}"

    net = self.net_ifs.find { |v| v.conf[:if_name] == if_name }
    if net.nil?
      net = OmfEc::Context::NetContext.new(:type => if_type, :if_name => if_name, :index => index)
      self.net_ifs << net
    end
    net
  end

  # Keeps respond_to? consistent with the names method_missing accepts.
  def respond_to_missing?(name, include_private = false)
    name.to_s =~ /\A[we]\d+\z/ ? true : super
  end
end

- (Object) execs

Returns the value of attribute execs



23
24
25
# File 'omf_ec/lib/omf_ec/group.rb', line 23

# Reader for the names recorded for the group's ad-hoc execs.
#
# @return [Object] current value of @execs (nil when never assigned)
def execs
  instance_variable_get(:@execs)
end

- (Object) g_aliases (readonly)

Returns the value of attribute g_aliases



24
25
26
# File 'omf_ec/lib/omf_ec/group.rb', line 24

# Reader for the groups recorded as aliases of this group.
#
# @return [Object] current value of @g_aliases (nil when never assigned)
def g_aliases
  instance_variable_get(:@g_aliases)
end

- (Object) id

pubsub topic id of the resource



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'omf_ec/lib/omf_ec/group.rb', line 19

# Group instance used in an experiment script.
#
# Keeps the group's members, network-interface contexts, application
# contexts and ad-hoc execs, and sends resource-creation requests to the
# group's topic.
class Group
  include MonitorMixin
  extend GroupExt

  attr_accessor :name, :id, :net_ifs, :members, :app_contexts, :execs
  attr_reader :topic, :g_aliases

  fwd_method_to_aliases :startApplications, :stopApplications, :startApplication

  # Create the group with empty holders and subscribe to its topic.
  #
  # @param [String] name name of the group
  # @param [Hash] opts accepted but not read by this constructor
  # @param [Proc] block handed through to OmfEc.subscribe_and_monitor
  def initialize(name, opts = {}, &block)
    super()
    self.name = name
    self.id = "#{OmfEc.experiment.id}.#{self.name}"
    # Add empty holders for members, network interfaces, and apps
    self.net_ifs = []
    self.members = {}
    self.app_contexts = []
    self.execs = []
    # To record group-to-group relationships
    @g_aliases = []

    @resource_topics = {}

    OmfEc.subscribe_and_monitor(id, self, &block)
  end

  # Topic address of the group, or of one of its child resource groups.
  #
  # @param [#to_s, nil] suffix when given, "_<suffix>" is appended to the id
  # @return the result of OmfCommon.comm.string_to_topic_address
  def address(suffix = nil)
    t_id = suffix ? "#{self.id}_#{suffix.to_s}" : self.id
    OmfCommon.comm.string_to_topic_address(t_id)
  end

  # Remember the topic this group communicates through (under the lock).
  def associate_topic(topic)
    self.synchronize do
      @topic = topic
    end
  end

  # Remember the topic of a child resource group under +name+ (under the lock).
  def associate_resource_topic(name, res_topic)
    self.synchronize do
      @resource_topics[name] = res_topic
    end
  end

  # @return the topic stored for +name+ by associate_resource_topic, or nil
  def resource_topic(name)
    @resource_topics[name]
  end

  # Add existing resources to the group
  #
  # Resources to be added could be a list of resources, groups, or the mixture of both.
  # When the first element is a Hash, the list is treated as resource hashes
  # and reduced to the 'omf_id' of every entry whose 'type' is 'node'.
  def add_resource(*names)
    names.flatten!

    # When names is array of resource hash
    if !names.empty? && names[0].kind_of?(Hash)
      names.map! { |v| v['omf_id'] if v['type'] == 'node' }.compact!
    end

    synchronize do
      # Recording membership first, used for ALL_UP event
      names.each do |name|
        if (g = OmfEc.experiment.group(name)) # resource to add is a group
          @members.merge!(g.members)
          @g_aliases << g
        else
          OmfEc.experiment.nodes << name unless OmfEc.experiment.nodes.include?(name)
          @members[name] = nil
        end
      end
    end
  end

  # Create a set of new resources and add them to the group
  #
  # @param [String] name stored in the create request as :hrn
  # @param [Hash] opts to be used to create new resources; :type is required
  # @raise [ArgumentError] when opts[:type] is nil
  # @raise [RuntimeError] when opts cannot be deep-copied through Marshal
  def create_resource(name, opts, &block)
    self.synchronize do
      raise ArgumentError, "Option :type is required for creating resource" if opts[:type].nil?

      # Make a deep copy of opts in case it contains structures of structures
      begin
        opts = Marshal.load(Marshal.dump(opts.merge(hrn: name)))
      rescue => e
        raise "#{e.message} - Could not deep copy opts: '#{opts.inspect}'"
      end

      # Take the type out of the private copy once, up front, so the callback
      # below does not mutate opts and sees the same type on every invocation.
      r_type = opts.delete(:type)

      # Naming convention of child resource group
      resource_group_name = self.address(r_type)

      OmfEc.subscribe_and_monitor(resource_group_name) do |res_group|
        associate_resource_topic(r_type.to_s, res_group)
        # Send create message to group
        @topic.create(r_type, opts.merge(membership: resource_group_name),
                      assert: OmfEc.experiment.assertion)
      end
    end
  end

  # @return [OmfEc::Context::GroupContext]
  def resources
    OmfEc::Context::GroupContext.new(group: self)
  end

  # Add a new Prototype to the NodeSet associated with this Root Path
  #
  # - name = name of the Prototype to associate with the NodeSet of this Path
  # - params = optional, a Hash with the bindings to be passed on to the
  #   Prototype instance (see Prototype.instantiate)
  #
  # Logs an error and returns nil when no prototype is registered under +name+.
  def addPrototype(name, params = nil)
    debug "Use prototype #{name}."
    p = OmfEc::Prototype[name]
    if p.nil?
      error "Unknown prototype '#{name}'"
      return
    end
    p.instantiate(self, params)
  end

  alias_method :prototype, :addPrototype

  # @return [String] "<group id>_<type>"
  def resource_group(type)
    "#{self.id}_#{type.to_s}"
  end

  # Create an application for the group and start it
  #
  # A random UUID is used as the application name. An event is defined that
  # fires once at least as many matching application resources are reported
  # as there are distinct member values; its handler sets the application's
  # state to :running.
  def exec(command)
    name = SecureRandom.uuid

    self.synchronize do
      self.execs << name
    end
    create_resource(name, type: 'application', binary_path: command)

    e_name = "#{self.name}_application_#{name}_created"

    resource_group_name = self.address("application")

    def_event e_name do |state|
      created = state.count do |v|
        v[:hrn] == name && v[:membership] && v[:membership].include?(resource_group_name)
      end
      # uniq does not need sorted input, and sorting would raise as soon as
      # the values mix nil (set by add_resource) with non-nil entries.
      created >= self.members.values.uniq.size
    end

    on_event e_name do
      resources[type: 'application', name: name].state = :running
    end
  end

  # Start ONE application by name
  def startApplication(app_name)
    if self.app_contexts.any? { |v| v.name == app_name }
      resources[type: 'application', name: app_name].state = :running
    else
      warn "No application with name '#{app_name}' defined in group #{self.name}. Nothing to start"
    end
  end

  # Start ALL applications in the group
  def startApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to start"
    else
      resources[type: 'application'].state = :running
    end
  end

  # Stop ALL applications in the group
  def stopApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to stop"
    else
      resources[type: 'application'].state = :stopped
    end
  end

  # Build an AppContext for this group, let the optional block configure it,
  # and append it to app_contexts.
  def addApplication(name, location = nil, &block)
    app_cxt = OmfEc::Context::AppContext.new(name, location, self)
    block.call(app_cxt) if block
    self.app_contexts << app_cxt
  end

  # Entry point for the dynamic interface accessors handled by method_missing.
  #
  # @example
  #   group('actor', 'node1', 'node2') do |g|
  #     g.net.w0.ip = '0.0.0.0'
  #     g.net.e0.ip = '0.0.0.1'
  #   end
  # @return [Group] self
  def net
    self.net_ifs ||= []
    self
  end

  # Resolves dynamic network-interface accessors.
  #
  # A method name of exactly "w<digits>" maps to the interface "wlan<digits>"
  # (NetContext :type 'wlan'); exactly "e<digits>" maps to "eth<digits>"
  # (NetContext :type 'net'). The matching entry of net_ifs is returned; when
  # there is none, a new NetContext is created, appended and returned.
  # Every other name is passed on to super.
  #
  # The patterns are anchored to the whole name. Unanchored, /e(\d+)/ would
  # also capture unrelated names such as "node1" and create an "eth1" context
  # instead of raising NoMethodError.
  def method_missing(name, *args, &block)
    case name.to_s
    when /\Aw(\d+)\z/
      if_prefix = 'wlan'
      if_type = 'wlan'
    when /\Ae(\d+)\z/
      if_prefix = 'eth'
      if_type = 'net'
    else
      return super
    end

    # Digits captured by whichever pattern matched; kept as a String
    index = Regexp.last_match(1)
    if_name = "#{if_prefix}#{index}"

    net = self.net_ifs.find { |v| v.conf[:if_name] == if_name }
    if net.nil?
      net = OmfEc::Context::NetContext.new(:type => if_type, :if_name => if_name, :index => index)
      self.net_ifs << net
    end
    net
  end

  # Keeps respond_to? consistent with the names method_missing accepts.
  def respond_to_missing?(name, include_private = false)
    name.to_s =~ /\A[we]\d+\z/ ? true : super
  end
end

- (Object) members

Holds the members to be added to the group.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'omf_ec/lib/omf_ec/group.rb', line 19

# Group instance used in an experiment script.
#
# Keeps the group's members, network-interface contexts, application
# contexts and ad-hoc execs, and sends resource-creation requests to the
# group's topic.
class Group
  include MonitorMixin
  extend GroupExt

  attr_accessor :name, :id, :net_ifs, :members, :app_contexts, :execs
  attr_reader :topic, :g_aliases

  fwd_method_to_aliases :startApplications, :stopApplications, :startApplication

  # Create the group with empty holders and subscribe to its topic.
  #
  # @param [String] name name of the group
  # @param [Hash] opts accepted but not read by this constructor
  # @param [Proc] block handed through to OmfEc.subscribe_and_monitor
  def initialize(name, opts = {}, &block)
    super()
    self.name = name
    self.id = "#{OmfEc.experiment.id}.#{self.name}"
    # Add empty holders for members, network interfaces, and apps
    self.net_ifs = []
    self.members = {}
    self.app_contexts = []
    self.execs = []
    # To record group-to-group relationships
    @g_aliases = []

    @resource_topics = {}

    OmfEc.subscribe_and_monitor(id, self, &block)
  end

  # Topic address of the group, or of one of its child resource groups.
  #
  # @param [#to_s, nil] suffix when given, "_<suffix>" is appended to the id
  # @return the result of OmfCommon.comm.string_to_topic_address
  def address(suffix = nil)
    t_id = suffix ? "#{self.id}_#{suffix.to_s}" : self.id
    OmfCommon.comm.string_to_topic_address(t_id)
  end

  # Remember the topic this group communicates through (under the lock).
  def associate_topic(topic)
    self.synchronize do
      @topic = topic
    end
  end

  # Remember the topic of a child resource group under +name+ (under the lock).
  def associate_resource_topic(name, res_topic)
    self.synchronize do
      @resource_topics[name] = res_topic
    end
  end

  # @return the topic stored for +name+ by associate_resource_topic, or nil
  def resource_topic(name)
    @resource_topics[name]
  end

  # Add existing resources to the group
  #
  # Resources to be added could be a list of resources, groups, or the mixture of both.
  # When the first element is a Hash, the list is treated as resource hashes
  # and reduced to the 'omf_id' of every entry whose 'type' is 'node'.
  def add_resource(*names)
    names.flatten!

    # When names is array of resource hash
    if !names.empty? && names[0].kind_of?(Hash)
      names.map! { |v| v['omf_id'] if v['type'] == 'node' }.compact!
    end

    synchronize do
      # Recording membership first, used for ALL_UP event
      names.each do |name|
        if (g = OmfEc.experiment.group(name)) # resource to add is a group
          @members.merge!(g.members)
          @g_aliases << g
        else
          OmfEc.experiment.nodes << name unless OmfEc.experiment.nodes.include?(name)
          @members[name] = nil
        end
      end
    end
  end

  # Create a set of new resources and add them to the group
  #
  # @param [String] name stored in the create request as :hrn
  # @param [Hash] opts to be used to create new resources; :type is required
  # @raise [ArgumentError] when opts[:type] is nil
  # @raise [RuntimeError] when opts cannot be deep-copied through Marshal
  def create_resource(name, opts, &block)
    self.synchronize do
      raise ArgumentError, "Option :type is required for creating resource" if opts[:type].nil?

      # Make a deep copy of opts in case it contains structures of structures
      begin
        opts = Marshal.load(Marshal.dump(opts.merge(hrn: name)))
      rescue => e
        raise "#{e.message} - Could not deep copy opts: '#{opts.inspect}'"
      end

      # Take the type out of the private copy once, up front, so the callback
      # below does not mutate opts and sees the same type on every invocation.
      r_type = opts.delete(:type)

      # Naming convention of child resource group
      resource_group_name = self.address(r_type)

      OmfEc.subscribe_and_monitor(resource_group_name) do |res_group|
        associate_resource_topic(r_type.to_s, res_group)
        # Send create message to group
        @topic.create(r_type, opts.merge(membership: resource_group_name),
                      assert: OmfEc.experiment.assertion)
      end
    end
  end

  # @return [OmfEc::Context::GroupContext]
  def resources
    OmfEc::Context::GroupContext.new(group: self)
  end

  # Add a new Prototype to the NodeSet associated with this Root Path
  #
  # - name = name of the Prototype to associate with the NodeSet of this Path
  # - params = optional, a Hash with the bindings to be passed on to the
  #   Prototype instance (see Prototype.instantiate)
  #
  # Logs an error and returns nil when no prototype is registered under +name+.
  def addPrototype(name, params = nil)
    debug "Use prototype #{name}."
    p = OmfEc::Prototype[name]
    if p.nil?
      error "Unknown prototype '#{name}'"
      return
    end
    p.instantiate(self, params)
  end

  alias_method :prototype, :addPrototype

  # @return [String] "<group id>_<type>"
  def resource_group(type)
    "#{self.id}_#{type.to_s}"
  end

  # Create an application for the group and start it
  #
  # A random UUID is used as the application name. An event is defined that
  # fires once at least as many matching application resources are reported
  # as there are distinct member values; its handler sets the application's
  # state to :running.
  def exec(command)
    name = SecureRandom.uuid

    self.synchronize do
      self.execs << name
    end
    create_resource(name, type: 'application', binary_path: command)

    e_name = "#{self.name}_application_#{name}_created"

    resource_group_name = self.address("application")

    def_event e_name do |state|
      created = state.count do |v|
        v[:hrn] == name && v[:membership] && v[:membership].include?(resource_group_name)
      end
      # uniq does not need sorted input, and sorting would raise as soon as
      # the values mix nil (set by add_resource) with non-nil entries.
      created >= self.members.values.uniq.size
    end

    on_event e_name do
      resources[type: 'application', name: name].state = :running
    end
  end

  # Start ONE application by name
  def startApplication(app_name)
    if self.app_contexts.any? { |v| v.name == app_name }
      resources[type: 'application', name: app_name].state = :running
    else
      warn "No application with name '#{app_name}' defined in group #{self.name}. Nothing to start"
    end
  end

  # Start ALL applications in the group
  def startApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to start"
    else
      resources[type: 'application'].state = :running
    end
  end

  # Stop ALL applications in the group
  def stopApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to stop"
    else
      resources[type: 'application'].state = :stopped
    end
  end

  # Build an AppContext for this group, let the optional block configure it,
  # and append it to app_contexts.
  def addApplication(name, location = nil, &block)
    app_cxt = OmfEc::Context::AppContext.new(name, location, self)
    block.call(app_cxt) if block
    self.app_contexts << app_cxt
  end

  # Entry point for the dynamic interface accessors handled by method_missing.
  #
  # @example
  #   group('actor', 'node1', 'node2') do |g|
  #     g.net.w0.ip = '0.0.0.0'
  #     g.net.e0.ip = '0.0.0.1'
  #   end
  # @return [Group] self
  def net
    self.net_ifs ||= []
    self
  end

  # Resolves dynamic network-interface accessors.
  #
  # A method name of exactly "w<digits>" maps to the interface "wlan<digits>"
  # (NetContext :type 'wlan'); exactly "e<digits>" maps to "eth<digits>"
  # (NetContext :type 'net'). The matching entry of net_ifs is returned; when
  # there is none, a new NetContext is created, appended and returned.
  # Every other name is passed on to super.
  #
  # The patterns are anchored to the whole name. Unanchored, /e(\d+)/ would
  # also capture unrelated names such as "node1" and create an "eth1" context
  # instead of raising NoMethodError.
  def method_missing(name, *args, &block)
    case name.to_s
    when /\Aw(\d+)\z/
      if_prefix = 'wlan'
      if_type = 'wlan'
    when /\Ae(\d+)\z/
      if_prefix = 'eth'
      if_type = 'net'
    else
      return super
    end

    # Digits captured by whichever pattern matched; kept as a String
    index = Regexp.last_match(1)
    if_name = "#{if_prefix}#{index}"

    net = self.net_ifs.find { |v| v.conf[:if_name] == if_name }
    if net.nil?
      net = OmfEc::Context::NetContext.new(:type => if_type, :if_name => if_name, :index => index)
      self.net_ifs << net
    end
    net
  end

  # Keeps respond_to? consistent with the names method_missing accepts.
  def respond_to_missing?(name, include_private = false)
    name.to_s =~ /\A[we]\d+\z/ ? true : super
  end
end

- (Object) name

name of the resource



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'omf_ec/lib/omf_ec/group.rb', line 19

# Group instance used in an experiment script.
#
# Keeps the group's members, network-interface contexts, application
# contexts and ad-hoc execs, and sends resource-creation requests to the
# group's topic.
class Group
  include MonitorMixin
  extend GroupExt

  attr_accessor :name, :id, :net_ifs, :members, :app_contexts, :execs
  attr_reader :topic, :g_aliases

  fwd_method_to_aliases :startApplications, :stopApplications, :startApplication

  # Create the group with empty holders and subscribe to its topic.
  #
  # @param [String] name name of the group
  # @param [Hash] opts accepted but not read by this constructor
  # @param [Proc] block handed through to OmfEc.subscribe_and_monitor
  def initialize(name, opts = {}, &block)
    super()
    self.name = name
    self.id = "#{OmfEc.experiment.id}.#{self.name}"
    # Add empty holders for members, network interfaces, and apps
    self.net_ifs = []
    self.members = {}
    self.app_contexts = []
    self.execs = []
    # To record group-to-group relationships
    @g_aliases = []

    @resource_topics = {}

    OmfEc.subscribe_and_monitor(id, self, &block)
  end

  # Topic address of the group, or of one of its child resource groups.
  #
  # @param [#to_s, nil] suffix when given, "_<suffix>" is appended to the id
  # @return the result of OmfCommon.comm.string_to_topic_address
  def address(suffix = nil)
    t_id = suffix ? "#{self.id}_#{suffix.to_s}" : self.id
    OmfCommon.comm.string_to_topic_address(t_id)
  end

  # Remember the topic this group communicates through (under the lock).
  def associate_topic(topic)
    self.synchronize do
      @topic = topic
    end
  end

  # Remember the topic of a child resource group under +name+ (under the lock).
  def associate_resource_topic(name, res_topic)
    self.synchronize do
      @resource_topics[name] = res_topic
    end
  end

  # @return the topic stored for +name+ by associate_resource_topic, or nil
  def resource_topic(name)
    @resource_topics[name]
  end

  # Add existing resources to the group
  #
  # Resources to be added could be a list of resources, groups, or the mixture of both.
  # When the first element is a Hash, the list is treated as resource hashes
  # and reduced to the 'omf_id' of every entry whose 'type' is 'node'.
  def add_resource(*names)
    names.flatten!

    # When names is array of resource hash
    if !names.empty? && names[0].kind_of?(Hash)
      names.map! { |v| v['omf_id'] if v['type'] == 'node' }.compact!
    end

    synchronize do
      # Recording membership first, used for ALL_UP event
      names.each do |name|
        if (g = OmfEc.experiment.group(name)) # resource to add is a group
          @members.merge!(g.members)
          @g_aliases << g
        else
          OmfEc.experiment.nodes << name unless OmfEc.experiment.nodes.include?(name)
          @members[name] = nil
        end
      end
    end
  end

  # Create a set of new resources and add them to the group
  #
  # @param [String] name stored in the create request as :hrn
  # @param [Hash] opts to be used to create new resources; :type is required
  # @raise [ArgumentError] when opts[:type] is nil
  # @raise [RuntimeError] when opts cannot be deep-copied through Marshal
  def create_resource(name, opts, &block)
    self.synchronize do
      raise ArgumentError, "Option :type is required for creating resource" if opts[:type].nil?

      # Make a deep copy of opts in case it contains structures of structures
      begin
        opts = Marshal.load(Marshal.dump(opts.merge(hrn: name)))
      rescue => e
        raise "#{e.message} - Could not deep copy opts: '#{opts.inspect}'"
      end

      # Take the type out of the private copy once, up front, so the callback
      # below does not mutate opts and sees the same type on every invocation.
      r_type = opts.delete(:type)

      # Naming convention of child resource group
      resource_group_name = self.address(r_type)

      OmfEc.subscribe_and_monitor(resource_group_name) do |res_group|
        associate_resource_topic(r_type.to_s, res_group)
        # Send create message to group
        @topic.create(r_type, opts.merge(membership: resource_group_name),
                      assert: OmfEc.experiment.assertion)
      end
    end
  end

  # @return [OmfEc::Context::GroupContext]
  def resources
    OmfEc::Context::GroupContext.new(group: self)
  end

  # Add a new Prototype to the NodeSet associated with this Root Path
  #
  # - name = name of the Prototype to associate with the NodeSet of this Path
  # - params = optional, a Hash with the bindings to be passed on to the
  #   Prototype instance (see Prototype.instantiate)
  #
  # Logs an error and returns nil when no prototype is registered under +name+.
  def addPrototype(name, params = nil)
    debug "Use prototype #{name}."
    p = OmfEc::Prototype[name]
    if p.nil?
      error "Unknown prototype '#{name}'"
      return
    end
    p.instantiate(self, params)
  end

  alias_method :prototype, :addPrototype

  # @return [String] "<group id>_<type>"
  def resource_group(type)
    "#{self.id}_#{type.to_s}"
  end

  # Create an application for the group and start it
  #
  # A random UUID is used as the application name. An event is defined that
  # fires once at least as many matching application resources are reported
  # as there are distinct member values; its handler sets the application's
  # state to :running.
  def exec(command)
    name = SecureRandom.uuid

    self.synchronize do
      self.execs << name
    end
    create_resource(name, type: 'application', binary_path: command)

    e_name = "#{self.name}_application_#{name}_created"

    resource_group_name = self.address("application")

    def_event e_name do |state|
      created = state.count do |v|
        v[:hrn] == name && v[:membership] && v[:membership].include?(resource_group_name)
      end
      # uniq does not need sorted input, and sorting would raise as soon as
      # the values mix nil (set by add_resource) with non-nil entries.
      created >= self.members.values.uniq.size
    end

    on_event e_name do
      resources[type: 'application', name: name].state = :running
    end
  end

  # Start ONE application by name
  def startApplication(app_name)
    if self.app_contexts.any? { |v| v.name == app_name }
      resources[type: 'application', name: app_name].state = :running
    else
      warn "No application with name '#{app_name}' defined in group #{self.name}. Nothing to start"
    end
  end

  # Start ALL applications in the group
  def startApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to start"
    else
      resources[type: 'application'].state = :running
    end
  end

  # Stop ALL applications in the group
  def stopApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to stop"
    else
      resources[type: 'application'].state = :stopped
    end
  end

  # Build an AppContext for this group, let the optional block configure it,
  # and append it to app_contexts.
  def addApplication(name, location = nil, &block)
    app_cxt = OmfEc::Context::AppContext.new(name, location, self)
    block.call(app_cxt) if block
    self.app_contexts << app_cxt
  end

  # Entry point for the dynamic interface accessors handled by method_missing.
  #
  # @example
  #   group('actor', 'node1', 'node2') do |g|
  #     g.net.w0.ip = '0.0.0.0'
  #     g.net.e0.ip = '0.0.0.1'
  #   end
  # @return [Group] self
  def net
    self.net_ifs ||= []
    self
  end

  # Resolves dynamic network-interface accessors.
  #
  # A method name of exactly "w<digits>" maps to the interface "wlan<digits>"
  # (NetContext :type 'wlan'); exactly "e<digits>" maps to "eth<digits>"
  # (NetContext :type 'net'). The matching entry of net_ifs is returned; when
  # there is none, a new NetContext is created, appended and returned.
  # Every other name is passed on to super.
  #
  # The patterns are anchored to the whole name. Unanchored, /e(\d+)/ would
  # also capture unrelated names such as "node1" and create an "eth1" context
  # instead of raising NoMethodError.
  def method_missing(name, *args, &block)
    case name.to_s
    when /\Aw(\d+)\z/
      if_prefix = 'wlan'
      if_type = 'wlan'
    when /\Ae(\d+)\z/
      if_prefix = 'eth'
      if_type = 'net'
    else
      return super
    end

    # Digits captured by whichever pattern matched; kept as a String
    index = Regexp.last_match(1)
    if_name = "#{if_prefix}#{index}"

    net = self.net_ifs.find { |v| v.conf[:if_name] == if_name }
    if net.nil?
      net = OmfEc::Context::NetContext.new(:type => if_type, :if_name => if_name, :index => index)
      self.net_ifs << net
    end
    net
  end

  # Keeps respond_to? consistent with the names method_missing accepts.
  def respond_to_missing?(name, include_private = false)
    name.to_s =~ /\A[we]\d+\z/ ? true : super
  end
end

- (Object) net_ifs

Network interfaces defined to be added to the group.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'omf_ec/lib/omf_ec/group.rb', line 19

# Group instance used in experiment scripts.
#
# A group records its member resources and aliased groups, the network
# interface and application contexts configured on it, and the topics
# associated with it. Writes to members, topics and execs are wrapped in
# MonitorMixin#synchronize.
class Group
  include MonitorMixin
  extend GroupExt

  attr_accessor :name, :id, :net_ifs, :members, :app_contexts, :execs
  attr_reader :topic, :g_aliases

  # NOTE(review): presumably forwards these calls to the groups recorded in
  # g_aliases as well - confirm against GroupExt.
  fwd_method_to_aliases :startApplications, :stopApplications, :startApplication

  # @param [String] name name of the group
  # @param [Hash] opts not read by this constructor
  # @yield forwarded to OmfEc.subscribe_and_monitor
  def initialize(name, opts = {}, &block)
    super()
    self.name = name
    self.id = "#{OmfEc.experiment.id}.#{self.name}"
    # Add empty holders for members, network interfaces, and apps
    self.net_ifs = []
    self.members = {}
    self.app_contexts = []
    self.execs = []
    # To record group-to-group relationships
    @g_aliases = []

    @resource_topics = {}

    OmfEc.subscribe_and_monitor(id, self, &block)
  end

  # Topic address of the group, or of one of its child resource groups.
  #
  # @param [#to_s, nil] suffix when given, appended to the group id as "_suffix"
  # @return the value produced by OmfCommon.comm.string_to_topic_address
  def address(suffix = nil)
    t_id = suffix ? "#{self.id}_#{suffix.to_s}" : self.id
    OmfCommon.comm.string_to_topic_address(t_id)
  end

  # Stores the topic later returned by #topic.
  def associate_topic(topic)
    self.synchronize do
      @topic = topic
    end
  end

  # Stores a child resource topic under the given name.
  def associate_resource_topic(name, res_topic)
    self.synchronize do
      @resource_topics[name] = res_topic
    end
  end

  # @return the topic stored under name by #associate_resource_topic, or nil
  def resource_topic(name)
    @resource_topics[name]
  end

  # Add existing resources to the group
  #
  # Resources to be added could be a list of resources, groups, or the mixture of both.
  #
  # @param names resource names, group names, or resource hashes; nested
  #   arrays are flattened
  def add_resource(*names)
    names.flatten!

    # When names is an array of resource hashes, keep the omf_id of the
    # entries typed 'node' and drop the rest
    if !names.empty? && names[0].kind_of?(Hash)
      names.map! { |v| v['omf_id'] if v['type'] == 'node' }.compact!
    end

    synchronize do
      # Recording membership first, used for ALL_UP event
      names.each do |name|
        if (g = OmfEc.experiment.group(name)) # resource to add is a group
          @members.merge!(g.members)
          @g_aliases << g
        else
          OmfEc.experiment.nodes << name unless OmfEc.experiment.nodes.include?(name)
          @members[name] = nil
        end
      end
    end
  end

  # Create a set of new resources and add them to the group
  #
  # @param [String] name sent along as the :hrn option
  # @param [Hash] opts to be used to create new resources; must contain :type.
  #   The caller's hash is not modified.
  # @raise [ArgumentError] when opts[:type] is nil
  # @raise [RuntimeError] when opts cannot be deep-copied through Marshal
  def create_resource(name, opts, &block)
    self.synchronize do
      raise ArgumentError, "Option :type is required for creating resource" if opts[:type].nil?

      # Make a deep copy of opts in case it contains structures of structures
      begin
        opts = Marshal.load(Marshal.dump(opts.merge(hrn: name)))
      rescue => e
        raise "#{e.message} - Could not deep copy opts: '#{opts.inspect}'"
      end

      # Split the type off once, before the callback is registered, so the
      # callback never mutates captured state and stays correct even if it is
      # invoked more than once.
      r_type = opts.delete(:type)

      # Naming convention of child resource group
      resource_group_name = self.address(r_type)

      OmfEc.subscribe_and_monitor(resource_group_name) do |res_group|
        associate_resource_topic(r_type.to_s, res_group)
        # Send create message to group
        @topic.create(r_type, opts.merge(membership: resource_group_name),
                      assert: OmfEc.experiment.assertion)
      end
    end
  end

  # @return [OmfEc::Context::GroupContext]
  def resources
    OmfEc::Context::GroupContext.new(group: self)
  end

  # Add a new Prototype to the NodeSet associated with this Root Path
  #
  # - name = name of the Prototype to associate with the NodeSet of this Path
  # - params = optional, a Hash with the bindings to be passed on to the
  #   Prototype instance (see Prototype.instantiate)
  #
  # Logs an error and returns nil when no prototype is registered under name.
  def addPrototype(name, params = nil)
    debug "Use prototype #{name}."
    p = OmfEc::Prototype[name]
    if p.nil?
      error "Unknown prototype '#{name}'"
      return
    end
    p.instantiate(self, params)
  end

  alias_method :prototype, :addPrototype

  # @return [String] the group id and the type joined by an underscore
  def resource_group(type)
    "#{self.id}_#{type.to_s}"
  end

  # Create an application for the group and start it
  #
  # Registers an event that fires once enough 'application' resources named
  # after this exec report membership of the group's application topic; the
  # handler then sets their state to :running.
  #
  # @param [String] command sent as the application's binary_path
  def exec(command)
    name = SecureRandom.uuid

    self.synchronize do
      self.execs << name
    end
    create_resource(name, type: 'application', binary_path: command)

    e_name = "#{self.name}_application_#{name}_created"

    resource_group_name = self.address("application")

    def_event e_name do |state|
      created = state.count do |v|
        v[:hrn] == name && v[:membership] && v[:membership].include?(resource_group_name)
      end
      # Distinct member values only; sorting them first would raise as soon
      # as nil placeholders and real values are mixed.
      created >= self.members.values.uniq.size
    end

    on_event e_name do
      resources[type: 'application', name: name].state = :running
    end
  end

  # Start ONE application by name
  def startApplication(app_name)
    if self.app_contexts.find { |v| v.name == app_name }
      resources[type: 'application', name: app_name].state = :running
    else
      warn "No application with name '#{app_name}' defined in group #{self.name}. Nothing to start"
    end
  end

  # Start ALL applications in the group
  def startApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to start"
    else
      resources[type: 'application'].state = :running
    end
  end

  # Stop ALL applications in the group
  def stopApplications
    if self.app_contexts.empty?
      warn "No applications defined in group #{self.name}. Nothing to stop"
    else
      resources[type: 'application'].state = :stopped
    end
  end

  # Builds an application context for this group, yields it to the optional
  # block, and appends it to app_contexts.
  def addApplication(name, location = nil, &block)
    app_cxt = OmfEc::Context::AppContext.new(name,location,self)
    block.call(app_cxt) if block
    self.app_contexts << app_cxt
  end

  # @example
  #   group('actor', 'node1', 'node2') do |g|
  #     g.net.w0.ip = '0.0.0.0'
  #     g.net.e0.ip = '0.0.0.1'
  #   end
  def net
    self.net_ifs ||= []
    self
  end

  # Serves the dynamic interface accessors used after #net: +wN+ returns the
  # context for interface "wlanN" and +eN+ the one for "ethN", creating and
  # recording it in net_ifs on first use.
  #
  # The name must be exactly +w+ or +e+ followed by digits. Any other name is
  # handed to super, so an unrelated call such as +node1+ raises
  # NoMethodError instead of silently creating an interface.
  #
  # @param [Symbol] name the missing method's name
  # @return [OmfEc::Context::NetContext]
  def method_missing(name, *args, &block)
    match = /\A([we])(\d+)\z/.match(name.to_s)
    return super if match.nil?

    index = match[2]
    type, prefix = match[1] == 'w' ? %w[wlan wlan] : %w[net eth]
    if_name = "#{prefix}#{index}"

    net = self.net_ifs.find { |v| v.conf[:if_name] == if_name }
    if net.nil?
      net = OmfEc::Context::NetContext.new(:type => type, :if_name => if_name, :index => index)
      self.net_ifs << net
    end
    net
  end

  # Keeps respond_to? consistent with the accessors #method_missing serves.
  def respond_to_missing?(name, include_private = false)
    !!(name.to_s =~ /\A[we]\d+\z/) || super
  end
end

- (Object) topic (readonly)

Returns the value of attribute topic



24
25
26
# File 'omf_ec/lib/omf_ec/group.rb', line 24

# Reader for the topic stored by #associate_topic; nil until one has been
# associated.
def topic
  @topic
end

Instance Method Details

- (Object) add_resource(*names)

Add existing resources to the group

Resources to be added could be a list of resources, groups, or the mixture of both.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'omf_ec/lib/omf_ec/group.rb', line 71

# Adds existing resources to the group. Each entry may be a resource name or
# the name of another group; nested arrays are flattened first.
#
# When the first entry is a Hash, the list is treated as resource
# descriptions: the 'omf_id' of every entry typed 'node' is kept and all
# other entries are dropped.
#
# @return [Array] the normalised list of names that was processed
def add_resource(*names)
  names.flatten!

  if names.first.kind_of?(Hash)
    names.map! { |res| res['type'] == 'node' ? res['omf_id'] : nil }
    names.compact!
  end

  synchronize do
    # Membership is recorded first; it is used for the ALL_UP event
    names.each do |name|
      sub_group = OmfEc.experiment.group(name)
      if sub_group
        # The entry names a group: absorb its members and remember the alias
        @members.merge!(sub_group.members)
        @g_aliases << sub_group
        next
      end

      OmfEc.experiment.nodes << name unless OmfEc.experiment.nodes.include?(name)
      @members[name] = nil
    end
  end
end

- (Object) addApplication(name, location = nil, &block)



199
200
201
202
203
# File 'omf_ec/lib/omf_ec/group.rb', line 199

# Builds an application context for this group and records it.
#
# @param [String] name forwarded to the AppContext constructor
# @param [String, nil] location forwarded to the AppContext constructor
# @yield [context] the freshly built context, before it is recorded
# @return [Array] the group's app_contexts list, new context included
def addApplication(name, location = nil, &block)
  context = OmfEc::Context::AppContext.new(name, location, self)
  yield(context) if block_given?
  app_contexts << context
end

- (Object) addPrototype(name, params = nil) Also known as: prototype

Add a new Prototype to the NodeSet associated with this Root Path

  • name = name of the Prototype to associate with the NodeSet of this Path

  • params = optional, a Hash with the bindings to be passed on to the Prototype instance (see Prototype.instantiate)



133
134
135
136
137
138
139
140
141
# File 'omf_ec/lib/omf_ec/group.rb', line 133

# Looks up the prototype registered under +name+ and instantiates it for this
# group.
#
# @param name key used for the OmfEc::Prototype lookup
# @param [Hash, nil] params bindings handed to the prototype's instantiate
# @return the result of instantiate, or nil (after logging an error) when no
#   prototype is registered under +name+
def addPrototype(name, params = nil)
  debug "Use prototype #{name}."
  proto = OmfEc::Prototype[name]
  return proto.instantiate(self, params) unless proto.nil?

  error "Unknown prototype '#{name}'"
  nil
end

- (Object) address(suffix = nil)



47
48
49
50
# File 'omf_ec/lib/omf_ec/group.rb', line 47

# Topic address of the group, or of one of its child resource groups.
#
# @param [#to_s, nil] suffix when given, appended to the group id as "_suffix"
# @return the value produced by OmfCommon.comm.string_to_topic_address
def address(suffix = nil)
  topic_id = id
  topic_id = "#{topic_id}_#{suffix}" if suffix
  OmfCommon.comm.string_to_topic_address(topic_id)
end

- (Object) associate_resource_topic(name, res_topic)



58
59
60
61
62
# File 'omf_ec/lib/omf_ec/group.rb', line 58

# Records +res_topic+ under +name+ in the group's resource-topic table while
# holding the group's monitor.
#
# @return the stored topic
def associate_resource_topic(name, res_topic)
  synchronize { @resource_topics.store(name, res_topic) }
end

- (Object) associate_topic(topic)



52
53
54
55
56
# File 'omf_ec/lib/omf_ec/group.rb', line 52

# Remembers +topic+ as the group's own topic while holding the group's
# monitor.
#
# @return the stored topic
def associate_topic(topic)
  synchronize { @topic = topic }
end

- (Object) create_resource(name, opts, &block)

Create a set of new resources and add them to the group

Parameters:

  • name (String)
  • opts (Hash)

    to be used to create new resources



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'omf_ec/lib/omf_ec/group.rb', line 97

# Create a set of new resources and add them to the group.
#
# Subscribes to the child resource group's topic; once subscribed, the topic
# is recorded under the resource type and a create message is sent on the
# group's own topic with the remaining options plus the membership address.
#
# @param [String] name sent along as the :hrn option
# @param [Hash] opts options used to create the resources; must contain
#   :type. The caller's hash is not modified.
# @return the result of OmfEc.subscribe_and_monitor
# @raise [ArgumentError] when opts[:type] is nil
# @raise [RuntimeError] when opts cannot be deep-copied through Marshal
def create_resource(name, opts, &block)
  self.synchronize do
    raise ArgumentError, "Option :type is required for creating resource" if opts[:type].nil?

    # Make a deep copy of opts in case it contains structures of structures
    begin
      opts = Marshal.load(Marshal.dump(opts.merge(hrn: name)))
    rescue => e
      raise "#{e.message} - Could not deep copy opts: '#{opts.inspect}'"
    end

    # Split the type off once, before the callback is registered, so the
    # callback never mutates captured state and stays correct even if it is
    # invoked more than once.
    r_type = opts.delete(:type)

    # Naming convention of child resource group
    resource_group_name = self.address(r_type)

    OmfEc.subscribe_and_monitor(resource_group_name) do |res_group|
      associate_resource_topic(r_type.to_s, res_group)
      # Send create message to group
      @topic.create(r_type, opts.merge(membership: resource_group_name),
                    assert: OmfEc.experiment.assertion)
    end
  end
end

- (Object) exec(command)

Create an application for the group and start it



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'omf_ec/lib/omf_ec/group.rb', line 151

# Create an application for the group and start it.
#
# A random UUID names the application. After requesting its creation, an
# event is defined that fires once the number of reported resources carrying
# that name and the group's application membership reaches the number of
# distinct values in members; the handler then sets them to :running.
#
# @param [String] command sent as the application's binary_path
def exec(command)
  name = SecureRandom.uuid

  self.synchronize do
    self.execs << name
  end
  create_resource(name, type: 'application', binary_path: command)

  e_name = "#{self.name}_application_#{name}_created"

  resource_group_name = self.address("application")

  def_event e_name do |state|
    created = state.count do |v|
      v[:hrn] == name && v[:membership] && v[:membership].include?(resource_group_name)
    end
    # Distinct member values only; sorting them first would raise as soon as
    # nil placeholders and real values are mixed.
    created >= self.members.values.uniq.size
  end

  on_event e_name do
    resources[type: 'application', name: name].state = :running
  end
end

- (Object) net

Examples:

group('actor', 'node1', 'node2') do |g|
  g.net.w0.ip = '0.0.0.0'
  g.net.e0.ip = '0.0.0.1'
end


210
211
212
213
# File 'omf_ec/lib/omf_ec/group.rb', line 210

# Entry point for configuring network interfaces, as in +g.net.w0.ip = ...+.
# Makes sure the interface list exists, then returns the group itself so the
# interface accessor can be chained onto it.
def net
  self.net_ifs = [] unless net_ifs
  self
end

- (Object) resource_group(type)



145
146
147
# File 'omf_ec/lib/omf_ec/group.rb', line 145

# Name of the child resource group for +type+: the group id and the type
# joined by an underscore.
#
# @param [#to_s] type
# @return [String]
def resource_group(type)
  id.to_s + '_' + type.to_s
end

- (Object) resource_topic(name)



64
65
66
# File 'omf_ec/lib/omf_ec/group.rb', line 64

# Looks up the topic stored under +name+ by #associate_resource_topic.
# Unlike that writer, this read is not wrapped in synchronize.
#
# @return the stored topic, or whatever the table yields for an unknown name
#   (nil for a plain Hash)
def resource_topic(name)
  @resource_topics[name]
end

- (OmfEc::Context::GroupContext) resources



123
124
125
# File 'omf_ec/lib/omf_ec/group.rb', line 123

# Builds a new GroupContext bound to this group; a fresh instance is created
# on every call.
#
# @return [OmfEc::Context::GroupContext]
def resources
  OmfEc::Context::GroupContext.new(group: self)
end

- (Object) startApplication(app_name)

Start ONE application by name



173
174
175
176
177
178
179
# File 'omf_ec/lib/omf_ec/group.rb', line 173

# Start ONE application by name.
#
# Only applications previously added to this group are started; for any
# other name a warning is emitted and nothing is sent.
#
# @param app_name compared against the name of each recorded app context
def startApplication(app_name)
  known = app_contexts.any? { |ctx| ctx.name == app_name }
  unless known
    return warn("No application with name '#{app_name}' defined in group #{name}. Nothing to start")
  end

  resources[type: 'application', name: app_name].state = :running
end

- (Object) startApplications

Start ALL applications in the group



182
183
184
185
186
187
188
# File 'omf_ec/lib/omf_ec/group.rb', line 182

# Start ALL applications in the group.
#
# Emits a warning and sends nothing when the group has no application
# contexts.
def startApplications
  return warn("No applications defined in group #{name}. Nothing to start") if app_contexts.empty?

  resources[type: 'application'].state = :running
end

- (Object) stopApplications

Stop ALL applications in the group



191
192
193
194
195
196
197
# File 'omf_ec/lib/omf_ec/group.rb', line 191

# Stop ALL applications in the group.
#
# Emits a warning and sends nothing when the group has no application
# contexts.
def stopApplications
  return warn("No applications defined in group #{name}. Nothing to stop") if app_contexts.empty?

  resources[type: 'application'].state = :stopped
end