Class: OmfCommon::Eventloop::EventMachine

Inherits:
OmfCommon::Eventloop show all
Defined in:
omf_common/lib/omf_common/eventloop/em.rb

Overview

Implements a simple eventloop which only deals with timer events

Instance Method Summary (collapse)

Methods inherited from OmfCommon::Eventloop

init, instance, #join, #on_int_signal, #on_stop, #on_term_signal

Constructor Details

- (EventMachine) initialize(opts = {}, &block)

Returns a new instance of EventMachine



14
15
16
17
18
# File 'omf_common/lib/omf_common/eventloop/em.rb', line 14

def initialize(opts = {}, &block)
  super
  @deferred =  []
  @deferred << block if block
end

Instance Method Details

- (Object) after(delay_sec, &block)

Execute block after some time

Parameters:

  • delay_sec (Float)

    in sec



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'omf_common/lib/omf_common/eventloop/em.rb', line 23

def after(delay_sec, &block)
  if EM.reactor_running?
    EM.add_timer(delay_sec) do
      begin
        block.call()
      rescue  => ex
        error "Exception '#{ex}'"
        debug "#{ex}\n\t#{ex.backtrace.join("\n\t")}"
      end
    end
  else
    @deferred << lambda do
      after(delay_sec, &block)
    end
  end
end

- (Object) defer(&block)

Call 'block' in the context of a separate thread.



42
43
44
45
46
47
48
49
50
51
52
# File 'omf_common/lib/omf_common/eventloop/em.rb', line 42

def defer(&block)
  raise "Can't handle 'defer' registration before the EM is up" unless EM.reactor_running?
  EM.defer do
    begin
      block.call()
    rescue  => ex
      error "Exception '#{ex}'"
      debug "#{ex}\n\t#{ex.backtrace.join("\n\t")}"
    end
  end
end

- (Object) every(interval_sec, &block)

Periodically call block every interval_sec

Parameters:

  • interval_sec (Float)

    in sec



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'omf_common/lib/omf_common/eventloop/em.rb', line 57

def every(interval_sec, &block)
  # to allow canceling the periodic timer we need to
  # hand back a reference to it which responds to 'cancel'
  # As this is getting rather complex when allowing for
  # registration before the EM is up and running, we simply throw
  # and exception at this time.
  raise "Can't handle 'every' registration before the EM is up" unless EM.reactor_running?
  # if EM.reactor_running?
    # EM.add_periodic_timer(interval_sec, &block)
  # else
    # @deferred << lambda do
      # EM.add_periodic_timer(interval_sec, &block)
    # end
  # end
  t = EM.add_periodic_timer(interval_sec) do
    begin
      block.call(t)
    rescue  => ex
      error "Exception '#{ex}'"
      debug "#{ex}\n\t#{ex.backtrace.join("\n\t")}"
    end
  end
  t
end

- (Object) run(&block)



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'omf_common/lib/omf_common/eventloop/em.rb', line 82

def run(&block)
  EM.run do
    @deferred.each { |proc| proc.call }
    @deferred = nil
    if block
      begin
        block.arity == 0 ? block.call : block.call(self)
      rescue  => ex
        error "While executing run block - #{ex}"
        error ex.backtrace.join("\n\t")
      end
    end
  end
end

- (Object) stop



97
98
99
100
# File 'omf_common/lib/omf_common/eventloop/em.rb', line 97

def stop()
  super
  EM.next_tick { EM.stop }
end