Advertisement
Guest User

Untitled

a guest
May 24th, 2017
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.44 KB | None | 0 0
  1. <?php
  2. namespace Acme;
  3.  
  4. use Enqueue\AmqpExt\AmqpContext;
  5. use Enqueue\AmqpExt\AmqpMessage;
  6. use Enqueue\Consumption\Context;
  7. use Enqueue\Consumption\EmptyExtensionTrait;
  8. use Enqueue\Consumption\ExtensionInterface;
  9.  
  /**
   * Consumption extension that prepares a delayed-delivery exchange on start.
   *
   * When the consumption context is backed by an AmqpContext it declares a
   * durable "x-delayed-message" exchange named "<queue>.delayed" and binds it
   * to the queue. For any other transport it does nothing.
   *
   * The queue itself is not declared here; presumably it is declared elsewhere
   * before this extension runs (verify against your setup).
   */
  class SetupBrokerExtension implements ExtensionInterface
  {
      use EmptyExtensionTrait;

      /**
       * @var string
       */
      private $queueName;

      /**
       * @param string $queueName Name of the queue the delayed exchange is bound to.
       *                          Defaults to the placeholder used by the original snippet.
       */
      public function __construct($queueName = 'your_queue_name')
      {
          $this->queueName = $queueName;
      }

      /**
       * Declares the delayed exchange and binds it to the configured queue.
       *
       * @param Context $context Consumption context supplying the transport context.
       */
      public function onStart(Context $context)
      {
          // Fetch the transport context once and bail out for non-AMQP transports.
          $psrContext = $context->getPsrContext();
          if (!$psrContext instanceof AmqpContext) {
              return;
          }

          $queue = $psrContext->createQueue($this->queueName);

          // The delay feature requires the rabbitmq_delayed_message_exchange
          // plugin to be installed on the broker.
          $delayTopic = $psrContext->createTopic($this->queueName.'.delayed');
          $delayTopic->setRoutingKey($this->queueName);
          $delayTopic->setType('x-delayed-message');
          $delayTopic->addFlag(AMQP_DURABLE);
          $delayTopic->setArguments([
              // Routing behaviour the exchange applies once the delay has elapsed.
              'x-delayed-type' => 'direct',
          ]);

          $psrContext->declareTopic($delayTopic);

          $psrContext->bind($delayTopic, $queue);
      }
  }
  40.  
  // Usage: publish a message through the delayed exchange.

  /** @var AmqpContext $amqpContext */

  $message = new AmqpMessage();
  // Delay is given in milliseconds via the "x-delay" property.
  $message->setProperty('x-delay', (string) 5000 /* 5 sec */);

  $delayTopic = $amqpContext->createTopic('your_queue_name.delayed');
  // A freshly created topic carries no routing key. Set the same key that
  // SetupBrokerExtension puts on the exchange so the direct-typed delayed
  // exchange can route the message to the bound queue.
  $delayTopic->setRoutingKey('your_queue_name');

  // Send through the same context that created the topic; `$this` is not
  // available in this top-level snippet.
  $amqpContext->createProducer()->send($delayTopic, $message);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement