Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- module AMQP
- class ListnerJob < ApplicationJob
- queue_as :amqp
- def perform
- Schedulers::AmqpListner.run! # this interaction
- end
- end
- end
- require 'bunny'
- module Schedulers
- class AmqpListner < ApplicationInteraction
- object :session, default: nil, class: Bunny::Session
- set_callback :validate, :after, -> { set_connection }
- def execute
- session.close if session.present?
- supervise_amqp_access do
- @retries ||= 0
- @conn.start
- ch = @conn.channel
- queue = ch.queue('', exclusive: true)
- ch.queue_bind(queue.name, 'unifiedfeed', routing_key: '#')
- queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
- # This creates job will create AmqpAuditLog object and then process the data of it.
- AMQP::FeedJob.perform_later(
- delivery_info[:routing_key], properties.headers['timestamp_in_ms'],
- payload
- )
- end
- @conn
- end
- end
- private
- def set_connection
- STDOUT.sync = true
- @conn = Bunny.new(
- host: Constants::HOST,
- vhost: Constants::VHOST,
- port: Constants::PORT,
- user: api_key,
- password: api_key,
- ssl: true,
- verify_peer: false,
- verify_peer_name: false,
- allow_self_signed: true
- )
- end
- def api_key
- @api_key ||= Figaro.env.amqp_api_key #fetch api from environment
- end
- def supervise_amqp_access
- yield
- rescue ::StandardError => exception
- @conn.close
- custom_error_logger(exception)
- retry if (@retries += 1) < 3
- end
- def custom_error_logger(exception)
- Rails.logger.info "[AMQP Listner Error] : [#{exception.class}] : [#{exception.cause}]"
- end
- end
- end
- module AMQP
- class WatcherJob < ApplicationJob
- queue_as :amqp_watcher
- after_perform :enqueue_watcher
- def perform
- return if AmqpAuditLog.where(created_at: 1.minute.ago..Time.zone.now).present?
- system "ps -ef | grep amqp_listner | grep -v grep | awk '{print $2}' | xargs kill -9"
- AMQP::ListnerJob.perform_later
- # TODO : CHANGE this with log file
- system 'bundle exec sidekiq -d -q amqp --tag amqp_listner -c 2'
- Sidekiq::Queue.all.select { |a| a.name == 'amqp_watcher' }.first.clear
- end
- def enqueue_watcher
- AMQP::WatcherJob.set(wait_until: 60.seconds).perform_later
- end
- end
- end
Add Comment
Please, Sign In to add comment