Advertisement
Guest User

Untitled

a guest
Oct 5th, 2019
166
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Ruby 1.31 KB | None | 0 0
  1. require "bunny"
  2. require "json"
  3. require "etc"
  4.  
  # Concurrent::Channel ships with the concurrent-ruby-edge gem and is loaded
  # through "concurrent-edge". None of the other requires in this file define
  # the top-level Concurrent namespace, so without this line the constant
  # lookup below raises NameError.
  require "concurrent-edge"

  # Short alias used by the worker pool set up at the bottom of the script.
  Channel = Concurrent::Channel
  6.  
  # Decodes a delivery body and returns the parsed Hash when it is a JSON
  # object that has a "message" key. Returns nil for anything else: invalid
  # JSON, a nil body, a non-object document, or an object without that key.
  def parse_message_payload(body)
    payload = JSON.parse(body)
    payload if payload.is_a?(Hash) && payload.key?("message")
  rescue JSON::ParserError, TypeError
    nil
  end

  # Consumes the 'hello' RabbitMQ queue once per job taken from +jobs+.
  #
  # For every job a new Bunny connection is opened and a blocking, manually
  # acknowledged subscription is started. Deliveries whose body is a JSON
  # object with a "message" key are printed and acknowledged; every other
  # delivery is rejected without requeue.
  #
  # @param id [Object] worker identifier (currently unused)
  # @param jobs [#each] source of jobs; one consume cycle runs per element
  # @param results [#<<] receives the job once its subscription has ended
  #   without an error
  # @raise [StandardError] re-raises any error from the channel, queue or
  #   subscription after printing "failed"
  def worker(id, jobs, results)
    jobs.each do |job|
      puts "here"
      connection = Bunny.new(hostname: 'rabbitmq',
                             automatically_recover: true,
                             heartbeat_interval: 5,
                             user: 'test',
                             pass: 'test')
      connection.start

      begin
        channel = connection.create_channel
        # Together with manual acks this means at most one unacknowledged
        # delivery at a time, so every delivery must be settled below.
        channel.prefetch(1)
        queue = channel.queue('hello')

        queue.subscribe(block: true, manual_ack: true) do |delivery_info, _properties, body|
          payload = parse_message_payload(body)
          if payload
            puts "got a message: #{payload['message']}"
            channel.acknowledge(delivery_info.delivery_tag, false)
          else
            # Leaving the delivery unsettled would stall this consumer
            # forever; requeueing it would redeliver the same bad payload
            # in a tight loop. Drop it instead.
            channel.reject(delivery_info.delivery_tag, false)
          end
        end
      rescue StandardError
        puts "failed"
        raise
      ensure
        # Close on every exit path, not only on failure, so the connection
        # does not leak when the subscription ends normally.
        connection.close
      end

      # Report completion so a caller waiting on +results+ can make progress.
      results << job
    end
  end
  36.  
  # Four workers, and the same number of jobs, per processor reported by Etc.
  pool_size = Etc.nprocessors * 4

  # Buffered channels: `jobs` carries job numbers to the workers and
  # `results` is what the main flow waits on at the end.
  jobs    = Channel.new(buffer: :buffered, capacity: 3)
  results = Channel.new(buffer: :buffered, capacity: 3)

  # Start every worker; each one iterates over `jobs`.
  1.upto(pool_size) do |worker_id|
    Channel.go { worker(worker_id, jobs, results) }
  end

  # Enqueue the job numbers, then close the channel to signal no more work.
  1.upto(pool_size) { |job| jobs << job }
  jobs.close

  # Take one value from `results` per job before letting the script end.
  pool_size.times { ~results }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement