Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require 'thread'
- require 'monitor'
- require 'singleton'
- module MirrorConcurrentUtil
- THREAD_COUNT = 10
- QUEUE_SIZE = 20
# Process-wide holder (Singleton) for the state shared by MirrorProducer and
# MirrorConsumer:
#
# * thread_pool       - fixed-size Array of worker slots (nil = never used),
#                       sized by MirrorConcurrentUtil::THREAD_COUNT.
# * work_queue        - bounded SizedQueue of pending jobs, sized by
#                       MirrorConcurrentUtil::QUEUE_SIZE.
# * threads_available - condition variable bound to this object's monitor.
#
# The instance itself is the monitor (MonitorMixin): callers must wrap any
# use of threads_available in MirrorThreadPool.instance.synchronize { ... }.
class MirrorThreadPool
  include Singleton
  include MonitorMixin

  attr_accessor :thread_pool, :work_queue, :threads_available

  # The accessor was originally declared under the misspelled name
  # `thread_available`; keep that spelling working for existing callers.
  alias_method :thread_available, :threads_available
  alias_method :thread_available=, :threads_available=

  def initialize
    # MonitorMixin only sets up its internal lock when its own initializer
    # runs, so it must be invoked explicitly from an overriding initialize.
    super()
    @thread_pool = Array.new(MirrorConcurrentUtil::THREAD_COUNT)
    @work_queue = SizedQueue.new(MirrorConcurrentUtil::QUEUE_SIZE)
    @threads_available = new_cond
  end
end
# Feeds jobs into the shared work queue from a background thread.
class MirrorProducer
  # Starts a thread that pushes every element of +jobs+ onto
  # MirrorThreadPool.instance.work_queue, in order. Pushing blocks while the
  # bounded queue is full. After each push the shared condition variable is
  # signalled so a waiting consumer re-checks its state.
  #
  # @param jobs [#each] the jobs to enqueue
  # @return [Thread] the producer thread (join it to wait for all pushes)
  def self.run(jobs)
    pool = MirrorThreadPool.instance
    work_queue = pool.work_queue
    threads_available = pool.threads_available

    Thread.new do
      jobs.each do |job|
        work_queue << job
        # The condition variable belongs to the pool's monitor, so the
        # signal must be sent while holding that monitor (not the Array).
        pool.synchronize { threads_available.signal }
      end
    end
  end
end
# Dispatches queued jobs onto free worker slots of the shared thread pool.
class MirrorConsumer
  # Starts a dispatcher thread that repeatedly:
  #   1. stops once `sysexit` is truthy and the work queue is empty,
  #   2. waits (on the pool's condition variable) until a worker slot is free,
  #   3. pops the next job (blocking while the queue is empty),
  #   4. runs the job in a new worker thread stored in the free slot.
  #
  # NOTE(review): `sysexit` is not defined in the visible part of this file;
  # it is expected to be provided elsewhere and to return truthy on
  # shutdown - confirm.
  # NOTE(review): the shutdown check only runs between jobs; a dispatcher
  # blocked in `pop` on an empty queue will not notice `sysexit`.
  #
  # @return [Thread] the dispatcher thread
  def self.run
    pool = MirrorThreadPool.instance
    thread_pool = pool.thread_pool
    threads_available = pool.threads_available
    work_queue = pool.work_queue

    Thread.new do
      loop do
        break if sysexit && work_queue.empty?

        free_slot = nil
        # The condition variable belongs to the pool's monitor, so waiting
        # and signalling must happen while holding that monitor.
        pool.synchronize do
          threads_available.wait_while { thread_pool.none? { |worker| slot_free?(worker) } }
          free_slot = thread_pool.rindex { |worker| slot_free?(worker) }
        end

        job = work_queue.pop
        thread_pool[free_slot] = Thread.new(job) do |_job|
          begin
            # TODO: register rsync start time/date
            # TODO: run rsync(job)
            # TODO: register rsync end time/date
          ensure
            # Release the slot even when the job raises; otherwise the
            # dispatcher could wait forever for a slot that never frees.
            Thread.current["finished"] = true
            pool.synchronize { threads_available.signal }
          end
        end
      end
    end
  end

  # True when a pool slot can take a new worker: it was never used, its
  # thread is no longer running (finished normally or died from an
  # exception), or the worker has flagged itself as finished.
  def self.slot_free?(thread)
    thread.nil? || !thread.alive? || !thread["finished"].nil?
  end
  private_class_method :slot_free?
end
Add Comment
Please, Sign In to add comment