Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- require 'sneakers'
- module Sneakers
- module DelayedJobSupport
- def enqueue_at(job, timestamp)
- delay = timestamp - Time.current.to_f
- # Just enqueue job if delay is zero or negative
- return enqueue(job) if delay < 0
- # Ensure queue is bound to the delayed message exchange
- self.class.ensure_delayed_exchange_bound(job.queue_name)
- # Publish on the delayed message exchange
- self.class.delayed_publisher.publish(
- ActiveSupport::JSON.encode(job.serialize),
- headers: { 'x-delay' => (delay.to_f * 1000).to_i },
- routing_key: job.queue_name)
- end
- module ClassMethods
- def delayed_publisher
- @delayed_publisher ||= Sneakers::Publisher.new({
- exchange: 'backend.delayed',
- exchange_options: {
- type: 'x-delayed-message',
- arguments: { 'x-delayed-type' => 'direct' },
- durable: true,
- auto_delete: false
- }
- })
- end
- # The first time a queue receives a delayed job, make sure
- # that the queue is bound to the delayed message exchange
- def ensure_delayed_exchange_bound(queue_name)
- @bound_to_delayed_exchange ||= {}
- return nil if @bound_to_delayed_exchange[queue_name].present?
- delayed_publisher.ensure_connection!
- queue = delayed_publisher.channel.queue(queue_name, Sneakers::CONFIG[:queue_options].merge( {arguments: { 'x-dead-letter-exchange' => 'nobi.dlx' } }))
- queue.bind(delayed_publisher.exchange, routing_key: queue_name)
- @bound_to_delayed_exchange[queue_name] = true
- end
- end
- end
- end
- module ActiveJob
- module QueueAdapters
- class SneakersAdapter
- # Add support for delayed jobs to SneakersAdapter
- extend Sneakers::DelayedJobSupport::ClassMethods
- prepend Sneakers::DelayedJobSupport
- end
- end
- end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement