Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require "bunny"
- require "json"
- require "etc"
- Channel = Concurrent::Channel
# Parses a delivery body as a JSON object.
#
# @param body [String] raw message payload
# @return [Hash, nil] the parsed object, or nil when the body is not valid
#   JSON or its top-level value is not an object
def worker_payload(body)
  parsed = JSON.parse(body)
  parsed if parsed.is_a?(Hash)
rescue JSON::ParserError
  nil
end

# For every item taken from +jobs+, opens a RabbitMQ connection and consumes
# the "hello" queue with manual acknowledgements and a prefetch of 1.
#
# Deliveries whose body is a JSON object containing a "message" key are
# printed and acknowledged. Every other delivery (malformed JSON, non-object
# JSON, or no "message" key) is rejected without requeue; leaving it
# unacknowledged would stall this consumer, because prefetch(1) allows only
# one outstanding delivery.
#
# @param id [Object] worker identifier (not used by this method)
# @param jobs [#each] source of jobs; one consumer session is run per item
# @param results [Object] results channel (not written to by this method)
# @raise [StandardError] re-raises any error from the consumer session after
#   printing "failed"
def worker(id, jobs, results)
  jobs.each do |_job|
    puts "here"
    connection = Bunny.new(hostname: 'rabbitmq',
                           automatically_recover: true,
                           heartbeat_interval: 5,
                           user: 'test',
                           pass: 'test')
    connection.start
    begin
      channel = connection.create_channel
      channel.prefetch(1)
      queue = channel.queue('hello')
      queue.subscribe(block: true, manual_ack: true) do |delivery_info, _properties, body|
        payload = worker_payload(body)
        if payload&.key?("message")
          # The original interpolated an undefined local (`track`); the message
          # value is the most plausible intent -- confirm with the author.
          puts "got a message: #{payload['message']}"
          channel.acknowledge(delivery_info.delivery_tag, false)
        else
          # Second argument is `requeue`: false drops (or dead-letters) the
          # delivery so that it is not redelivered in a loop.
          channel.reject(delivery_info.delivery_tag, false)
        end
      end
    rescue StandardError
      # StandardError rather than Exception, so signals and SystemExit pass through.
      puts "failed"
      raise
    ensure
      # Close on success as well as failure so connections do not pile up
      # across jobs.
      connection.close
    end
  end
end
# Driver: start one worker per job slot (four per processor), feed each a job
# number, then wait for the same number of results.
worker_count = Etc.nprocessors * 4

jobs = Channel.new(buffer: :buffered, capacity: 3)
results = Channel.new(buffer: :buffered, capacity: 3)

# Launch the workers before queueing jobs; each is handed both channels.
1.upto(worker_count) do |worker_id|
  Channel.go { worker(worker_id, jobs, results) }
end

# Enqueue the job numbers 1..worker_count, then close the channel.
1.upto(worker_count) { |job| jobs << job }
jobs.close

# Take one value from `results` per job.
# NOTE(review): nothing visible in this file sends to `results`, so these
# takes look like they would block -- confirm against the worker.
worker_count.times { ~results }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement