Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- namespace Acme;
- use Enqueue\AmqpExt\AmqpContext;
- use Enqueue\AmqpExt\AmqpMessage;
- use Enqueue\Consumption\Context;
- use Enqueue\Consumption\EmptyExtensionTrait;
- use Enqueue\Consumption\ExtensionInterface;
/**
 * Consumption extension that prepares a delayed-delivery topic when consumption starts.
 *
 * On start, if the consumption context holds an AmqpContext, it declares a durable
 * topic of type "x-delayed-message" named "<queue>.delayed" and binds it to the queue.
 * For any other kind of context it does nothing.
 */
class SetupBrokerExtension implements ExtensionInterface
{
    use EmptyExtensionTrait;

    /**
     * Declares the delayed topic and binds it to the queue.
     *
     * @param Context $context consumption context the PSR context is read from
     */
    public function onStart(Context $context)
    {
        /** @var AmqpContext $amqp */
        $amqp = $context->getPsrContext();

        // Only the AMQP ext transport is handled here; leave other transports alone.
        if (!($amqp instanceof AmqpContext)) {
            return;
        }

        $name = 'your_queue_name';
        $targetQueue = $amqp->createQueue($name);

        // The delay feature needs the rabbitmq_delayed_message_exchange plugin
        // to be installed on the broker.
        $delayedExchange = $amqp->createTopic($name.'.delayed');
        $delayedExchange->setRoutingKey($name);
        $delayedExchange->setType('x-delayed-message');
        $delayedExchange->addFlag(AMQP_DURABLE);
        $delayedExchange->setArguments(['x-delayed-type' => 'direct']);

        $amqp->declareTopic($delayedExchange);
        $amqp->bind($delayedExchange, $targetQueue);
    }
}
// Usage example: send a message through the "<queue>.delayed" topic with an "x-delay" property.
// NOTE(review): $amqpContext is assumed to be an AmqpContext already created by the caller.
/** @var AmqpContext $amqpContext */
$message = new AmqpMessage();
// The delay value is passed as a string property.
$message->setProperty('x-delay', (string) 5000 /* 5 sec */);
$delayTopic = $amqpContext->createTopic('your_queue_name.delayed');
// Send through the same context that created the topic. This snippet runs at the top
// level, outside any class, so $this is not available here.
$amqpContext->createProducer()->send($delayTopic, $message);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement