Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- a worker class that manages queued jobs using a fixed
- number of child workers. Well, that's not quite true -- a new worker is
- spawned for each job, but you set the total number that may exist at once.
- There are three components:
- 1) queue_worker.rb: The singleton worker that manages the child workers.
- You probably want to auto start this. Make sure you give it
- :singleton=>true. Read this file for the methods to interact with your
- children. (i.e. queue_job(), delete_job(), job_progress())
- 2) backgroundrb_rails_queue.rb: The super class for the "child workers"
- (and uses backgroundrb_rails.rb in turn). This file needs to be included
- in background.rb
- 3) Your child worker, which should be a subclass of
- BackgrounDRb::RailsQueue, is otherwise about the same as a normal worker.
- If it's a big loop, you probably want to use terminate?() on each
- iteration and update @progress. Use suicide() at the end to make room
- for the next child.
- Options: (probably in your backgroundrb.yml)
- autostart:
- :queue_key:
- class: queue_worker
- args:
- :num_child_workers: 2
- :child_class: :cost_calculator_worker
- :reQ_on_finish: true
- :singleton: true
- :queue_key can be changed to what you want, but it is the permanent key
- of the QueueWorker
- :num_child_workers: is up to you!
- :child_class: your worker class you want as child workers.
- :reQ_on_finish: do you want results to be stored in the queue until you
- call job_progress!() ?
- Note: to be able to access your child jobs w/ the QueueWorker methods,
- include a unique :id in your {args} when you queue_job({args})
- I'll attach the files. If they don't go through, I'll resend as text.
- BTW, This works well enough for me, but I'm learning as I go too, so no
- guarantees :) I don't use the fancy timing options, so YMMV for
- :next_start and :interval.
- Let me know if you find any issues (though I'm off-line for a week after
- this post). I'm wondering myself if it might be better to reuse child
- workers instead of re-spawning new ones. Another day maybe.
- cheers,
- David Lemstra
- # Put your code that runs your task inside the do_work method
- # it will be run automatically in a thread. You have access to
- # all of your rails models if you set load_rails to true in the
- # config file. You also get @logger inside of this class by default.
- require 'monitor.rb'
- class QueueWorker < BackgrounDRb::Rails
- attr_reader :q, :id_hash, :completed
- def initialize(key, args={})
- super(key,args)
- @num_child_workers = args[:num_child_workers] ? args[:num_child_workers] : 1
- @child_workers = Array.new(@num_child_workers) {|i| Hash[:job_key,nil,:args,nil,:s_thread,nil, :s_mutex,Mutex.new, :child, i] }
- @q = []
- @q.extend(MonitorMixin)
- @q_loaded_cv = @q.new_cond
- @id_hash = {}
- @id_hash_mutex = Mutex.new
- raise ArgumentError unless args.has_key?(:child_class)
- @child_class = args[:child_class]
- @reQ_on_finish = args[:reQ_on_finish] || false
- @completed = 0
- end
- def queue_job(args)
- return nil if @id_hash && args[:id] && @id_hash.has_key?(args[:id])
- @q.synchronize do
- @q.push args
- @id_hash_mutex.synchronize { @id_hash[args[:id]] = {:status=>:queued, :job_key=>nil, :results=>nil } } if args[:id]
- @q_loaded_cv.signal
- end
- return true
- end
- def job_in_progress?(job_id)
- @id_hash.has_key?(job_id)
- end
- def job_status?(job_id)
- @id_hash_mutex.synchronize do
- return nil unless @id_hash.has_key?(job_id)
- return @id_hash[job_key][:status]
- end
- end
- def job_progress(job_id)
- report_hsh = {}
- @id_hash_mutex.synchronize do
- return nil unless @id_hash.has_key?(job_id)
- report_hsh[:status] = @id_hash[job_id][:status]
- report_hsh[:progress] = case @id_hash[job_id][:status]
- when :queued then
- ahead = 0
- @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
- ahead
- when :running
- w = self[@id_hash[job_id][:job_key]]
- w.nil? ? nil : w.progress
- when :done
- @id_hash[job_id][:results]
- else nil
- end
- end
- return report_hsh
- end
- def job_progress!(job_id)
- report_hsh = {}
- @id_hash_mutex.synchronize do
- return nil unless @id_hash.has_key?(job_id)
- report_hsh[:status] = @id_hash[job_id][:status]
- report_hsh[:progress] = case @id_hash[job_id][:status]
- when :queued then
- ahead = 0
- @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
- ahead
- when :running
- w = self[@id_hash[job_id][:job_key]]
- if w.nil?
- @id_hash.delete(job_id)[:results]
- else
- w.progress
- end
- when :done
- @id_hash.delete(job_id)[:results]
- else nil
- end
- end
- return report_hsh
- end
- def delete_job(job_id)
- args = nil
- @q.synchronize do @id_hash_mutex.synchronize do
- args = @id_hash[job_id]
- return true if args.nil?
- if args[:status] == :queued
- @q.delete_if {|h| h[:id] == job_id }
- @id_hash.delete(job_id)
- return true
- elsif args[:status] == :done
- @id_hash.delete(job_id)
- return true
- end
- end
- end
- ::BackgrounDRb::MiddleMan.instance.delete_worker(args[:job_key]) if args[:status] == :running
- return true
- end
- def do_work(args)
- # You probably don't want to mess with this method unless you know what's what.
- @child_workers.each do |child_hash|
- child_hash[:s_thread] = Thread.start do
- loop do
- # Wait for a new job in the @q
- child_hash[:s_mutex].synchronize do
- # get the Q mutex and wait for a job
- @q.synchronize do
- tl = Thread.list
- @q_loaded_cv.wait_while { @q.empty? }
- child_hash[:args] = @q.shift
- if child_hash[:args][:id]
- @id_hash_mutex.synchronize do
- @id_hash[child_hash[:args][:id]][:status] = :running
- child_hash[:job_key] = spawn_worker({:args=>child_hash[:args],:class=>@child_class})
- @id_hash[child_hash[:args][:id]][:job_key] = child_hash[:job_key]
- end
- else
- child_hash[:job_key] = spawn_worker(job_args.merge(:class=>@child_class))
- end
- end
- self[child_hash[:job_key]].thread[:DQ_request].wait(child_hash[:s_mutex])
- # grab and store the results
- if child_hash[:args][:id]
- @id_hash_mutex.synchronize do
- if @reQ_on_finish
- r = self[child_hash[:job_key]].results
- @id_hash[child_hash[:args][:id]][:results] = r if r
- @id_hash[child_hash[:args][:id]][:status] = :done
- @id_hash[child_hash[:args][:id]][:done_at] = Time.now
- else
- @id_hash.delete(child_hash[:args][:id])
- end
- end
- end
- self[child_hash[:job_key]].thread[:DQed].signal
- @completed += 1
- [:args,:job_key].each {|k| child_hash[k] = nil }
- end
- # Loop back and wait for the job_key to get killed again....
- end
- end
- end
- end
- private
- def [](key)
- # Use jobs to avoid the access time update w/ []
- ::BackgrounDRb::MiddleMan.instance.jobs[key]
- end
- end
module BackgrounDRb
  # Superclass for the "child" workers spawned by QueueWorker. Subclasses
  # implement do_work as usual, update @progress as they go, and call
  # suicide() at the end to free their slot for the next queued job.
  class RailsQueue < BackgrounDRb::Rails
    # Progress value maintained by the subclass; read by
    # QueueWorker#job_progress while the job's status is :running.
    attr_reader :progress

    def initialize(key, args={})
      super(key,args)
      @job_ctrl = true
    end

    # Runs do_work(@args) in a new thread, wiring up the condition variables
    # QueueWorker's supervisor threads use for the dequeue handshake:
    #   :DQ_request - signalled by this child when it is finished (do_DQ)
    #   :DQed       - signalled back by QueueWorker once results are stored
    # :mutex guards the handshake; do_work runs with it held.
    def start_process
      return if schedule_first_run && schedule_first_run.to_i > Time.now.to_i
      @thread = Thread.new do
        Thread.current[:safe_to_kill] = ConditionVariable.new
        Thread.current[:kill] = false
        Thread.current[:DQ_request] = ConditionVariable.new
        Thread.current[:DQed] = ConditionVariable.new
        Thread.current[:mutex] = Mutex.new
        begin
          Thread.current[:mutex].synchronize do
            do_work(@args)
          end
        rescue Exception => e
          # NOTE(review): rescuing Exception also catches SystemExit and
          # friends; kept as-is since it only logs before the thread dies.
          @logger.error "#{ e.message } - (#{ e.class })" << "\n" << (e.backtrace or []).join("\n")
        end
      end
      @next_start = @interval.from_now if schedule_repeat
    end

    # Overwrite this method and set reQ_on_finish = true (in the queue
    # worker args) to have a process put its results back in the queue
    # for pickup (via QueueWorker#job_progress!) before being killed.
    def results
      nil
    end

    # Stub that gets called before dequeue is run.
    # Overwrite in your class instance.
    def before_DQ(args=nil)
      true
    end

    # Framework kill hook: complete the dequeue handshake, then defer to super.
    def terminate(args=nil)
      do_DQ(args)
      super(args)
    end

    # Call at the end of do_work: hands results to the QueueWorker, kills
    # this worker and exits its thread, freeing the child slot.
    def suicide(args=nil)
      do_DQ(args)
      kill
      Thread.exit
    end

    private

    # Two-step handshake with the supervising QueueWorker thread: signal
    # :DQ_request ("I'm done"), then wait on :DQed (releasing :mutex) until
    # the supervisor has copied our results.
    def do_DQ(args=nil)
      before_DQ(args)
      Thread.current[:DQ_request].signal
      Thread.current[:DQed].wait(Thread.current[:mutex])
    end
  end
end
Add Comment
Please, Sign In to add comment