Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- require "rubygems"
- require "amqp" # requires version >= 0.8.0.RC14
- puts
- EM.run do
- puts "#"*20
- puts "AMQP.start block runs"
# Returns the shared AMQP session, creating it on first use.
#
# The session is memoized in @session. Whenever the memoized session reports
# that it is not connected, a reconnect to the same broker settings is
# requested with a delay argument of 2. The lifecycle callbacks are
# (re)installed on every call through connection_setup, and that method's
# return value (the session itself) is what callers receive.
#
# NOTE(review): a session that was only just returned by AMQP.connect has
# presumably not finished opening yet, so the reconnect branch may also run on
# the very first call -- confirm whether that is intended.
#
# block - optional block forwarded to AMQP.connect when the session is created.
def connection(&block)
  broker_settings = {
    :user => 'guest',
    :pass => 'guest',
    :vhost => '/',
    :logging => false,
    :insist => false,
    # :timeout => 10.3,
    :host => 'localhost',
    :port => 5672,
  }

  @session = AMQP.connect(broker_settings, &block) unless @session

  needs_reconnect = @session && !@session.connected?
  @session.reconnect_to(broker_settings, 2) if needs_reconnect

  connection_setup(@session)
end
# Installs the lifecycle callbacks on +session+ and returns the session.
#
# Original author's observation: blocks handed to on_open / on_closed are
# invoked only the first time, no matter how often the connection is
# re-established, whereas the block handed to after_recovery (on_recovery) is
# invoked on every reconnect.
#
# Disabled open/close hooks, kept for reference:
#   connection.on_open do |*args|
#     puts "on_open: " << args.inspect
#   end
#   connection.on_closed do |*args|
#     puts "on_closed: " << args.inspect
#   end
def connection_setup(session)
  # After a recovery: force a fresh channel and exchange, then print the state.
  recovered = proc do |_conn, settings|
    puts "after_recovery: " + settings.inspect
    channel(true)
    exchange(true)
    puts "#channel.open? = #{channel.open?}"
    puts "#connection.connected? = #{connection.connected?}"
  end

  # Connection-level error: only logged, nothing is raised.
  # Disabled experiments, kept for reference:
  #   if connection_close.reply_code == 320
  #     puts "[connection.close] Setting up a periodic reconnection timer..."
  #     # every 30 seconds
  #     conn.periodically_reconnect(30)
  #   end
  #   puts "raising channel_close.reply_text. channel_close: " << connection_close.inspect
  #   raise connection_close.reply_text
  errored = proc do |_conn, connection_close|
    puts "connection.on_error channel_close.reply_text. channel_close: " + connection_close.inspect
    puts "[connection.close] Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}"
  end

  # TCP connection dropped: ask the session to reconnect.
  tcp_lost = proc do |conn, _settings|
    puts "[network failure] Trying to reconnect..."
    conn.reconnect(false, 1)
  end

  # TCP connection could not be established: only a warning is printed here.
  # NOTE(review): the message says "now reconnecting" but no reconnect is
  # issued from this handler -- confirm whether one is expected.
  tcp_failed = proc do |_conn, _settings|
    puts("[warn] connection.on_tcp_connection_failure: now reconnecting.")
  end

  session.after_recovery(&recovered)
  session.on_error(&errored)
  session.on_tcp_connection_loss(&tcp_lost)
  session.on_tcp_connection_failure(&tcp_failed)
  session
end
# Returns the shared AMQP channel, (re)creating it when necessary.
#
# A new channel is built when none is cached yet, when +force+ is true, or when
# the cached channel reports that it is not open. Exactly one channel is
# constructed per call; the previous two-step form built a channel with `||=`
# and then immediately built a second one because the fresh channel was not
# open yet (or because +force+ was set), orphaning the first.
#
# The error callback is attached once, when the channel object is created.
#
# NOTE(review): a channel that is still in the middle of opening presumably
# also answers false to open?, so repeated calls before the open completes
# will still replace it -- confirm against the amqp gem's channel states.
#
# force - when true, always discard the cached channel and build a new one.
#
# Returns the AMQP::Channel stored in @channel.
def channel(force = false)
  return @channel if @channel && !force && @channel.open?

  options = {
    :prefetch => 1,
    :auto_recovery => true,
  }
  @channel = AMQP::Channel.new(connection, options)
  # Channel-level errors are only logged; nothing is raised.
  @channel.on_error do |_ch, channel_close|
    puts "raising channel_close.reply_text. channel_close: " << channel_close.inspect
  end
  @channel
end
# Returns the shared "exchange_totty" exchange, declaring it on first use.
#
# The exchange is memoized in @exchange. Passing +force+ discards the cached
# object and builds a new one on the current channel. Only one exchange object
# is constructed per call; previously `exchange(true)` with nothing cached
# constructed it twice in a row.
#
# force - when true, always rebuild the exchange on the current channel.
#
# Returns the AMQP::Exchange stored in @exchange.
def exchange(force = false)
  return @exchange if @exchange && !force

  options = {
    :passive => false,
    :durable => true,
    :auto_delete => false,
    :internal => false,
    :nowait => true,
  }
  @exchange = AMQP::Exchange.new(channel, "direct", "exchange_totty", options)
end
# Returns the shared durable "queue_totty" queue, bound to the shared exchange.
#
# The queue is memoized in @queue. Passing +force+ discards the cached object
# and builds a new one on the current channel. Only one queue object is
# constructed per call; previously `queue(true)` with nothing cached
# constructed it twice in a row.
#
# The bind to the exchange is issued on every call, as before.
#
# force - when true, always rebuild the queue on the current channel.
#
# Returns the AMQP::Queue stored in @queue.
def queue(force = false)
  if force || !@queue
    options = {
      :durable => true
    }
    @queue = AMQP::Queue.new(channel, "queue_totty", options)
  end
  @queue.bind(exchange)
  @queue
end
# Only referenced by the disabled "close the connection once" experiment
# inside fire.
@count = 0

# Publishes one persistent, mandatory "hoge" message on the next reactor tick.
#
# Flow:
#   1. Print the current connection / channel state.
#   2. Sleep 5 seconds, then publish through the shared exchange.
#   3. When the publish block runs, mark success and disconnect, stopping
#      EventMachine once disconnected.
#   4. Two seconds after scheduling, a timer either closes the connection
#      (success) or calls fire again (no success yet).
#
# An exception raised while publishing is logged and then handled by the same
# 2 second timer, which re-invokes fire. The earlier implementation used an
# unbounded `retry`, which looped inside the reactor thread (sleeping 5 seconds
# each round) and therefore never gave the reactor a chance to process a
# reconnect before the next attempt.
#
# NOTE(review): `sleep 5` blocks the reactor thread; presumably deliberate, to
# leave time to stop/restart the broker by hand -- confirm.
# NOTE(review): on success both the publish block and the timer ask for the
# connection to be closed -- confirm the second close is wanted.
def fire
  EM.next_tick do
    @success = false
    puts
    puts "== fire =="
    puts " [[start!!]]"
    puts " [[connection.connected? = #{connection.connected?}], [channel.open? = #{channel.open?}]]"
    # connection.close if @count == 0
    # @count = 1
    begin
      puts
      sleep 5
      puts "** publish"
      options = {
        :mandatory => true,
        :persistent => true,
      }
      exchange.publish("hoge", options) do
        puts "@ publish "*5
        @success = true
        connection.disconnect { puts "Disconnected. Exiting"; EM.stop }
      end
    rescue => e
      # Log and fall through: the timer below retries via the reactor.
      puts "@ raise "*5
      puts "raise => #{e.inspect}"
    end
    EM.add_timer(2) do
      if @success
        connection.close { EM.stop }
      else
        fire
      end
    end
    puts "fin.."
  end
end
# Subscribes to the shared queue and acknowledges every delivery after a
# 5 second pause, printing progress markers along the way.
def process
  puts "== process =="
  puts " [[start!!]]"

  subscribe_options = { :ack => true, :nowait => true }
  on_delivery = proc do |metadata, _payload|
    puts "@ subscribe "*5
    # Original author's note: stop_app and the reconnect happen during this
    # sleep; after start_app the message was still received correctly.
    sleep 5
    puts "** subscribe"
    metadata.ack
  end

  queue.subscribe(subscribe_options, &on_delivery)
  puts "fin.."
end
#############
# publish
#############
fire
puts "fire end"

#############
# subscribe
#############
# process
# puts "process end"

# Shutdown handler: disconnects from the broker and stops EventMachine once
# the disconnect block runs. Kept as a plain (non-lambda) proc so that any
# argument passed by the signal machinery is ignored.
show_stopper = proc do
  puts "# show_stopper"
  connection.disconnect do
    puts "Disconnected. Exiting"
    EventMachine.stop
  end
  puts "#"*20
end

# Run the shutdown handler on both TERM and INT.
%w[TERM INT].each { |signal_name| Signal.trap(signal_name, show_stopper) }
# EM.add_timer(20, show_stopper)
puts "EOF"
- end
Add Comment
Please, Sign In to add comment