Guest User

Untitled

a guest
Jan 14th, 2018
131
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.85 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. require "rubygems"
  3. require "amqp" # requires version >= 0.8.0.RC14
  4.  
  5. puts
  6. EM.run do
  7. puts "#"*20
  8. puts "AMQP.start block runs"
  9.  
  # Returns the shared AMQP session, creating it on first use.
  #
  # The session is memoized in @session. Lifecycle callbacks are registered
  # through connection_setup exactly once, when the session is created,
  # instead of on every access. A session that was just created is still
  # performing its initial connect, so it is not asked to reconnect; only a
  # previously created session that reports it is not connected gets a
  # reconnect_to request (2 second period).
  #
  # NOTE(review): callers that hit this method repeatedly while an existing
  # session is still down will each issue another reconnect_to - confirm
  # whether the amqp gem coalesces those requests.
  #
  # @param block [Proc] optional block forwarded to AMQP.connect when the
  #   session is first created; ignored on later calls
  # @return [Object] the memoized session stored in @session
  def connection(&block)
    options = {
      :user    => 'guest',
      :pass    => 'guest',
      :vhost   => '/',
      :logging => false,
      :insist  => false,
      # :timeout => 10.3,
      :host    => 'localhost',
      :port    => 5672,
    }

    if @session.nil?
      @session = AMQP.connect(options, &block)
      connection_setup(@session)
    elsif !@session.connected?
      @session.reconnect_to(options, 2)
    end

    @session
  end
  27.  
  # Registers this script's connection lifecycle callbacks on +session+ and
  # returns that same session.
  #
  # Original author's note (translated): blocks passed to on_open / on_closed
  # are invoked only the first time, no matter how many times the connection
  # is re-established, whereas a block passed to after_recovery (on_recovery)
  # is invoked on every reconnect.
  #
  # @param session [Object] the AMQP session to attach callbacks to
  # @return [Object] the session that was passed in
  def connection_setup(session)
    # Disabled experiments kept from the original:
    # connection.on_open do |*args|
    #   puts "on_open: " << args.inspect
    # end
    # connection.on_closed do |*args|
    #   puts "on_closed: " << args.inspect
    # end

    session.after_recovery do |_conn, settings|
      puts "after_recovery: #{settings.inspect}"
      # Rebuild the channel and the exchange, then report their state.
      channel(true)
      exchange(true)
      puts "#channel.open? = #{channel.open?}"
      puts "#connection.connected? = #{connection.connected?}"
    end

    session.on_error do |_conn, connection_close|
      puts "connection.on_error channel_close.reply_text. channel_close: #{connection_close.inspect}"
      puts "[connection.close] Reply code = #{connection_close.reply_code}, reply text = #{connection_close.reply_text}"
      # Disabled experiments kept from the original:
      # if connection_close.reply_code == 320
      #   puts "[connection.close] Setting up a periodic reconnection timer..."
      #   # every 30 seconds
      #   conn.periodically_reconnect(30)
      # end
      # puts "raising channel_close.reply_text. channel_close: " << connection_close.inspect
      # raise connection_close.reply_text
    end

    session.on_tcp_connection_loss do |conn, _settings|
      puts "[network failure] Trying to reconnect..."
      conn.reconnect(false, 1)
    end

    # Only logs; no reconnect is issued from this handler.
    session.on_tcp_connection_failure do |_conn, _settings|
      puts "[warn] connection.on_tcp_connection_failure: now reconnecting."
    end

    session
  end
  64.  
  # Returns the shared AMQP channel, opening a new one when needed.
  #
  # A new channel is created when none exists yet, when +force+ is true, or
  # when the memoized channel reports it is not open. Exactly one channel is
  # created per call: previously a first call with force = true built a
  # channel via the memoizing assignment and then immediately replaced it
  # with a second one. The error handler is attached when the channel is
  # created.
  #
  # @param force [Boolean] when true, always replace the memoized channel
  # @return [Object] the channel stored in @channel
  def channel(force = false)
    options = {
      :prefetch      => 1,
      :auto_recovery => true,
    }

    if @channel.nil? || force || !@channel.open?
      @channel = AMQP::Channel.new(connection, options)
      @channel.on_error do |_ch, channel_close|
        puts "raising channel_close.reply_text. channel_close: #{channel_close.inspect}"
      end
    end

    @channel
  end
  77.  
  # Returns the shared "exchange_totty" exchange object, declaring it on the
  # current channel when needed.
  #
  # A new exchange object is built when none exists yet or when +force+ is
  # true. Exactly one object is built per call: previously a first call with
  # force = true constructed the exchange twice.
  #
  # @param force [Boolean] when true, always replace the memoized exchange
  # @return [Object] the exchange stored in @exchange
  def exchange(force = false)
    options = {
      :passive     => false,
      :durable     => true,
      :auto_delete => false,
      :internal    => false,
      :nowait      => true,
    }

    if force || @exchange.nil?
      @exchange = AMQP::Exchange.new(channel, "direct", "exchange_totty", options)
    end

    @exchange
  end
  90.  
  # Returns the shared "queue_totty" queue object, bound to the exchange.
  #
  # A new queue object is built (and bound to the exchange) when none exists
  # yet or when +force+ is true. Exactly one object is built per call:
  # previously a first call with force = true constructed the queue twice,
  # and every call re-issued the bind even for an already bound queue.
  #
  # @param force [Boolean] when true, always replace the memoized queue
  # @return [Object] the queue stored in @queue
  def queue(force = false)
    options = {
      :durable => true
    }

    if force || @queue.nil?
      @queue = AMQP::Queue.new(channel, "queue_totty", options)
      @queue.bind(exchange)
    end

    @queue
  end
  100.  
  # Only referenced by the commented-out "close the connection once"
  # experiment inside fire.
  @count = 0

  # Schedules one publish attempt on the next reactor tick.
  #
  # Inside the tick it prints the connection/channel state, pauses, publishes
  # "hoge" to the exchange and, two seconds later, either closes the
  # connection (when the publish callback ran and set @success) or schedules
  # another fire.
  #
  # If publishing raises, the attempt is repeated at most three times in
  # total and the last error is then re-raised. The original retried without
  # any limit, so a persistent error looped forever, sleeping on each pass.
  #
  # @return [Object] whatever EM.next_tick returns
  # @raise [StandardError] the last publish error once all attempts failed
  def fire
    EM.next_tick do
      @success = false
      puts
      puts "== fire =="
      puts " [[start!!]]"
      puts " [[connection.connected? = #{connection.connected?}], [channel.open? = #{channel.open?}]]"
      # connection.close if @count == 0
      # @count = 1

      max_attempts = 3
      attempts = 0

      begin
        puts
        # NOTE(review): this sleep runs inside the EM.next_tick callback, so
        # it presumably stalls the reactor for its duration; it looks like a
        # deliberate pause for manually stopping/starting the broker - confirm.
        sleep 5
        puts "** publish"
        options = {
          :mandatory  => true,
          :persistent => true,
        }
        exchange.publish("hoge", options) do
          puts "@ publish "*5
          @success = true

          connection.disconnect { puts "Disconnected. Exiting"; EM.stop }
        end

        EM.add_timer(2) do
          if @success
            connection.close { EM.stop }
          else
            fire
          end
        end
      rescue => e
        attempts += 1
        puts "@ raise "*5
        puts "raise => #{e.inspect}"
        retry if attempts < max_attempts
        # Give up instead of looping forever on a persistent failure.
        raise
      end
      puts "fin.."
    end
  end
  143.  
  # Subscribes to the queue with explicit acknowledgements and acks each
  # delivery after a pause.
  #
  # @return [nil] the result of the final puts
  def process
    puts "== process =="
    puts " [[start!!]]"

    queue.subscribe(:ack => true, :nowait => true) do |delivery, _body|
      puts "@ subscribe " * 5
      # Original author's note (translated): stop_app and the reconnect were
      # carried out during this pause; after start_app the message was still
      # received properly.
      sleep 5
      puts "** subscribe"
      delivery.ack
    end

    puts "fin.."
  end
  155.  
  #############
  # publish
  #############
  fire
  puts "fire end"

  #############
  # subscribe
  #############
  # process
  # puts "process end"

  # Shutdown handler: disconnects from the broker and stops the reactor once
  # the disconnect callback runs. Kept as a plain proc (not a lambda) because
  # it is also usable as a zero-argument timer callback below, while
  # Signal.trap hands its handler an argument.
  show_stopper = proc do
    puts "# show_stopper"
    connection.disconnect do
      puts "Disconnected. Exiting"
      EventMachine.stop
    end
    puts "#" * 20
  end

  # Same handler for both termination signals, registered in the original order.
  %w[TERM INT].each { |signal_name| Signal.trap(signal_name, show_stopper) }
  # EM.add_timer(20, show_stopper)


  puts "EOF"
  180. end
Add Comment
Please, Sign In to add comment