Guest User

Untitled

a guest
Feb 20th, 2018
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.04 KB | None | 0 0
  1. a worker class that manages queued jobs using a fixed
  2. number of child workers. Well, that's not quite true -- a new worker is
  3. spawned for each job, but you set the total number that may exist at once.
  4.  
  5. There are three components:
  6. 1) queue_worker.rb: The singleton worker that manages the child workers.
  7. You probably want to auto start this. Make sure you give it
  8. :singleton=>true. Read this file for the methods to interact with your
  9. children. (ie. queue_job(), delete_job(), job_progress() )
  10.  
  11. 2) backgroundrb_rails_queue.rb: The super class for the "child workers"
  12. (and uses backgroundrb_rails.rb in turn). This file needs to be included
  13. in background.rb
  14.  
  15. 3) Your child worker, which should be a subclass of
  16. BackgrounDRb::RailsQueue, is otherwise the about same as normal
  17. If it's a big loop, you probably want to use terminate?() on each
  18. iteration and update @progress. Use suicide() at the end to make room
  19. for the next child.
  20.  
  21. Options: (probably in your backgroundrb.yml)
  22. autostart:
  23. :queue_key:
  24. class: queue_worker
  25. args:
  26. :num_child_workers: 2
  27. :child_class: :cost_calculator_worker
  28. :reQ_on_finish: true
  29. :singleton: true
  30.  
  31. :queue_key can be changed to what you want, but it is the permanent key
  32. of the QueueWorker
  33. :num_child_workers: is up to you!
  34. :child_class: your worker class you want as child workers.
  35. :reQ_on_finish: do you want results to be stored in the queue until you
  36. call job_progress!() ?
  37. Note: to be able to access your child jobs w/ the QueueWorker methods,
  38. include a unique :id in your {args} when you queue_job({args})
  39.  
  40. I'll attach the files. If they don't go through, I'll resend as text.
  41.  
  42. BTW, This works well enough for me, but I'm learning as I go too, so no
  43. guarantees :) I don't use the fancy timing options, so ymmmv for
  44. :next_start and :interval.
  45.  
  46. Let me know if you find any issues (though I'm off-line for a week after
  47. this post). I'm wondering myself if it might be better to reuse child
  48. workers instead of re-spawning new ones. Another day maybe.
  49.  
  50. cheers,
  51. David Lemstra
  52. # Put your code that runs your task inside the do_work method
  53. # it will be run automatically in a thread. You have access to
  54. # all of your rails models if you set load_rails to true in the
  55. # config file. You also get @logger inside of this class by default.
  56. require 'monitor.rb'
  57.  
  58. class QueueWorker < BackgrounDRb::Rails
  59.  
  60. attr_reader :q, :id_hash, :completed
  61. def initialize(key, args={})
  62. super(key,args)
  63. @num_child_workers = args[:num_child_workers] ? args[:num_child_workers] : 1
  64. @child_workers = Array.new(@num_child_workers) {|i| Hash[:job_key,nil,:args,nil,:s_thread,nil, :s_mutex,Mutex.new, :child, i] }
  65.  
  66. @q = []
  67. @q.extend(MonitorMixin)
  68. @q_loaded_cv = @q.new_cond
  69. @id_hash = {}
  70. @id_hash_mutex = Mutex.new
  71.  
  72. raise ArgumentError unless args.has_key?(:child_class)
  73. @child_class = args[:child_class]
  74. @reQ_on_finish = args[:reQ_on_finish] || false
  75. @completed = 0
  76. end
  77.  
  78. def queue_job(args)
  79. return nil if @id_hash && args[:id] && @id_hash.has_key?(args[:id])
  80. @q.synchronize do
  81. @q.push args
  82. @id_hash_mutex.synchronize { @id_hash[args[:id]] = {:status=>:queued, :job_key=>nil, :results=>nil } } if args[:id]
  83. @q_loaded_cv.signal
  84. end
  85. return true
  86. end
  87.  
  88. def job_in_progress?(job_id)
  89. @id_hash.has_key?(job_id)
  90. end
  91.  
  92. def job_status?(job_id)
  93. @id_hash_mutex.synchronize do
  94. return nil unless @id_hash.has_key?(job_id)
  95. return @id_hash[job_key][:status]
  96. end
  97. end
  98.  
  99. def job_progress(job_id)
  100. report_hsh = {}
  101. @id_hash_mutex.synchronize do
  102. return nil unless @id_hash.has_key?(job_id)
  103. report_hsh[:status] = @id_hash[job_id][:status]
  104. report_hsh[:progress] = case @id_hash[job_id][:status]
  105. when :queued then
  106. ahead = 0
  107. @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
  108. ahead
  109. when :running
  110. w = self[@id_hash[job_id][:job_key]]
  111. w.nil? ? nil : w.progress
  112. when :done
  113. @id_hash[job_id][:results]
  114. else nil
  115. end
  116. end
  117. return report_hsh
  118. end
  119.  
  120. def job_progress!(job_id)
  121. report_hsh = {}
  122. @id_hash_mutex.synchronize do
  123. return nil unless @id_hash.has_key?(job_id)
  124. report_hsh[:status] = @id_hash[job_id][:status]
  125. report_hsh[:progress] = case @id_hash[job_id][:status]
  126. when :queued then
  127. ahead = 0
  128. @q.each_index {|i| if @q[i][:id] == job_id then ahead = i; break; end }
  129. ahead
  130. when :running
  131. w = self[@id_hash[job_id][:job_key]]
  132. if w.nil?
  133. @id_hash.delete(job_id)[:results]
  134. else
  135. w.progress
  136. end
  137. when :done
  138. @id_hash.delete(job_id)[:results]
  139. else nil
  140. end
  141. end
  142. return report_hsh
  143. end
  144.  
  145. def delete_job(job_id)
  146. args = nil
  147. @q.synchronize do @id_hash_mutex.synchronize do
  148. args = @id_hash[job_id]
  149. return true if args.nil?
  150. if args[:status] == :queued
  151. @q.delete_if {|h| h[:id] == job_id }
  152. @id_hash.delete(job_id)
  153. return true
  154. elsif args[:status] == :done
  155. @id_hash.delete(job_id)
  156. return true
  157. end
  158. end
  159. end
  160. ::BackgrounDRb::MiddleMan.instance.delete_worker(args[:job_key]) if args[:status] == :running
  161. return true
  162. end
  163.  
  164. def do_work(args)
  165. # You probably don't want to mess with this method unless you know what's what.
  166. @child_workers.each do |child_hash|
  167. child_hash[:s_thread] = Thread.start do
  168. loop do
  169. # Wait for a new job in the @q
  170. child_hash[:s_mutex].synchronize do
  171. # get the Q mutex and wait for a job
  172. @q.synchronize do
  173. tl = Thread.list
  174. @q_loaded_cv.wait_while { @q.empty? }
  175. child_hash[:args] = @q.shift
  176. if child_hash[:args][:id]
  177. @id_hash_mutex.synchronize do
  178. @id_hash[child_hash[:args][:id]][:status] = :running
  179. child_hash[:job_key] = spawn_worker({:args=>child_hash[:args],:class=>@child_class})
  180. @id_hash[child_hash[:args][:id]][:job_key] = child_hash[:job_key]
  181. end
  182. else
  183. child_hash[:job_key] = spawn_worker(job_args.merge(:class=>@child_class))
  184. end
  185. end
  186. self[child_hash[:job_key]].thread[:DQ_request].wait(child_hash[:s_mutex])
  187. # grab and store the results
  188. if child_hash[:args][:id]
  189. @id_hash_mutex.synchronize do
  190. if @reQ_on_finish
  191. r = self[child_hash[:job_key]].results
  192. @id_hash[child_hash[:args][:id]][:results] = r if r
  193. @id_hash[child_hash[:args][:id]][:status] = :done
  194. @id_hash[child_hash[:args][:id]][:done_at] = Time.now
  195. else
  196. @id_hash.delete(child_hash[:args][:id])
  197. end
  198. end
  199. end
  200. self[child_hash[:job_key]].thread[:DQed].signal
  201. @completed += 1
  202. [:args,:job_key].each {|k| child_hash[k] = nil }
  203. end
  204. # Loop back and wait for the job_key to get killed again....
  205. end
  206. end
  207. end
  208. end
  209.  
  210. private
  211.  
  212. def [](key)
  213. # Use jobs to avoid the access time update w/ []
  214. ::BackgrounDRb::MiddleMan.instance.jobs[key]
  215. end
  216.  
  217. end
  218. module BackgrounDRb
  219.  
  220. class RailsQueue < BackgrounDRb::Rails
  221. attr_reader :progress
  222. def initialize(key, args={})
  223. super(key,args)
  224. @job_ctrl = true
  225. end
  226.  
  227. def start_process
  228. return if schedule_first_run && schedule_first_run.to_i > Time.now.to_i
  229. @thread = Thread.new do
  230. Thread.current[:safe_to_kill] = ConditionVariable.new
  231. Thread.current[:kill] = false
  232. Thread.current[:DQ_request] = ConditionVariable.new
  233. Thread.current[:DQed] = ConditionVariable.new
  234. Thread.current[:mutex] = Mutex.new
  235. begin
  236. Thread.current[:mutex].synchronize do
  237. do_work(@args)
  238. end
  239. rescue Exception => e
  240. @logger.error "#{ e.message } - (#{ e.class })" << "\n" << (e.backtrace or []).join("\n")
  241. end
  242. end
  243. @next_start = @interval.from_now if schedule_repeat
  244. end
  245.  
  246. def results
  247. # Overwrite this method and set reQ_on_finish = true (in the queue worker args)
  248. # to have a process put it's results in back in the queue
  249. # for pickup before being killed
  250. nil
  251. end
  252.  
  253. def before_DQ(args=nil)
  254. # stub method that gets called before dequeue is run.
  255. # Overwrite in your class instance
  256. true
  257. end
  258.  
  259. def terminate(args=nil)
  260. do_DQ(args)
  261. super(args)
  262. end
  263.  
  264. def suicide(args=nil)
  265. do_DQ(args)
  266. kill
  267. Thread.exit
  268. end
  269.  
  270. private
  271. def do_DQ(args=nil)
  272. before_DQ(args)
  273. Thread.current[:DQ_request].signal
  274. Thread.current[:DQed].wait(Thread.current[:mutex])
  275. end
  276. end
  277. end
Add Comment
Please, Sign In to add comment