Class: OmfCommon::Comm::AMQP::FileReceiver

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
omf_common/lib/omf_common/comm/amqp/amqp_file_transfer.rb

Overview

Receives a file broadcast on 'topic' and stores it in a local file. Optionally, it can report on progress through a provided block.

Constant Summary

WAIT_BEFORE_REQUESTING =
2
WAIT_BEFORE_REQUESTING_EVERYTHING =
3 * WAIT_BEFORE_REQUESTING

Instance Method Summary (collapse)

Constructor Details

- (FileReceiver) initialize(file_path, channel, topic, opts = {}, &block)

Returns a new instance of FileReceiver

Parameters:

  • topic (String)

    Name of topic to receive file on

  • file_path (String)

    Path to a local file

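  • opts (Hash) (defaults to: {})

    Accepted for interface compatibility; the constructor body shown below does not read it.

  • block

    Optional. Called with a Hash after each newly stored chunk (action: :progress, with received, progress and total) and once more when the whole file has been received (action: :done, with size, path, mime_type and received).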



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'omf_common/lib/omf_common/comm/amqp/amqp_file_transfer.rb', line 137

# Starts receiving a file that is broadcast in chunks on +topic+ and writes
# it to +file_path+.
#
# Three pieces are wired together:
# * a subscription on "<topic>_control" that records which chunks have
#   already been requested on that exchange,
# * a subscription on +topic+ that writes every still-missing chunk at its
#   offset in the local file,
# * a periodic timer (every WAIT_BEFORE_REQUESTING) that publishes a request
#   for missing chunks whenever no data message arrived since the last tick.
#
# @param file_path [String] path of the local file; opened with mode 'wb'
#   right away and closed once all chunks have been written
# @param channel object used to declare the exchanges and queues
#   (presumably an AMQP channel -- confirm against the caller)
# @param topic [String] name of the topic the file is broadcast on
# @param opts [Hash] not read by this method
# @yieldparam info [Hash] +{action: :progress, received:, progress:, total:}+
#   after each newly stored chunk, and
#   +{action: :done, size:, path:, mime_type:, received:}+ once complete
def initialize(file_path, channel, topic, opts = {}, &block)
  super() # init monitor mixin
  f = File.open(file_path, 'wb')
  @running = false
  @received_chunks = false          # set by the data handler, reset by the timer
  @outstanding_chunks = Set.new     # chunk ids we still need
  @all_requested = false # set to true if we encountered a request for ALL (no 'to')
  @requested_chunks = Set.new       # chunk ids somebody already asked for
  @received_anything = false

  control_topic = "#{topic}_control"
  @control_exchange = channel.topic(control_topic, :auto_delete => true)
  channel.queue("", :exclusive => false) do |queue|
    queue.bind(@control_exchange)
    debug "Subscribing to control topic '#{control_topic}'"
    queue.subscribe do |headers, payload|
      hdrs = headers.headers
      debug "Incoming control message '#{hdrs}'"
      from = hdrs['request_from']
      to = hdrs['request_to']
      synchronize do
        if to
          # A bounded request: remember the span so we do not ask for it again.
          (from .. to).each { |i| @requested_chunks << i }
        else
          debug "Observed request for everything"
          @all_requested = true
          @nothing_received = -1 * WAIT_BEFORE_REQUESTING # Throttle our own desire to request everything
        end
      end
    end
    @control_queue = queue
  end

  # Start the idle counter part-way up so that, with no traffic at all, the
  # threshold for requesting everything is reached sooner than from zero.
  @nothing_received = WAIT_BEFORE_REQUESTING_EVERYTHING - 2 * WAIT_BEFORE_REQUESTING

  data_exchange = channel.topic(topic, :auto_delete => true)
  channel.queue("", :exclusive => false) do |queue|
    queue.bind(data_exchange)
    queue.subscribe do |headers, payload|
      # Any data message counts as activity for the timer below.
      synchronize do
        @received_chunks = true
      end
      hdrs = headers.headers
      chunk_id = hdrs['chunk_id']
      chunk_offset = hdrs['chunk_offset']
      chunk_count = hdrs['chunk_count']
      unless chunk_id && chunk_offset && chunk_count
        debug "Received message with missing 'chunk_id' or 'chunk_offset' header information (#{hdrs})"
        # Without these headers the chunk can neither be identified nor
        # placed; skip it instead of failing on nil further down.
        next
      end
      unless @received_anything
        # First usable message: every chunk id in 0...chunk_count is outstanding.
        @outstanding_chunks = chunk_count.times.to_set
        synchronize do
          @running = true
          @received_anything = true
        end
      end
      # Ignore chunks we already have (or ids outside the expected range).
      next unless @outstanding_chunks.include?(chunk_id)

      debug "Receiving chunk #{chunk_id}"
      f.seek(chunk_offset, IO::SEEK_SET)
      f.write(Base64.decode64(payload)) # payload is Base64 encoded
      @outstanding_chunks.delete(chunk_id)
      received = chunk_count - @outstanding_chunks.size
      if block
        block.call({action: :progress, received: received, progress: 1.0 * received / chunk_count, total: chunk_count})
      end

      if @outstanding_chunks.empty?
        # got everything
        f.close
        queue.unsubscribe
        @control_queue.unsubscribe if @control_queue
        @timer.cancel if @timer
        synchronize { @running = false }
        debug "Fully received #{file_path}"
        if block
          block.call({action: :done, size: hdrs['file_size'],
            path: file_path, mime_type: hdrs['mime_type'],
            received: chunk_count})
        end
      end
    end
  end

  @timer = OmfCommon.eventloop.every(WAIT_BEFORE_REQUESTING) do
    # from/to stay nil unless this tick decides to publish a request.
    from = to = nil
    synchronize do
      #puts "RUNNING: #{@running}"
      #break unless @running
      if @received_chunks
        @received_chunks = false
        @nothing_received = 0
        break # ok there is still action
      else
        # nothing happened, so let's ask for something
        if (@nothing_received += WAIT_BEFORE_REQUESTING) >= WAIT_BEFORE_REQUESTING_EVERYTHING
          # something stuck here, let's re-ask for everything
          from = 0
          @nothing_received = 0
        else
          # ask_for is the set of chunks we are still missing but haven't asked for
          ask_for = @outstanding_chunks - @requested_chunks
          break if ask_for.empty? # ok, someone already asked, so better wait

          # Ask for a single span of consecutive chunks
          aa = ask_for.to_a.sort
          from = to = aa[0]
          aa.each.with_index do |e, i|
            break unless (from + i == e) # stop at the first gap
            to = e
            @requested_chunks << e
          end
        end

      end
    end
    if from
      headers = {request_from: from}
      headers[:request_to] = to if to  # if nil, ask for everything
      @control_exchange.publish(nil, {headers: headers})
    end
  end

end