Guest User

Untitled

a guest
Apr 30th, 2018
117
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.39 KB | None | 0 0
  1. begin
  2. require 'bunny'
  3. rescue LoadError
  4. puts "Can't load bunny gem - all RabbitMQ bunny-based loops will be disabled!"
  5. end
  6.  
  7. require 'timeout'
  8.  
  9. # Configuration options (all)
  10. # Queue connect to (optional, loop's name will be used by default)
  11. # queue_name: sheeps
  12. # Does we need to ack each message?
  13. # queue_ack: true
  14. # Exchange (optional, 'amq.default' will be used by default)
  15. # exchange: 'amq.default'
  16. # Key used to filter messages
  17. # key: 'bunny.*'
  18. # Sleep for N seconds if there are no messages
  19. # noop_sleep: 5
  20. # Quit after N times if there are no messages found (disabled by default)
  21. # max_noop_requests: 10
  22. # Timeout execution (disabled by default)
  23. # action_timeout: 10
  24. # Max execution times (disabled by default)
  25. # max_requests: 5
  26. # Conection data (optional)
  27. # connection:
  28. # host: localhost
  29. # user: guest
  30. # pass: guest
  31. # port: 5672
  32. # vhost: '/'
  33. # spec: '08'
  34.  
  35. class Loops::AMQP::Bunny < Loops::Base
  36. def self.check_dependencies
  37. raise "No bunny gem installed!" unless defined?(::Bunny)
  38. end
  39.  
  40. def run
  41. create_client
  42. create_queue
  43.  
  44. total_served = 0
  45. noops = 0
  46. begin
  47. loop do
  48. msg = @queue.pop(:ack => !!config['queue_ack'])[:payload]
  49. if msg == :queue_empty
  50. disconnect_client_and_exit if config['max_noop_requests'] && noops >= config['max_noop_requests'].to_i
  51.  
  52. config['noop_sleep'] ||= 5
  53. sleep config['noop_sleep']
  54. noops += 1
  55. next
  56. end
  57. begin
  58. if config['action_timeout']
  59. timeout(config['action_timeout']) { process_message(msg) }
  60. else
  61. process_message(msg)
  62. end
  63. rescue StandardError => e
  64. error "Exception from process message! We won't be ACKing the message."
  65. error "Details: #{e} at #{e.backtrace.first}"
  66. disconnect_client_and_exit
  67. end
  68.  
  69. @queue.ack if config['queue_ack']
  70. total_served += 1
  71. if config['max_requests'] && total_served >= config['max_requests'].to_i
  72. disconnect_client_and_exit
  73. end
  74. end
  75. rescue StandardError => e
  76. error "Closing queue connection because of exception: #{e} at #{e.backtrace.first}"
  77. disconnect_client_and_exit
  78. end
  79.  
  80. end
  81.  
  82.  
  83. def process_message(msg)
  84. raise "This method process_message(msg) should be overriden in the loop class!"
  85. end
  86.  
  87. private
  88.  
  89. def create_client
  90. @connection = ::Bunny.new(symbolize_keys(config['connection']))
  91. @connection.start
  92. setup_signals
  93. end
  94.  
  95. def create_queue
  96. config['queue_name'] ||= "#{name}"
  97. error "Subscribing for the queue #{config['queue_name']}..."
  98.  
  99. @queue = @connection.queue config['queue_name'], :durable => true
  100. @exchange = @connection.exchange config['exchange'], :type => :topic, :durable => true
  101. config['key'] ||= config['queue_name']
  102. @queue.bind(@exchange, :key => config['key'])
  103. end
  104.  
  105. def disconnect_client_and_exit
  106. debug "Stopping..."
  107. @connection.stop rescue nil
  108. exit(0)
  109. end
  110.  
  111. def setup_signals
  112. Signal.trap('INT') { disconnect_client_and_exit }
  113. Signal.trap('TERM') { disconnect_client_and_exit }
  114. end
  115.  
  116. def symbolize_keys(hash)
  117. return hash unless hash.kind_of? Hash
  118. if hash.respond_to? :symbolize_keys
  119. hash.symbolize_keys
  120. else
  121. hash.inject({}) do |options, (key, value)|
  122. options[(key.to_sym rescue key) || key] = value
  123. options
  124. end
  125. end
  126. end
  127.  
  128. end
Add Comment
Please, Sign In to add comment