Advertisement
Guest User

Untitled

a guest
Dec 17th, 2019
115
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Ruby 2.27 KB | None | 0 0
  1. require 'sneakers'
  2.  
  module Sneakers
    # Adds scheduled delivery to a queue adapter by publishing through an
    # exchange of type `x-delayed-message`.
    #
    # The host class must provide an instance-level `enqueue(job)` and must
    # `extend` ClassMethods, because the instance method reaches the shared
    # publisher and the binding cache through `self.class`.
    module DelayedJobSupport
      # Schedules +job+ for delivery at +timestamp+.
      #
      # @param job [#queue_name, #serialize] the job to schedule
      # @param timestamp [Numeric] target time in epoch seconds
      # @return the result of `enqueue(job)` when there is nothing to wait for,
      #   otherwise the result of the publisher's `publish`
      def enqueue_at(job, timestamp)
        # Remaining wait, truncated to whole milliseconds (the value sent in
        # the `x-delay` header).
        delay_ms = ((timestamp - Time.current.to_f) * 1000).to_i

        # Zero, negative, or sub-millisecond delay: enqueue directly rather
        # than declaring/binding the queue and publishing with `x-delay => 0`.
        return enqueue(job) unless delay_ms.positive?

        # Ensure queue is bound to the delayed message exchange
        self.class.ensure_delayed_exchange_bound(job.queue_name)

        # Publish on the delayed message exchange
        self.class.delayed_publisher.publish(
          ActiveSupport::JSON.encode(job.serialize),
          headers: { 'x-delay' => delay_ms },
          routing_key: job.queue_name
        )
      end

      # Class-level helpers; state is memoized in instance variables of
      # whichever object extends this module.
      module ClassMethods
        # Memoized publisher targeting the durable 'backend.delayed' exchange.
        #
        # @return [Sneakers::Publisher]
        def delayed_publisher
          @delayed_publisher ||= Sneakers::Publisher.new(
            exchange: 'backend.delayed',
            exchange_options: {
              type: 'x-delayed-message',
              arguments: { 'x-delayed-type' => 'direct' },
              durable: true,
              auto_delete: false
            }
          )
        end

        # The first time a queue receives a delayed job, make sure
        # that the queue is bound to the delayed message exchange.
        #
        # @param queue_name [String] queue name, also used as the routing key
        # @return [true, nil] true when the binding was just created,
        #   nil when this queue had already been bound
        def ensure_delayed_exchange_bound(queue_name)
          @bound_to_delayed_exchange ||= {}
          return nil if @bound_to_delayed_exchange[queue_name]

          delayed_publisher.ensure_connection!

          # NOTE(review): this is a shallow merge, so any `:arguments` already
          # present in the configured queue options are replaced rather than
          # combined - confirm this matches how the workers declare the queue.
          queue_options = Sneakers::CONFIG[:queue_options].merge(
            arguments: { 'x-dead-letter-exchange' => 'nobi.dlx' }
          )
          queue = delayed_publisher.channel.queue(queue_name, queue_options)
          queue.bind(delayed_publisher.exchange, routing_key: queue_name)

          @bound_to_delayed_exchange[queue_name] = true
        end
      end
    end
  end
  46.  
  module ActiveJob
    module QueueAdapters
      # Reopens the Sneakers adapter to give it delayed-job support.
      class SneakersAdapter
        # Instance side: prepended so that this module's `enqueue_at` is found
        # ahead of any `enqueue_at` the adapter itself defines.
        prepend Sneakers::DelayedJobSupport

        # Class side: supplies the `delayed_publisher` and
        # `ensure_delayed_exchange_bound` helpers that `enqueue_at` calls
        # through `self.class`.
        extend Sneakers::DelayedJobSupport::ClassMethods
      end
    end
  end
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement