Guest User

Untitled

a guest
May 29th, 2018
99
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.43 KB | None | 0 0
  1. module AMQP
  2. class ListnerJob < ApplicationJob
  3. queue_as :amqp
  4.  
  5. def perform
  6. Schedulers::AmqpListner.run! # this interaction
  7. end
  8. end
  9. end
  10.  
  11. require 'bunny'
  12. module Schedulers
  13. class AmqpListner < ApplicationInteraction
  14. object :session, default: nil, class: Bunny::Session
  15. set_callback :validate, :after, -> { set_connection }
  16.  
  17. def execute
  18. session.close if session.present?
  19. supervise_amqp_access do
  20. @retries ||= 0
  21. @conn.start
  22. ch = @conn.channel
  23. queue = ch.queue('', exclusive: true)
  24. ch.queue_bind(queue.name, 'unifiedfeed', routing_key: '#')
  25. queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
  26. # This creates job will create AmqpAuditLog object and then process the data of it.
  27. AMQP::FeedJob.perform_later(
  28. delivery_info[:routing_key], properties.headers['timestamp_in_ms'],
  29. payload
  30. )
  31. end
  32. @conn
  33. end
  34. end
  35.  
  36. private
  37.  
  38. def set_connection
  39. STDOUT.sync = true
  40. @conn = Bunny.new(
  41. host: Constants::HOST,
  42. vhost: Constants::VHOST,
  43. port: Constants::PORT,
  44. user: api_key,
  45. password: api_key,
  46. ssl: true,
  47. verify_peer: false,
  48. verify_peer_name: false,
  49. allow_self_signed: true
  50. )
  51. end
  52.  
  53. def api_key
  54. @api_key ||= Figaro.env.amqp_api_key #fetch api from environment
  55. end
  56.  
  57. def supervise_amqp_access
  58. yield
  59. rescue ::StandardError => exception
  60. @conn.close
  61. custom_error_logger(exception)
  62. retry if (@retries += 1) < 3
  63. end
  64.  
  65. def custom_error_logger(exception)
  66. Rails.logger.info "[AMQP Listner Error] : [#{exception.class}] : [#{exception.cause}]"
  67. end
  68. end
  69. end
  70.  
  71. module AMQP
  72. class WatcherJob < ApplicationJob
  73. queue_as :amqp_watcher
  74. after_perform :enqueue_watcher
  75.  
  76. def perform
  77. return if AmqpAuditLog.where(created_at: 1.minute.ago..Time.zone.now).present?
  78. system "ps -ef | grep amqp_listner | grep -v grep | awk '{print $2}' | xargs kill -9"
  79. AMQP::ListnerJob.perform_later
  80. # TODO : CHANGE this with log file
  81. system 'bundle exec sidekiq -d -q amqp --tag amqp_listner -c 2'
  82. Sidekiq::Queue.all.select { |a| a.name == 'amqp_watcher' }.first.clear
  83. end
  84.  
  85. def enqueue_watcher
  86. AMQP::WatcherJob.set(wait_until: 60.seconds).perform_later
  87. end
  88. end
  89. end
Add Comment
Please, Sign In to add comment