Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
module Service
  # Service that watches a fixed set of queue models. For every watched queue
  # it spawns a thread that repeatedly claims unlocked items and invokes them.
  class Queue < Service::Base
    # Spawns one polling thread per watched queue and then logs the list of
    # watched queues. The threads are not joined here, so this method returns
    # as soon as they have been started.
    def perform
      ready

      # Maps each queue class to a callable that builds the base scope of
      # items to consider for that queue.
      queues = {
        ::TrackingQueue => proc { ::TrackingQueue.all },
        ::LandingPageHitQueue => proc { ::LandingPageHitQueue.all },
        ::DelayedQueue => proc { ::DelayedQueue.all },
        ::ImportQueue => proc { ::ImportQueue.queued },
        ::Carmen::BackgroundRunner::Queue => proc { ::Carmen::BackgroundRunner::Queue.all }
      }

      # The broadcast queue is only watched in production. The constant is
      # root-qualified like every other entry so it cannot resolve to a
      # same-named constant nested under Service.
      queues[::BroadcastQueue] = proc { ::BroadcastQueue.ready } if Settings.production?

      queues.each_pair do |queue, scope|
        Thread.new do
          with_connection do
            loop do
              with_error_handler { process(queue, scope) }
              GC.start
              sleep(10)
              # Termination is checked once per pass, after the sleep.
              break if terminated?
            end
          end
        end
      end

      say("Watching #{pluralize(queues.keys.size, "queue")} (#{queues.keys.map(&:name).join(", ")})")
    end

    private

    # Claims and invokes unlocked items of one queue until none are left,
    # then logs how many items were claimed during this pass.
    #
    # @param queue [Class] queue model; only its +name+ is used, for logging
    # @param scope [#call] callable returning the base scope for the queue
    # @return [void]
    def process(queue, scope)
      pending = scope.call.where(locked: false).order(id: :asc)
      claimed = 0

      loop do
        # A single find_and_modify call both sets the lock flag and returns
        # the item, so an item is marked as locked before it is invoked.
        item = pending.find_and_modify({ '$set' => { locked: true } }, new: true)
        break unless item

        # Count what was actually claimed rather than a count taken up front,
        # which can be stale by the time the loop finishes.
        claimed += 1
        with_error_handler { item.invoke(self) }
      end

      return if claimed.zero?

      say("#{queue.name}: Processed #{pluralize(claimed, "record")}")
    end
  end
end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement