Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # an initializer
- # gem install bunny
- # written by: Tam Nguyen (twitter: @nguyentamvn)
- require 'bunny'
- require 'json'
# Small convenience wrapper around one Bunny (RabbitMQ) connection and one
# channel. It publishes JSON-encoded messages to named queues and consumes
# messages from them.
#
# The +bunny+ and +json+ libraries must already be loaded when this class is
# used.
class RabbitPublisher
  # Opens a connection through Bunny and creates a single channel on it.
  #
  # @param options [Hash] connection settings
  # @option options [String] :host broker host; falls back to 'localhost'
  # @option options [String] :user user name handed to Bunny
  # @option options [String] :pass password handed to Bunny
  def initialize(options = {})
    self.connection = Bunny.new(
      host: options[:host] || 'localhost',
      user: options[:user],
      pass: options[:pass],
      heartbeat_interval: 8,
      automatically_recover: true,
      timeout: 7
    )
    connection.start
    self.channel = connection.create_channel
  end

  # Publishes +options[:message]+, serialized with +to_json+, to the queue
  # named +options[:queue_name]+. The queue is declared with +durable: true+
  # and the message is published with +persistent: true+.
  #
  # When the queue name is nil or empty a notice is printed and nothing is
  # sent. A StandardError raised while publishing is printed and the
  # connection is closed; errors raised while declaring the queue propagate
  # to the caller.
  #
  # @param options [Hash] expects :queue_name and :message
  # @return [Object, nil] the result of the queue's +publish+ call, or nil
  #   when nothing was sent or an error was handled
  def publish(options = {})
    queue_name = options[:queue_name]
    return if missing_queue_name?(queue_name)

    queue = channel.queue(queue_name, durable: true)
    begin
      puts 'Sending message...'
      queue.publish(options[:message].to_json, persistent: true)
    rescue StandardError => e
      # StandardError only: process-level exceptions such as SystemExit or
      # NoMemoryError must not be swallowed here.
      puts "[ERROR] #{e}"
      close
    end
  end

  # Subscribes to the queue named +options[:queue_name]+ with manual
  # acknowledgement and +block: true+, yielding every delivery to the given
  # block. Prefetch is set to 1 on the channel before subscribing.
  #
  # When the queue name is nil or empty, or when no block is given, a notice
  # is printed and the method returns without subscribing.
  #
  # @param options [Hash] expects :queue_name
  # @yieldparam channel [Object] the channel, so the block can ack the message
  # @yieldparam delivery_info [Object] delivery metadata passed by the queue
  # @yieldparam body [Object] the message payload passed by the queue
  def receive(options = {})
    queue_name = options[:queue_name]
    return if missing_queue_name?(queue_name)

    # Fail fast: without a handler the subscription could only blow up later,
    # when the first message arrives.
    unless block_given?
      puts 'a block is required to handle messages'
      return
    end

    puts "Waiting queue: #{queue_name}..."
    channel.prefetch(1)
    begin
      queue = channel.queue(queue_name, durable: true)
      queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
        yield channel, delivery_info, body
      end
    rescue Interrupt, StandardError => e
      # Interrupt is handled on purpose so that stopping the consumer with
      # Ctrl-C still closes the connection; other non-StandardError exceptions
      # (SystemExit, NoMemoryError, ...) are left to propagate.
      puts "[ERROR] #{e}"
      close
    end
  end

  # Closes the channel and then the connection. Safe to call more than once:
  # parts that are no longer open are skipped, and the connection is closed
  # even if closing the channel raises.
  def close
    begin
      channel.close if channel.open?
    ensure
      connection.close if connection.open?
    end
    puts 'Closed rabbitmq connections.'
  end

  private

  attr_accessor :channel
  attr_accessor :connection

  # Prints a notice and returns true when +queue_name+ is nil or empty;
  # returns false otherwise.
  def missing_queue_name?(queue_name)
    return false unless queue_name.nil? || queue_name.empty?

    puts 'queue_name is missing'
    true
  end
end
- # how to use
- # rabbit = RabbitPublisher.new
- # for publishing messages
- # rabbit.publish(queue_name: "twitter", message: { name: "tam nguyen" })
- # (1..10000).each {|i| rabbit.publish(queue_name: "twitter", message: { name: "tam nguyen #{i}" }); sleep 0.1 }
- # for subscribing messages
- # rabbit.receive(queue_name: 'twitter') do |channel, delivery_info, body|
- # puts " [x] #{body}"
- # channel.ack(delivery_info.delivery_tag)
- # end
- # should close connections when tasks are completed
- # rabbit.close
Add Comment
Please, Sign In to add comment