Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- namespace App\Consumer;
- use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
- use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface;
- use Webslon\Bundle\ApiBundle\AMQP\Packet;
- use Webslon\Bundle\ApiBundle\AMQP\Producer;
- use Webslon\Bundle\ApiBundle\Annotation\Enqueue\ExchangeParameters;
- use Webslon\Bundle\ApiBundle\Consumer\Consumer;
- use Webslon\Bundle\ApiBundle\Annotation\Enqueue\Consume;
- class DemoConsumer extends Consumer
- {
- /** @var TokenStorageInterface */
- private $tokenStorage;
- /** @var AuthorizationCheckerInterface */
- private $authorizationChecker;
- /** @var Producer */
- private $producer;
- /**
- * DemoConsumer constructor.
- * @param TokenStorageInterface $tokenStorage
- * @param AuthorizationCheckerInterface $authorizationChecker
- * @param Producer $producer
- */
- public function __construct(TokenStorageInterface $tokenStorage, AuthorizationCheckerInterface $authorizationChecker, Producer $producer)
- {
- $this->tokenStorage = $tokenStorage;
- $this->authorizationChecker = $authorizationChecker;
- $this->producer = $producer;
- }
- /**
- * @Consume(queue="demo_queue_a")
- */
- public function a(Packet $packet)
- {
- dump('Делаю запрос из '.__FUNCTION__.' '.$packet->getId());
- $this->replyContext()->set('demo_queue.a.message', 'Значение из консьюмера a');
- $this->rpcForwardAsync('', [], 'demo_queue_b', 'demo_queue_a_responses');
- $this->ack();
- }
- /**
- * @Consume(queue="demo_queue_a_responses")
- */
- public function aResponses(Packet $packet)
- {
- dump('Ответ дошел до '.__FUNCTION__);
- dump('Конец обработки ответов');
- dump('Reply context "demo_queue.a.message":'.$this->replyContext()->get('demo_queue.a.message'));
- dump('Reply context "demo_queue.b.message":'.$this->replyContext()->get('demo_queue.b.message'));
- dump('Reply context "demo_queue.c.message":'.$this->replyContext()->get('demo_queue.c.message'));
- $this->ack();
- }
- /**
- * @Consume(queue="demo_queue_b")
- */
- public function b(Packet $packet)
- {
- dump('Делаю запрос из '.__FUNCTION__.' '.$packet->getId());
- $this->replyContext()->set('demo_queue.b.message', 'Значение из консьюмера b');
- $this->rpcForwardAsync('', [], 'demo_queue_c', 'demo_queue_b_responses');
- $this->ack();
- }
- /**
- * @Consume(queue="demo_queue_b_responses")
- */
- public function bResponses(Packet $packet)
- {
- dump('Передаю ответ дальше из '.__FUNCTION__);
- $this->reply([]);
- $this->ack();
- }
- /**
- * @Consume(queue="demo_queue_c")
- */
- public function c(Packet $packet)
- {
- dump('Делаю запрос из '.__FUNCTION__.' '.$packet->getId());
- $this->replyContext()->set('demo_queue.c.message', 'Значение из консьюмера c');
- $this->rpcForwardAsync('', [], 'demo_queue_d', 'demo_queue_c_responses');
- $this->ack();
- }
- /**
- * @Consume(queue="demo_queue_c_responses")
- */
- public function cResponses(Packet $packet)
- {
- dump('Передаю ответ дальше из '.__FUNCTION__);
- $this->reply([]);
- $this->ack();
- }
- /**
- * @Consume(queue="demo_queue_d")
- */
- public function d(Packet $packet)
- {
- dump('Цепочка запросов завершена, передаю ответ по цепочке');
- $this->replyContext()->set('demo_queue.d.message', 'Значение из консьюмера d');
- $this->reply([]);
- $this->ack();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement