SHARE
TWEET

Untitled

a guest Sep 17th, 2019 78 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. <?php
  2.  
  3. namespace App\Library\AMQP;
  4.  
  5. use Interop\Queue\Queue;
  6. use Interop\Queue\Message;
  7. use Interop\Queue\Processor;
  8. use Interop\Amqp\Impl\AmqpQueue;
  9. use Enqueue\Consumption\QueueConsumer;
  10. use Enqueue\Consumption\ChainExtension;
  11. use App\Events\AMQP\NewMessageReceived;
  12. use Enqueue\AmqpBunny\AmqpConnectionFactory;
  13. use Enqueue\Consumption\Extension\SignalExtension;
  14.  
  15. class Amqp
  16. {
  17.     /** @var \Interop\Queue\Context */
  18.     protected $context;
  19.  
  20.     public function __construct(array $config)
  21.     {
  22.         $connectionFactory = new AmqpConnectionFactory($config);
  23.         $this->context = $connectionFactory->createContext();
  24.     }
  25.  
  26.     public function startConsumer(array $queues): void
  27.     {
  28.         $consumer = new QueueConsumer(
  29.             $this->context,
  30.             new ChainExtension([new SignalExtension()])
  31.         );
  32.  
  33.         foreach ($queues as $queue) {
  34.             $consumer->bindCallback($queue, function (Message $message) use ($queue) {
  35.                 event(new NewMessageReceived($queue, $message->getBody()));
  36.  
  37.                 return Processor::ACK;
  38.             });
  39.         }
  40.  
  41.         $consumer->consume();
  42.     }
  43.  
  44.     public function publishMessage(string $queue, string $message, int $delay = 0): void
  45.     {
  46.         $destination = $this->declareQueue($queue);
  47.  
  48.         $amqpMessage = $this->context->createMessage($message);
  49.  
  50.         $producer = $this->context->createProducer();
  51.  
  52.         if ($delay > 0) {
  53.             $producer->setDeliveryDelay($delay);
  54.         }
  55.  
  56.         $producer->send($destination, $amqpMessage);
  57.     }
  58.  
  59.     public function receiveMessage(string $queue): void
  60.     {
  61.         $destination = $this->declareQueue($queue);
  62.  
  63.         $consumer = $this->context->createConsumer($destination);
  64.  
  65.         if ($message = $consumer->receive(1000)) {
  66.             event(new NewMessageReceived($queue, $message->getBody()));
  67.  
  68.             $consumer->acknowledge($message);
  69.         }
  70.     }
  71.  
  72.     protected function declareQueue($queue = null): Queue
  73.     {
  74.         $destination = $this->context->createQueue($queue);
  75.  
  76.         $destination->addFlag(AmqpQueue::FLAG_DURABLE);
  77.  
  78.         $this->context->declareQueue($destination);
  79.  
  80.         return $destination;
  81.     }
  82. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top