Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- public function actionSend()
- {
- $connection = new AMQPStreamConnection(
- '***',
- ***,
- '**',
- '**',
- '**'
- );
- $channel = $connection->channel();
- $channel->queue_declare(
- 'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
- false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
- true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
- false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
- false #autodelete - очередь удаляется, когда отписывается последний подписчик
- );
- $msg = new AMQPMessage(
- '32423',
- array('delivery_mode' => 2) #создаёт сообщение постоянным, чтобы оно не потерялось при падении или закрытии сервера
- );
- $channel->basic_publish(
- $msg, #сообщение
- '', #обмен
- 'invoice_queue' #ключ маршрутизации (очередь)
- );
- $channel->close();
- $connection->close();
- }
- public function actionListen()
- {
- $connection = new AMQPStreamConnection(
- '***',
- ***,
- '**',
- '**',
- '**'
- );
- $channel = $connection->channel();
- $channel->queue_declare(
- 'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
- false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
- true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
- false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
- false #autodelete - очередь удаляется, когда отписывается последний подписчик
- );
- /**
- * не отправляем новое сообщение на обработчик, пока он
- * не обработал и не подтвердил предыдущее. Вместо этого
- * направляем сообщение на любой свободный обработчик
- */
- $channel->basic_qos(
- null, #размер предварительной выборки - размер окна предварительнйо выборки в октетах, null означает “без определённого ограничения”
- 1, #количество предварительных выборок - окна предварительных выборок в рамках целого сообщения
- null #глобальный - global=null означает, что настройки QoS должны применяться для получателей, global=true означает, что настройки QoS должны применяться к каналу
- );
- /**
- * оповещает о своей заинтересованности в получении
- * сообщений из определённой очереди. В таком случае мы
- * говорим, что они регистрируют получателя, или устанавливают
- * подписку на очередь. Каждый получатель (подписка) имеет
- * идентификатор, называемый “тег получателя”.
- */
- $channel->basic_consume(
- 'invoice_queue', #очередь
- '', #тег получателя - Идентификатор получателя, валидный в пределах текущего канала. Просто строка
- false, #не локальный - TRUE: сервер не будет отправлять сообщения соединениям, которые сам опубликовал
- false, #без подтверждения - false: подтверждения включены, true - подтверждения отключены. отправлять соответствующее подтверждение обработчику, как только задача будет выполнена
- false, #эксклюзивная - к очереди можно получить доступ только в рамках текущего соединения
- false, #не ждать - TRUE: сервер не будет отвечать методу. Клиент не должен ждать ответа
- array($this, 'process') #функция обратного вызова - метод, который будет принимать сообщение
- );
- while(count($channel->callbacks)) {
- $this->stdout('Слежу за входящими сообщениями' . PHP_EOL);
- $channel->wait();
- }
- $channel->close();
- $connection->close();
- }
- public function process(AMQPMessage $msg)
- {
- $this->generatePdf()->sendEmail();
- $this->stdout($msg->delivery_info['delivery_tag'] . PHP_EOL);
- /**
- * Если получатель умирает, не отправив подтверждения, брокер
- * AMQP пошлёт сообщение другому получателю. Если свободных
- * на данный момент нет - брокер подождёт до тех пор, пока
- * освободится хотя-бы один зарегистрированный получатель
- * на эту очередь, прежде чем попытаться заново доставить
- * сообщение
- */
- $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
- }
- /**
- * Генерирует PDF файл с накладной
- *
- * @return \console\controllers\YadController
- * @throws \Exception
- */
- private function generatePdf()
- {
- $this->stdout('generatePdf' . PHP_EOL);
- /**
- * Симулируем время обработки PDF. Это занимает от 2 до 5 секунд
- */
- sleep(random_int(2, 5));
- return $this;
- }
- /**
- * Отправляет письмо
- *
- * @return \console\controllers\YadController
- * @throws \Exception
- */
- private function sendEmail()
- {
- $this->stdout('sendEmail' . PHP_EOL);
- /**
- * Симулируем время отправки письма. Занимает 1-3 секунды
- */
- sleep(random_int(1,3));
- return $this;
- }
Add Comment
Please, Sign In to add comment