Guest User

Untitled

a guest
Jul 20th, 2018
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.16 KB | None | 0 0
  1. #
  2. # This file is part of ThreadPool, a jruby or ruby-based
  3. # thread pool manager.
  4. # Copyright (C) 2009,2010 Daniel Bush
  5. # This program is distributed under the terms of the MIT
  6. # license.
  7. # A copy of the license is
  8. # enclosed with this project in the file LICENSE.
  9. #
  10. #
  11.  
  12. module ThreadPooling
  13.  
  14. # A class containing an internal Queue and pool of threads.
  15. #
  16. # ThreadPool uses a 'dispatch' method with a block for putting jobs on
  17. # the queue to be processed asynchronously:
  18. #
  19. # tp = ThreadPool.new(5) # Create 5 threads
  20. # tp.dispatch do
  21. # ... your task ...
  22. # end
  23. #
  24. # Or lambdas
  25. #
  26. # func = lambda { ... your task ... }
  27. # tp.dispatch func
  28. #
  29. # In fact, any object that responds to 'call' should be ok.
  30.  
  31.  
  32. class ThreadPool
  33.  
  34. require 'thread'
  35.  
  36. attr_reader :threads , :thread_count, :queue
  37. attr_writer :debug
  38.  
  39. # Initialize a ThreadPool instance with 'num' number
  40. # of threads.
  41.  
  42. def initialize num=1
  43. @thread_count=0
  44. @threads=[]
  45. # Other option is to use ThreadGroup.
  46. @queue = Queue.new
  47. @mutex = Mutex.new
  48. # Private mutex.
  49. self.increment(num)
  50. require 'logger'
  51. @logger = Logger.new('log/pool.log')
  52. end
  53.  
  54. def debug msg
  55. @mutex.synchronize do
  56. puts msg
  57. end
  58. end
  59.  
  60. # Add threads to the pool
  61.  
  62. def increment num=1
  63. num.times do
  64. @mutex.synchronize do
  65. @threads.push(
  66. Thread.new do
  67. loop do
  68. item = @queue.pop
  69. #print out a bit mask of whether each thread in the pull is currently active/dead/etc.
  70. #sleep, run, aborting, false (terminated normally), nil (terminated with exception)
  71. #look at inspect() of each thread
  72. #bit mask of threads running
  73. puts @queue.size.to_s + ' ' + @threads.collect{|o| o.status.to_s[0..0] }.to_s
  74. #bit mask of threads sleeping
  75. #bit maks of threads dead
  76. #@logger.info 'calling - queue size: ' + @queue.size.to_s
  77. #begin
  78. case item
  79. when Array
  80. item[0].call(*item[1])
  81. # item[0] should be lambda;
  82. # item[1] should be its args.
  83. else
  84. item.call
  85. end
  86. #rescue Exception => e
  87. # puts 'Exception in caller: ' + e.to_s
  88. # puts e.backtrace
  89. #end
  90. end
  91. end
  92. )
  93. end
  94. end
  95. @thread_count+=num
  96. end
  97.  
  98. # Remove threads from the pool
  99.  
  100. def decrement num=1
  101. num=@thread_count if num>@thread_count
  102. num.times do
  103. debug "Dispatching termination command" if @debug
  104. self.dispatch do
  105. @mutex.synchronize do
  106. @threads.delete(Thread.current)
  107. end
  108. debug "Deleting thread #{Thread.current}" if @debug
  109. Thread.current.exit
  110. end
  111. end
  112. @thread_count-=num
  113. end
  114.  
  115. # The thread that calls this will block until
  116. # the threads in @threads have finished.
  117. # These threads will be terminated and the thread
  118. # pool emptied.
  119.  
  120. def join
  121. threads=@threads.dup
  122. # Taking a copy here is really important!
  123. self.decrement @thread_count
  124. # Stop the threads or else suffer a deadlock.
  125. threads.each do |t|
  126. debug "joining thread #{t}" if @debug
  127. t.join
  128. end
  129. end
  130.  
  131. # Dispatch jobs asynchronously.
  132.  
  133. def dispatch func=nil , args=nil , &block
  134. if func.nil?
  135. raise "Must be called with a block or lambda." unless block_given?
  136. else
  137. if args.nil?
  138. @queue << func
  139. else
  140. @queue << [func,args]
  141. end
  142. end
  143. if block_given?
  144. @queue << block
  145. #puts 'thread queue: ' + @queue.size.to_s
  146. end
  147. end
  148.  
  149. end
  150.  
  151. # A Queue that contains its own thread and which
  152. # dispatches jobs synchronously.
  153. #
  154. # Use it like:
  155. #
  156. # sq = SyncQueue.new
  157. # sq.dispatch do
  158. # ... your task ...
  159. # end
  160. #
  161. # Or
  162. #
  163. # sq.dispatch lambda { ... your task ... }
  164. #
  165. # Or
  166. #
  167. # sq.push lambda { ... your task ... }
  168.  
  169. class SyncQueue < Queue
  170.  
  171. def initialize
  172. @processing=false
  173. @stopping=false
  174. @running=false
  175. super
  176. start
  177. end
  178.  
  179. # True if 'stop' has been called but we haven't
  180. # terminated yet.
  181.  
  182. def stopping?
  183. @stopping
  184. end
  185.  
  186. # True if the SyncQueue is no longer
  187. # running. The thread for this queue is
  188. # not in the middle of processing anything.
  189. # The queue should be empty.
  190. # See #terminate .
  191.  
  192. def stopped?
  193. !@running && !@stopping && !@processing
  194. end
  195.  
  196. # Don't process any more jobs but
  197. # the current one; then stop the thread.
  198. # Remaining jobs are removed from the queue
  199. # and returned
  200.  
  201. def terminate
  202. @running=false
  203. @stopping=false
  204. @left=[]
  205. while self.size>0
  206. @left.push self.pop
  207. end
  208. self << lambda{}
  209. # Pass a blank function to unblock
  210. # the thread so it can die.
  211. @left
  212. end
  213.  
  214. # Stop the thread, but allow it to finish
  215. # processing the queue.
  216. # The queue goes into a special state
  217. # where it will throw an error if you try
  218. # to add to the queue.
  219. # The last job will terminate, allowing
  220. # the queue to be added to at a later time.
  221. # SyncQueue#stop is used by SyncQueue#join.
  222.  
  223. def stop
  224. @stopping=true
  225. self << lambda{ self.terminate }
  226. # Pass a terminate function as final
  227. # function on queue. Will unblock thread
  228. # if not doing anything.
  229. end
  230.  
  231. # True if the SyncQueue instance is not terminated
  232. # or in a stopping state.
  233.  
  234. def running?
  235. @running && !@stopping
  236. end
  237.  
  238. # Fires up a new thread to process the queue.
  239. #
  240. # This method is automatically called when you
  241. # instantiate.
  242. #
  243. # Using it to restart an existing SyncQueue instance
  244. # has not been fully tested yet. Currently, it
  245. # will call SyncQueue#join and go into a stopping
  246. # state before starting up a new thread.
  247.  
  248. def start
  249. self.join if @running
  250. @running=true
  251. @thread = Thread.new do
  252. while @running
  253. block=self.pop
  254. @processing=true
  255. block.call
  256. @processing=false
  257. end
  258. end
  259. end
  260.  
  261. # Dispatch jobs synchronously.
  262.  
  263. def dispatch func=nil , &block
  264. if block_given?
  265. self << func unless func.nil?
  266. self << block
  267. else
  268. raise "Must be called with a block." if func.nil?
  269. self << func
  270. end
  271. end
  272.  
  273. # Thread calling this will wait for @thread to
  274. # finish all queued jobs and terminate @thread.
  275.  
  276. def join
  277. self.stop
  278. # Stop the thread or else suffer a deadlock.
  279. @thread.join
  280. end
  281.  
  282. # Push blocks onto the queue.
  283. #
  284. # Raise an error if this queue is in a stopping
  285. # state caused by calling SyncQueue#stop.
  286. # Note that enq and << are aliases for 'push'.
  287.  
  288. def push block
  289. if @stopping
  290. raise "This SyncQueue has been put into a stopping state using ThreadPool::SyncQueue#stop."
  291. end
  292. super
  293. end
  294.  
  295. end
  296.  
  297. end
Add Comment
Please, Sign In to add comment