Guest User

Untitled

a guest
Apr 23rd, 2018
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.12 KB | None | 0 0
  1. require 'thread'
  2. require 'monitor'
  3. require 'singleton'
  4.  
  5. module MirrorConcurrentUtil
  6. THREAD_COUNT = 10
  7. QUEUE_SIZE = 20
  8.  
  9. class MirrorThreadPool
  10. include Singleton
  11. include MonitorMixin
  12.  
  13. attr_accessor :thread_pool, :work_queue, :thread_available
  14.  
  15. def initialize
  16. @thread_pool = Array.new(MirrorConcurrentUtil::THREAD_COUNT)
  17. @work_queue = SizedQueue.new(MirrorConcurrentUtil::QUEUE_SIZE)
  18. @threads_available = self.new_cond
  19. end
  20.  
  21. end
  22.  
  23. class MirrorProducer
  24. def self.run(jobs) do
  25. thread_pool = MirrorThreadPool.instance.thread_pool
  26. threads_available = MirrorThreadPool.instance.threads_available
  27. work_queue = MirrorThreadPool.instance.work_queue
  28.  
  29. Thread.new do
  30. jobs.each do |job|
  31. work_queue << job
  32.  
  33. thread_pool.synchronize do
  34. threads_available.signal
  35. end
  36. end
  37. end
  38. end
  39. end
  40.  
  41. class MirrorConsumer
  42. def self.run()
  43. thread_pool = MirrorThreadPool.instance.thread_pool
  44. threads_available = MirrorThreadPool.instance.threads_available
  45. work_queue = MirrorThreadPool.instance.work_queue
  46.  
  47. Thread.new do
  48. loop do
  49. break if sysexit && MirrorThreadPool.instance.work_queue == 0
  50. available_thread_index = nil
  51.  
  52. thread_pool.synchronize do
  53. threads_available.wait_while do
  54. thread_pool.select { |thread| thread.nil? || thread.status == false ||
  55. thread["finished"].nil? == false }.length == 0
  56. end
  57.  
  58. available_thread_index = thread_pool.rindex { |thread| thread.nil? || thread.status == false ||
  59. thread["finished"].nil? == false }
  60. end
  61.  
  62. job = work_queue.pop
  63.  
  64. thread_pool[available_thread_index] = Thread.new(job) do
  65. #Register rsync start time/date
  66. #Run rsync(job)
  67. #register rsync end time/date
  68. Thread.current["finished"] = true
  69.  
  70. thread_pool.synchronize do
  71. threads_available.signal
  72. end
  73. end
  74. end
  75. end
  76. end
  77. end
Add Comment
Please, Sign In to add comment