Guest User

Untitled

a guest
Oct 8th, 2018
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.15 KB | None | 0 0
  1. # an initializer
  2. # gem install bunny
  3. # written by: Tam Nguyen (twitter: @nguyentamvn)
  4.  
  5. require 'bunny'
  6. require 'json'
  7.  
  # Small convenience wrapper around a Bunny connection and a single channel,
  # offering helpers to publish JSON messages to, and consume messages from,
  # durable queues.
  class RabbitPublisher
    # Opens the connection and creates the channel used by every other method.
    #
    # @param options [Hash] connection settings
    # @option options [String] :host broker host (defaults to 'localhost')
    # @option options [String] :user user name handed to Bunny
    # @option options [String] :pass password handed to Bunny
    def initialize(options = {})
      bunny_config = {
        host: options[:host] || 'localhost',
        user: options[:user],
        pass: options[:pass],
        heartbeat_interval: 8,
        automatically_recover: true,
        timeout: 7
      }
      rabbit_connection = Bunny.new(bunny_config)
      rabbit_connection.start

      self.channel = rabbit_connection.create_channel
      self.connection = rabbit_connection
    end

    # Serializes the message with #to_json and publishes it as a persistent
    # message to a durable queue.
    #
    # Prints a notice and returns nil when :queue_name is nil or empty. When
    # publishing raises a StandardError, the error is printed and the
    # connection is closed; more severe exceptions (SystemExit, signals, ...)
    # are deliberately left to propagate.
    #
    # @param options [Hash]
    # @option options [String] :queue_name name of the target queue
    # @option options [#to_json] :message payload to publish
    def publish(options = {})
      queue_name = options[:queue_name]
      message = options[:message]
      if missing_queue_name?(queue_name)
        puts 'queue_name is missing'
        return
      end

      queue = channel.queue(queue_name, durable: true)
      begin
        puts 'Sending message...'
        queue.publish(message.to_json, persistent: true)
      rescue StandardError => e
        puts "[ERROR] #{e}"
        close
      end
    end

    # Subscribes to a durable queue with manual acknowledgements and blocks
    # the calling thread, yielding every delivery to the given block.
    #
    # Prints a notice and returns nil when :queue_name is nil or empty, or when
    # no block is given. A StandardError, or an Interrupt (Ctrl-C) while
    # waiting, is printed and the connection is closed.
    #
    # @param options [Hash]
    # @option options [String] :queue_name name of the queue to consume
    # @yield [channel, delivery_info, body] the channel (needed to ack), the
    #   delivery info and the message body of each delivery
    def receive(options = {})
      queue_name = options[:queue_name]
      if missing_queue_name?(queue_name)
        puts 'queue_name is missing'
        return
      end

      # Without a block the consumer callback would fail on its first
      # delivery, so refuse up front.
      unless block_given?
        puts 'a block is required to handle received messages'
        return
      end

      puts "Waiting queue: #{queue_name}..."
      # Hand out one unacknowledged message at a time.
      channel.prefetch(1)
      begin
        queue = channel.queue(queue_name, durable: true)
        queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
          yield channel, delivery_info, body
        end
      rescue StandardError, Interrupt => e
        # Interrupt is rescued on purpose so that Ctrl-C on the blocking
        # subscription still shuts the connection down.
        puts "[ERROR] #{e}"
        close
      end
    end

    # Closes the channel and the connection. Safe to call more than once:
    # anything that is missing or no longer open is skipped.
    def close
      channel.close if channel && channel.open?
      connection.close if connection && connection.open?
      puts 'Closed rabbitmq connections.'
    end

    private

    attr_accessor :channel
    attr_accessor :connection

    # True when no usable queue name was supplied (nil or empty).
    def missing_queue_name?(queue_name)
      queue_name.nil? || queue_name.empty?
    end
  end
  75.  
  76. # how to use
  77. # rabbit = RabbitPublisher.new
  78.  
  79. # for publishing messages
  80. # rabbit.publish(queue_name: "twitter", message: { name: "tam nguyen" })
  81. # (1..10000).each {|i| rabbit.publish(queue_name: "twitter", message: { name: "tam nguyen #{i}" }); sleep 0.1 }
  82.  
  83. # for subscribing messages
  84. # rabbit.receive(queue_name: 'twitter') do |channel, delivery_info, body|
  85. # puts " [x] #{body}"
  86. # channel.ack(delivery_info.delivery_tag)
  87. # end
  88.  
  89. # close the connections once all tasks have completed
  90. # rabbit.close
Add Comment
Please, Sign In to add comment