Advertisement
Guest User

Untitled

a guest
Jan 7th, 2015
49
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Ruby 1.48 KB | None | 0 0
  module Service
    # Background service that polls a set of queue models and invokes every
    # unlocked record it manages to lock for itself.
    class Queue < Service::Base
      # Starts one polling thread per watched queue, then announces the watched
      # queues via +say+.
      #
      # Each thread repeatedly drains its queue (see #process), triggers a GC
      # run, sleeps for 10 seconds and exits its loop once +terminated?+ is
      # true. The threads are not joined here, so this method returns right
      # after the announcement.
      def perform
        ready

        # Maps each queue class to a proc returning the scope to drain. The
        # procs are called on every polling pass, so each pass starts from a
        # freshly built scope.
        queues = {
          ::TrackingQueue                   => proc { ::TrackingQueue.all },
          ::LandingPageHitQueue             => proc { ::LandingPageHitQueue.all },
          ::DelayedQueue                    => proc { ::DelayedQueue.all },
          ::ImportQueue                     => proc { ::ImportQueue.queued },
          ::Carmen::BackgroundRunner::Queue => proc { ::Carmen::BackgroundRunner::Queue.all }
        }

        # The broadcast queue is only watched in production.
        if Settings.production?
          queues[::BroadcastQueue] = proc { ::BroadcastQueue.ready }
        end

        queues.each_pair do |queue, scope|
          Thread.new do
            with_connection do
              loop do
                with_error_handler { process(queue, scope) }
                GC.start
                sleep(10)
                break if terminated?
              end
            end
          end
        end

        say("Watching #{pluralize(queues.keys.size, "queue")} (#{queues.keys.map(&:name).join(", ")})")
      end

      private

      # Drains one queue: locks unlocked records one at a time in ascending id
      # order and invokes each of them with this service as the argument.
      #
      # @param queue [Class] queue class; only its +name+ is used, for logging
      # @param scope [Proc] callable returning the base scope of records to drain
      # @return [void]
      def process(queue, scope)
        pending = scope.call.where(:locked => false).order(:id => :asc)
        processed = 0

        loop do
          # Flip +locked+ to true on the next unlocked record and fetch it;
          # a nil result means nothing is left to lock.
          item = pending.find_and_modify({ '$set' => { :locked => true } }, :new => true)
          break unless item

          # Count what this worker actually locked rather than a count taken
          # up front, which goes stale when records arrive or are locked by
          # someone else while the loop runs.
          processed += 1
          with_error_handler { item.invoke(self) }
        end

        return if processed.zero?

        say("#{queue.name}: Processed #{pluralize(processed, "record")}")
      end
    end
  end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement