Guest User

Untitled

a guest
Sep 17th, 2019
91
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. <?php
  2.  
  3. namespace App\Library\AMQP;
  4.  
  5. use Interop\Queue\Queue;
  6. use Interop\Queue\Message;
  7. use Interop\Queue\Processor;
  8. use Interop\Amqp\Impl\AmqpQueue;
  9. use Enqueue\Consumption\QueueConsumer;
  10. use Enqueue\Consumption\ChainExtension;
  11. use App\Events\AMQP\NewMessageReceived;
  12. use Enqueue\AmqpBunny\AmqpConnectionFactory;
  13. use Enqueue\Consumption\Extension\SignalExtension;
  14.  
  /**
   * Small facade over an Enqueue AMQP context.
   *
   * Offers three operations on named queues: a long-running callback consumer,
   * a one-shot publish and a one-shot receive. Every received message body is
   * forwarded to the application through a NewMessageReceived event.
   *
   * All queues touched by this class are declared with the durable flag through
   * declareQueue(), so publishers and consumers agree on the queue definition.
   */
  class Amqp
  {
      /** @var \Interop\Queue\Context */
      protected $context;

      /**
       * Build the messaging context from the given connection settings.
       *
       * @param array $config Passed unchanged to AmqpConnectionFactory.
       */
      public function __construct(array $config)
      {
          $connectionFactory = new AmqpConnectionFactory($config);
          $this->context = $connectionFactory->createContext();
      }

      /**
       * Bind one callback per queue and hand control to the queue consumer.
       *
       * Each callback dispatches a NewMessageReceived event carrying the queue
       * name and the message body, then acknowledges the message.
       *
       * NOTE(review): consume() presumably blocks until an extension (here the
       * SignalExtension) interrupts it - confirm against the Enqueue docs.
       *
       * @param array $queues Queue names to consume from.
       */
      public function startConsumer(array $queues): void
      {
          $consumer = new QueueConsumer(
              $this->context,
              new ChainExtension([new SignalExtension()])
          );

          foreach ($queues as $queue) {
              // Declare the queue the same way publishMessage()/receiveMessage() do,
              // so consuming does not depend on a publisher having created it first.
              $destination = $this->declareQueue($queue);

              // The closure does not touch $this, hence static.
              $consumer->bindCallback($destination, static function (Message $message) use ($queue) {
                  event(new NewMessageReceived($queue, $message->getBody()));

                  return Processor::ACK;
              });
          }

          $consumer->consume();
      }

      /**
       * Publish a message body to the given queue, declaring the queue first.
       *
       * NOTE(review): setDeliveryDelay() on AMQP transports typically requires a
       * delay strategy configured on the connection factory; none is configured
       * in this class - confirm that $config or the broker setup covers it.
       *
       * @param string $queue   Name of the target queue.
       * @param string $message Message body.
       * @param int    $delay   Delivery delay handed to the producer; values of
       *                        zero or less leave the producer's delay untouched.
       */
      public function publishMessage(string $queue, string $message, int $delay = 0): void
      {
          $destination = $this->declareQueue($queue);

          $amqpMessage = $this->context->createMessage($message);

          $producer = $this->context->createProducer();

          if ($delay > 0) {
              $producer->setDeliveryDelay($delay);
          }

          $producer->send($destination, $amqpMessage);
      }

      /**
       * Try to fetch a single message from the given queue.
       *
       * Calls receive(1000) on a fresh consumer (a timeout in milliseconds per
       * the queue-interop Consumer contract). When a message arrives, its body is
       * dispatched as a NewMessageReceived event and the message is acknowledged
       * afterwards; when nothing arrives the method returns silently.
       *
       * @param string $queue Name of the queue to read from.
       */
      public function receiveMessage(string $queue): void
      {
          $destination = $this->declareQueue($queue);

          $consumer = $this->context->createConsumer($destination);

          if ($message = $consumer->receive(1000)) {
              event(new NewMessageReceived($queue, $message->getBody()));

              // Acknowledge only after the event has been dispatched.
              $consumer->acknowledge($message);
          }
      }

      /**
       * Create a durable queue object for the given name and declare it on the context.
       *
       * NOTE(review): the null default is passed straight to createQueue();
       * every caller in this class supplies a name - confirm whether the
       * default is ever relied upon.
       *
       * @param string|null $queue Queue name.
       *
       * @return Queue The declared queue, usable as a send/consume destination.
       */
      protected function declareQueue($queue = null): Queue
      {
          $destination = $this->context->createQueue($queue);

          $destination->addFlag(AmqpQueue::FLAG_DURABLE);

          $this->context->declareQueue($destination);

          return $destination;
      }
  }
RAW Paste Data