Class: OmfEc::Experiment

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, Singleton
Defined in:
omf_ec/lib/omf_ec/experiment.rb

Overview

Experiment class to hold relevant state information

Defined Under Namespace

Classes: MetaData

Instance Attribute Summary (collapse)

Class Method Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Experiment) initialize

Returns a new instance of Experiment



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'omf_ec/lib/omf_ec/experiment.rb', line 32

# Build the experiment with empty bookkeeping structures.
#
# The fallback experiment id is the current UTC time rendered as an ISO 8601
# string with millisecond precision. Collections are assigned with `||=`, so a
# value that already exists when this constructor runs is left untouched.
def initialize
  super
  @id = Time.now.utc.iso8601(3)
  @sliceID = nil
  # TODO: we need to keep a history of all the events and not overwrite them
  @state ||= Hashie::Mash.new
  @groups ||= []
  @nodes ||= []
  @events ||= []
  @app_definitions ||= {}
  @sub_groups ||= []
  @cmdline_properties ||= {}
  @show_graph = false
  @js_url = nil
  @job_url = nil
  @job_mps = {}
  @ss_url = nil
end

Instance Attribute Details

- (Object) app_definitions

Returns the value of attribute app_definitions



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @app_definitions (set to an empty Hash by the constructor
# unless a value was already present).
#
# @return [Object] the current value of @app_definitions
def app_definitions
  @app_definitions
end

- (Object) assertion

Returns the value of attribute assertion



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @assertion. The constructor does not assign it, so it is nil
# until something sets it.
#
# @return [Object, nil] the current value of @assertion
def assertion
  @assertion
end

- (Object) cmdline_properties

Returns the value of attribute cmdline_properties



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @cmdline_properties (set to an empty Hash by the constructor
# unless a value was already present). add_property looks values up here
# under the symbolised property name.
#
# @return [Object] the current value of @cmdline_properties
def cmdline_properties
  @cmdline_properties
end

- (Object) groups (readonly)

Returns the value of attribute groups



20
21
22
# File 'omf_ec/lib/omf_ec/experiment.rb', line 20

# Read-only attribute: the collection add_group appends to (an empty Array
# after construction unless a value was already present).
#
# @return [Object] the current value of @groups
def groups
  @groups
end

- (Object) job_mps

Returns the value of attribute job_mps



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @job_mps (reset to an empty Hash by the constructor).
#
# @return [Object] the current value of @job_mps
def job_mps
  @job_mps
end

- (Object) job_url

Returns the value of attribute job_url



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @job_url. It is nil after construction; create_job stores the
# 'href' of the created job here.
#
# @return [Object, nil] the current value of @job_url
def job_url
  @job_url
end

- (Object) js_url

Returns the value of attribute js_url



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @js_url (nil after construction). create_job appends '/jobs'
# to this value to build the request URL and does nothing while it is nil.
#
# @return [Object, nil] the current value of @js_url
def js_url
  @js_url
end

- (Object) name

Returns the value of attribute name



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @name. The constructor does not assign it; when it is set,
# #id returns it in preference to the generated id.
#
# @return [Object, nil] the current value of @name
def name
  @name
end

- (Object) nodes

Returns the value of attribute nodes



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @nodes (an empty Array after construction unless a value was
# already present).
#
# @return [Object] the current value of @nodes
def nodes
  @nodes
end

- (Object) oml_uri

Returns the value of attribute oml_uri



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @oml_uri. The constructor does not assign it, so it is nil
# until something sets it.
#
# @return [Object, nil] the current value of @oml_uri
def oml_uri
  @oml_uri
end

- (Object) property

Returns the value of attribute property



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @property. The constructor does not assign it, so it is nil
# until something sets it.
#
# @return [Object, nil] the current value of @property
def property
  @property
end

- (Object) show_graph

Returns the value of attribute show_graph



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @show_graph (false after construction).
#
# @return [Object] the current value of @show_graph
def show_graph
  @show_graph
end

- (Object) sliceID

Returns the value of attribute sliceID



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @sliceID (nil after construction).
#
# @return [Object, nil] the current value of @sliceID
def sliceID
  @sliceID
end

- (Object) ss_url

Returns the value of attribute ss_url



19
20
21
# File 'omf_ec/lib/omf_ec/experiment.rb', line 19

# Reader for @ss_url (nil after construction).
#
# @return [Object, nil] the current value of @ss_url
def ss_url
  @ss_url
end

- (Object) sub_groups (readonly)

Returns the value of attribute sub_groups



20
21
22
# File 'omf_ec/lib/omf_ec/experiment.rb', line 20

# Read-only attribute: the collection add_sub_group appends to (an empty
# Array after construction unless a value was already present).
#
# @return [Object] the current value of @sub_groups
def sub_groups
  @sub_groups
end

Class Method Details

+ (Object) disconnect



312
313
314
315
316
317
318
319
# File 'omf_ec/lib/omf_ec/experiment.rb', line 312

# Detach from the running experiment without tearing it down.
#
# Logs the experiment id and a hint on how to reattach, then schedules a
# callback via OmfCommon.el.after(5) that disconnects the communicator and
# stops the event loop.
def disconnect
  experiment_id = OmfEc.experiment.id
  info "Disconnecting in 5 sec from experiment: #{experiment_id}"
  info "Run the EC again to reattach"

  OmfCommon.el.after(5) do
    OmfCommon.comm.disconnect
    OmfCommon.eventloop.stop
  end
end

+ (Object) done

Disconnect communicator, try to delete any XMPP affiliations



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'omf_ec/lib/omf_ec/experiment.rb', line 272

# Finish the experiment and shut the controller down.
#
# Sequence (delays are the values handed to OmfCommon.el.after; the log
# message indicates they are seconds):
#   1. immediately: log, clear all defined events, record the "finished"
#      state through log_metadata;
#   2. after 10: for every group, release applications and wired/wireless
#      network interfaces that exist, then (1 later, so the release messages
#      go out first) ask the group's resources to leave the group;
#   3. 4 after step 2 starts: log the exit banner and, 1 later, disconnect
#      the communicator and stop the event loop.
def done
  info "Experiment: #{OmfEc.experiment.id} finished"
  info "Exit in 15 seconds..."

  # Make sure that all defined events are removed
  OmfEc.experiment.clear_events

  OmfCommon.el.after(10) do
    allGroups do |g|
      # Clean up
      unless g.app_contexts.empty?
        info "Release applications in #{g.name}"
        g.resources[type: 'application'].release
      end

      # Interface type => wording used in the log message
      { 'net' => 'wired', 'wlan' => 'wireless' }.each do |if_type, label|
        next unless g.net_ifs.any? { |v| v.conf[:type] == if_type }

        info "Release #{label} network interfaces in #{g.name}"
        g.resources[type: if_type].release
      end

      # Let release messages go through first
      OmfCommon.el.after(1) do
        info "Configure resources to leave #{g.name}"
        g.resources.membership = { leave: g.address }
      end
    end

    OmfCommon.el.after(4) do
      info "OMF Experiment Controller #{OmfEc::VERSION} - Exit."
      OmfCommon.el.after(1) do
        OmfCommon.comm.disconnect
        OmfCommon.eventloop.stop
      end
    end
  end

  # The method name was missing here, which turned the line into an implicit
  # `#call` on the experiment object instead of a metadata record.
  OmfEc.experiment.log_metadata("state", "finished")
end

+ (Object) ID

Unique experiment id (Class method)



181
182
183
# File 'omf_ec/lib/omf_ec/experiment.rb', line 181

# Class-level shortcut for the experiment id: delegates to #id on the
# object returned by `instance`.
#
# @return [Object] whatever instance.id returns
def self.ID
  instance.id
end

+ (Object) leave_memberships

Ask the resources which joined the groups I created to leave



355
356
357
358
359
# File 'omf_ec/lib/omf_ec/experiment.rb', line 355

# Ask the resources of every group yielded by all_groups to leave that
# group, by assigning a `{ leave: <group address> }` membership request.
def leave_memberships
  all_groups do |grp|
    leave_request = { leave: grp.address }
    grp.resources.membership = leave_request
  end
end

+ (Object) sliceID

Unique slice id (Class method)



186
187
188
# File 'omf_ec/lib/omf_ec/experiment.rb', line 186

# Class-level shortcut for the slice id: delegates to #sliceID on the
# object returned by `instance`.
#
# @return [Object, nil] whatever instance.sliceID returns
def self.sliceID
  instance.sliceID
end

+ (Object) start



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'omf_ec/lib/omf_ec/experiment.rb', line 321

# Start the experiment.
#
# Logs the experiment (and slice, when set), records the "running" state
# through log_metadata, asks every member of every group to join its group,
# and finally installs a recurring timer that runs the experiment's
# process_events.
def start
  info "Experiment: #{OmfEc.experiment.id} starts"
  info "Slice: #{OmfEc.experiment.sliceID}" unless OmfEc.experiment.sliceID.nil?
  # The method name was missing here, which turned the line into an implicit
  # `#call` on the experiment object instead of a metadata record.
  OmfEc.experiment.log_metadata("state", "running")

  allGroups do |g|
    info "CONFIGURE #{g.members.size} resources to join group #{g.name}"
    debug "CONFIGURE #{g.members.keys} to join group #{g.name}"
    g.members.each do |key, _value|
      OmfEc.subscribe_and_monitor(key) do |res|
        #info "Configure '#{key}' to join '#{g.name}'"
        # Remember the address the resource reported for this member
        g.synchronize do
          g.members[key] = res.address
        end
        res.configure({ membership: g.address, res_index: OmfEc.experiment.nodes.index(key) }, { assert: OmfEc.experiment.assertion })
      end
    end
  end

  # For every 100 nodes, increase check interval by 1 second (minimum 1)
  count = allGroups.inject(0) { |c, g| c + g.members.size }
  interval = [count / 100, 1].max
  info "TOTAL resources: #{count}. Events check interval: #{interval}."

  OmfCommon.el.every(interval) do
    EM.next_tick do
      begin
        OmfEc.experiment.process_events
      rescue StandardError => e
        # Keep the periodic check alive, but do not hide the failure
        warn "Event processing failed: #{e.class}: #{e.message}"
      end
    end
  end
end

Instance Method Details

- (Object) add_event(name, opts, trigger)



157
158
159
160
161
162
163
164
# File 'omf_ec/lib/omf_ec/experiment.rb', line 157

# Register (or replace) an event definition.
#
# name    - event name; an existing event with the same name is removed
#           first, with a warning when `event(name)` finds one.
# opts    - extra keys merged over the base `{ name:, trigger:, aliases: [] }`
#           entry; a truthy `:every` also installs a periodic check.
# trigger - object stored under :trigger.
#
# Returns what add_periodic_event returns when opts[:every] is set,
# otherwise nil.
def add_event(name, opts, trigger)
  synchronize do
    if event(name)
      warn "Event '#{name}' has already been defined. Overwriting it now."
    end

    @events.reject! { |existing| existing[:name] == name }
    definition = { name: name, trigger: trigger, aliases: [] }.merge(opts)
    @events.push(definition)

    add_periodic_event(event(name)) if opts[:every]
  end
end

- (Object) add_group(group)



134
135
136
137
138
139
# File 'omf_ec/lib/omf_ec/experiment.rb', line 134

# Append a group to @groups unless one with the same name is already
# registered.
#
# Raises ArgumentError when the argument is not an OmfEc::Group.
# Returns @groups when the group was appended, nil otherwise.
def add_group(group)
  synchronize do
    unless group.is_a?(OmfEc::Group)
      raise ArgumentError, "Expect Group object, got #{group.inspect}"
    end

    @groups.push(group) unless group(group.name)
  end
end

- (Object) add_or_update_resource_state(name, opts = {}) Also known as: add_resource



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'omf_ec/lib/omf_ec/experiment.rb', line 74

# Record what is known about a resource, merging into any existing entry.
#
# name - key under which the resource is stored in @state.
# opts - properties to record. For a resource that is already known:
#        Array values (including Array subclasses) are appended and
#        de-duplicated, Hash values are merged, everything else overwrites.
#        For a new resource, opts plus `address: name` becomes the entry.
#
# The caller's opts hash is never modified.
def add_or_update_resource_state(name, opts = {})
  synchronize do
    res = resource_state(name)
    if res
      opts.each do |key, value|
        case value
        when Array
          # Merge array values
          res[key] ||= []
          res[key] += value
          res[key].uniq!
        when Hash
          # Merge hash values
          res[key] ||= {}
          res[key].merge!(value)
        else
          # Overwrite otherwise
          res[key] = value
        end
      end
    else
      debug "Newly discovered resource >> #{name}"
      #res = Hashie::Mash.new({ address: name }).merge(opts)
      # Build a new hash rather than writing :address into the caller's opts
      @state[name] = opts.merge(address: name)

      # Re send membership configure
      #planned_groups = groups_by_res(name)

      #unless planned_groups.empty?
      #  OmfEc.subscribe_and_monitor(name) do |res|
      #    info "Config #{name} to join #{planned_groups.map(&:name).join(', ')}"
      #    res.configure({ membership: planned_groups.map(&:address) }, { assert: OmfEc.experiment.assertion } )
      #  end
      #end
    end
  end
end

- (Object) add_periodic_event(event)



199
200
201
202
203
204
205
# File 'omf_ec/lib/omf_ec/experiment.rb', line 199

# Install a recurring check for an event.
#
# Registers a callback with OmfCommon.el.every using event[:every] as the
# period; each run evaluates the event's trigger under this object's lock.
# The value returned by `every` is stored in event[:periodic_timer] and
# returned.
def add_periodic_event(event)
  timer = OmfCommon.el.every(event[:every]) do
    synchronize { eval_trigger(event) }
  end
  event[:periodic_timer] = timer
end

- (Object) add_property(name, value = nil, description = nil)



58
59
60
61
62
# File 'omf_ec/lib/omf_ec/experiment.rb', line 58

# Create an experiment property, letting a command-line value win.
#
# name        - property name; looked up in @cmdline_properties as a Symbol.
# value       - value used when no command-line entry exists (an entry of
#               nil counts as "no entry"; false is a valid override).
# description - passed through unchanged.
#
# Returns whatever ExperimentProperty.create returns.
def add_property(name, value = nil, description = nil)
  cmdline_value = @cmdline_properties[name.to_s.to_sym]
  effective_value = cmdline_value.nil? ? value : cmdline_value
  ExperimentProperty.create(name, effective_value, description)
end

- (Object) add_sub_group(name)



124
125
126
127
128
# File 'omf_ec/lib/omf_ec/experiment.rb', line 124

# Remember a sub group name, ignoring names that are already recorded.
#
# Returns @sub_groups when the name was appended, nil otherwise.
def add_sub_group(name)
  synchronize do
    @sub_groups.push(name) unless @sub_groups.include?(name)
  end
end

- (Boolean) all_groups?(&block)

Returns:

  • (Boolean)


149
150
151
# File 'omf_ec/lib/omf_ec/experiment.rb', line 149

# Check a condition against every group.
#
# Returns false when there are no groups. Otherwise returns true only if the
# block is truthy for each group (or, without a block, each group itself is
# truthy).
def all_groups?(&block)
  return false if groups.empty?

  groups.all? do |candidate|
    block ? block.call(candidate) : candidate
  end
end

- (Object) archive_oedl(script_name)

Archive OEDL content to OML db



237
238
239
240
241
242
243
# File 'omf_ec/lib/omf_ec/experiment.rb', line 237

# Archive OEDL content to OML db.
#
# Reads the script file, deflates it with Zlib, Base64-encodes the result and
# hands it to log_metadata with the script name as key and "oedl_content" as
# domain.
#
# script_name - path of the OEDL script to read.
#
# Returns whatever log_metadata returns.
def archive_oedl(script_name)
  # The call target was missing; the argument order matches
  # log_metadata(key, value, domain).
  log_metadata(
    script_name,
    Base64.encode64(Zlib::Deflate.deflate(File.read(script_name))),
    "oedl_content"
  )
end

- (Object) clear_events



166
167
168
169
170
171
172
173
# File 'omf_ec/lib/omf_ec/experiment.rb', line 166

# Drop every defined event.
#
# Cancels the periodic timer of each event that has one, then replaces
# @events with a new empty Array (which is also the return value).
def clear_events
  synchronize do
    @events.each do |entry|
      timer = entry[:periodic_timer]
      timer.cancel if timer
    end
    @events = []
  end
end

- (Object) create_job

If the EC is launched with the --job-service option, it needs to create a job entry for this experiment trial. Do nothing if:

  • a JobService URL has not been provided, i.e. the EC runs without needing to contact the JobService

  • we already have a Job URL, i.e. the job entry has already been created



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'omf_ec/lib/omf_ec/experiment.rb', line 250

# Create a job entry for this experiment trial on the job service.
#
# Does nothing (returns nil) when a job URL is already known or when no job
# service URL is configured. Otherwise POSTs `{ name: <experiment id> }` as
# JSON to "<@js_url>/jobs" and stores the 'href' of the reply in @job_url.
#
# Raises RuntimeError when the response is not a Net::HTTPSuccess or when
# the reply carries no 'href'.
# Returns the new job URL.
def create_job
  return unless @job_url.nil?
  return if @js_url.nil?
  require 'json'
  require 'net/http'

  u = URI.parse(@js_url + '/jobs')
  req = Net::HTTP::Post.new(u.path, { 'Content-Type' => 'application/json' })
  req.body = JSON.pretty_generate({ name: self.id })

  # Enable TLS for https:// job services; a plain connection would otherwise
  # be opened to the TLS port.
  res = Net::HTTP.start(u.host, u.port, use_ssl: u.scheme == 'https') do |http|
    http.request(req)
  end

  unless res.kind_of?(Net::HTTPSuccess)
    raise "Could not create a job for this experiment trial\n" \
          "Response #{res.code} #{res.message}:\n#{res.body}"
  end

  job = JSON.parse(res.body)
  raise "No valid URL received for the created job for this experiment trial" if job['href'].nil?
  @job_url = job['href']
end

- (Object) each_group(&block)



141
142
143
144
145
146
147
# File 'omf_ec/lib/omf_ec/experiment.rb', line 141

# Iterate over the groups.
#
# With a block, calls it once per group; without one, nothing is iterated.
# Returns the groups collection in both cases.
def each_group(&block)
  return groups unless block

  groups.each do |member_group|
    block.call(member_group)
  end
end

- (Object) eval_trigger(event)



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'omf_ec/lib/omf_ec/experiment.rb', line 207

# Fire an event if it has callbacks and its trigger accepts the current state.
#
# When the event is flagged :consume_event it is removed from @events and its
# periodic timer (if any) is cancelled before the callbacks run. Callbacks
# are invoked newest first.
#
# Returns nil when nothing was triggered.
def eval_trigger(event)
  armed = event[:callbacks] && !event[:callbacks].empty?
  return unless armed && event[:trigger].call(state)

  if event[:consume_event]
    # One-shot event: stop the periodic check and forget the definition
    event[:periodic_timer].cancel if event[:periodic_timer]
    @events.delete(event)
  end

  labels = [event[:name]] + event[:aliases]
  info "Event triggered: '#{labels.join(', ')}'"

  # Last in first serve callbacks
  event[:callbacks].reverse.each(&:call)
end

- (Object) event(name)



153
154
155
# File 'omf_ec/lib/omf_ec/experiment.rb', line 153

# Look up an event definition by its name or by one of its aliases.
#
# Returns the first matching entry of @events, or nil.
def event(name)
  @events.detect do |definition|
    definition[:name] == name || definition[:aliases].include?(name)
  end
end

- (Object) group(name)



130
131
132
# File 'omf_ec/lib/omf_ec/experiment.rb', line 130

# Look up a group by name.
#
# Returns the first group whose #name equals the argument, or nil.
def group(name)
  groups.detect { |candidate| candidate.name == name }
end

- (Object) groups_by_res(res_addr)

Find all groups a given resource belongs to



116
117
118
# File 'omf_ec/lib/omf_ec/experiment.rb', line 116

# Find all groups a given resource belongs to.
#
# A group matches when res_addr appears among the values of its members.
# Returns an Array of matching groups (empty when there is none).
def groups_by_res(res_addr)
  groups.select do |candidate|
    candidate.members.values.include?(res_addr)
  end
end

- (Object) id

Unique experiment id



176
177
178
# File 'omf_ec/lib/omf_ec/experiment.rb', line 176

# Unique experiment id.
#
# Prefers @name when it is set (truthy) and otherwise falls back to @id,
# the timestamp string generated in the constructor.
#
# @return [Object] @name, or @id when @name is nil/false
def id
  @name || @id
end

- (Object) log_metadata(key, value, domain = 'sys')



231
232
233
234
# File 'omf_ec/lib/omf_ec/experiment.rb', line 231

# Record one piece of experiment metadata.
#
# key    - metadata key; converted with to_s.
# value  - metadata value; converted with to_s.
# domain - metadata domain, 'sys' by default; converted with to_s.
#
# Returns whatever MetaData.inject returns.
def log_metadata(key, value, domain = 'sys')
  #MetaData.inject_metadata(key.to_s, value.to_s)
  MetaData.inject(domain.to_s, key.to_s, value.to_s)
end

- (Object) mp_table_names



223
224
225
226
227
228
229
# File 'omf_ec/lib/omf_ec/experiment.rb', line 223

# Collect the measurement-point table names of every application context.
#
# Gathers the app contexts of all groups, asks each for its mp_table_names
# hash and merges them in order (later entries win on duplicate keys).
#
# Returns a new Hash.
def mp_table_names
  contexts = groups.map(&:app_contexts).flatten
  contexts.map(&:mp_table_names).each_with_object({}) do |names, merged|
    merged.merge!(names)
  end
end

- (Object) process_events

Parse user-defined events, check their conditions against the internal state, and execute the callbacks of those that are triggered



191
192
193
194
195
196
197
# File 'omf_ec/lib/omf_ec/experiment.rb', line 191

# Evaluate every event that has no :every period.
#
# Periodic events are skipped here (add_periodic_event gives them their own
# timer). The events to check are collected into a separate Array first, and
# that Array is the return value.
def process_events
  synchronize do
    one_shot = @events.select { |candidate| candidate[:every].nil? }
    one_shot.each { |pending| eval_trigger(pending) }
  end
end

- (Object) resource_by_hrn(hrn)



70
71
72
# File 'omf_ec/lib/omf_ec/experiment.rb', line 70

# Look up a resource entry in @state using hrn as the key.
#
# @param hrn [Object] key to look up
# @return [Object, nil] the entry stored under that key, if any
def resource_by_hrn(hrn)
  @state[hrn]
end

- (Object) resource_state(address) Also known as: resource



64
65
66
# File 'omf_ec/lib/omf_ec/experiment.rb', line 64

# Look up a resource entry in @state using its address as the key
# (add_or_update_resource_state stores entries under that key).
#
# @param address [Object] key to look up
# @return [Object, nil] the entry stored under that key, if any
def resource_state(address)
  @state[address]
end

- (Object) state



50
51
52
# File 'omf_ec/lib/omf_ec/experiment.rb', line 50

# All recorded resource entries, without their keys. This is the value
# eval_trigger passes to an event's trigger.
#
# @return [Array] the values of @state
def state
  @state.values
end

- (Object) sub_group(name)



120
121
122
# File 'omf_ec/lib/omf_ec/experiment.rb', line 120

# Look up a recorded sub group name.
#
# Returns the stored entry equal to the argument, or nil when none is.
def sub_group(name)
  @sub_groups.detect { |candidate| candidate == name }
end