Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- namespace App\Library\AMQP;
- use Interop\Queue\Queue;
- use Interop\Queue\Message;
- use Interop\Queue\Processor;
- use Interop\Amqp\Impl\AmqpQueue;
- use Enqueue\Consumption\QueueConsumer;
- use Enqueue\Consumption\ChainExtension;
- use App\Events\AMQP\NewMessageReceived;
- use Enqueue\AmqpBunny\AmqpConnectionFactory;
- use Enqueue\Consumption\Extension\SignalExtension;
/**
 * Small facade over an Enqueue AMQP context.
 *
 * Publishes string messages to durable queues and turns every received
 * message into a NewMessageReceived application event, either through a
 * long-running consumer (startConsumer) or a single poll (receiveMessage).
 */
class Amqp
{
    /**
     * Context created by the AMQP connection factory in the constructor.
     *
     * NOTE(review): declareQueue() is called on this object, so it is
     * presumably the AMQP-specific context rather than the bare interface
     * named below - confirm and tighten the annotation if so.
     *
     * @var \Interop\Queue\Context
     */
    protected $context;

    /**
     * Build the context from the given connection settings.
     *
     * @param array $config Passed unchanged to AmqpConnectionFactory.
     */
    public function __construct(array $config)
    {
        $connectionFactory = new AmqpConnectionFactory($config);
        $this->context = $connectionFactory->createContext();
    }

    /**
     * Consume the given queues, dispatching a NewMessageReceived event for
     * every message and acknowledging it afterwards.
     *
     * The call hands control to QueueConsumer::consume() and returns only
     * when that call returns.
     *
     * @param array $queues Queue names to listen on.
     */
    public function startConsumer(array $queues): void
    {
        $consumer = new QueueConsumer(
            $this->context,
            new ChainExtension([new SignalExtension()])
        );

        foreach ($queues as $queue) {
            // Declare the queue first, exactly as publishMessage() and
            // receiveMessage() do, so that a consumer started before any
            // publisher does not subscribe to a queue that does not exist.
            // Non-string entries are passed through untouched to keep the
            // previous behaviour for them.
            if (is_string($queue)) {
                $this->declareQueue($queue);
            }

            $consumer->bindCallback($queue, static function (Message $message) use ($queue) {
                event(new NewMessageReceived($queue, $message->getBody()));

                return Processor::ACK;
            });
        }

        $consumer->consume();
    }

    /**
     * Declare the queue and send one message to it.
     *
     * NOTE(review): a positive $delay is forwarded to setDeliveryDelay();
     * the unit is presumably milliseconds, and the transport may require a
     * delay strategy to be configured on the context - confirm both.
     *
     * @param string $queue   Name of the destination queue.
     * @param string $message Message body.
     * @param int    $delay   Delivery delay; values <= 0 mean "no delay".
     */
    public function publishMessage(string $queue, string $message, int $delay = 0): void
    {
        $destination = $this->declareQueue($queue);
        $amqpMessage = $this->context->createMessage($message);
        $producer = $this->context->createProducer();

        if ($delay > 0) {
            $producer->setDeliveryDelay($delay);
        }

        $producer->send($destination, $amqpMessage);
    }

    /**
     * Poll the queue once and, if a message arrives, dispatch it as a
     * NewMessageReceived event and acknowledge it.
     *
     * The event is dispatched before the acknowledgement, so a listener that
     * throws leaves the message unacknowledged.
     *
     * @param string $queue Name of the queue to poll.
     */
    public function receiveMessage(string $queue): void
    {
        $destination = $this->declareQueue($queue);
        $consumer = $this->context->createConsumer($destination);

        // 1000 is the receive timeout (presumably milliseconds - confirm).
        $message = $consumer->receive(1000);
        if (!$message) {
            return;
        }

        event(new NewMessageReceived($queue, $message->getBody()));
        $consumer->acknowledge($message);
    }

    /**
     * Create a durable queue object for the given name and declare it on the
     * context.
     *
     * The null default is kept only for signature compatibility; a name is
     * required, and omitting it now fails with a clear exception instead of
     * an error raised from inside the transport.
     *
     * @param string|null $queue Name of the queue to declare.
     *
     * @return Queue The declared destination.
     *
     * @throws \InvalidArgumentException When no queue name is given.
     */
    protected function declareQueue($queue = null): Queue
    {
        if (null === $queue) {
            throw new \InvalidArgumentException('A queue name is required to declare a queue.');
        }

        $destination = $this->context->createQueue($queue);
        $destination->addFlag(AmqpQueue::FLAG_DURABLE);
        $this->context->declareQueue($destination);

        return $destination;
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement