Guest User

Untitled

a guest
Dec 11th, 2018
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.88 KB | None | 0 0
  1. <?php
  2.  
  3. public function actionSend()
  4. {
  5. $connection = new AMQPStreamConnection(
  6. '***',
  7. ***,
  8. '**',
  9. '**',
  10. '**'
  11. );
  12. $channel = $connection->channel();
  13.  
  14. $channel->queue_declare(
  15. 'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
  16. false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
  17. true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
  18. false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
  19. false #autodelete - очередь удаляется, когда отписывается последний подписчик
  20. );
  21.  
  22. $msg = new AMQPMessage(
  23. '32423',
  24. array('delivery_mode' => 2) #создаёт сообщение постоянным, чтобы оно не потерялось при падении или закрытии сервера
  25. );
  26.  
  27. $channel->basic_publish(
  28. $msg, #сообщение
  29. '', #обмен
  30. 'invoice_queue' #ключ маршрутизации (очередь)
  31. );
  32.  
  33. $channel->close();
  34. $connection->close();
  35. }
  36.  
  37. public function actionListen()
  38. {
  39. $connection = new AMQPStreamConnection(
  40. '***',
  41. ***,
  42. '**',
  43. '**',
  44. '**'
  45. );
  46. $channel = $connection->channel();
  47.  
  48. $channel->queue_declare(
  49. 'invoice_queue', #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
  50. false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
  51. true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
  52. false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
  53. false #autodelete - очередь удаляется, когда отписывается последний подписчик
  54.  
  55. );
  56.  
  57. /**
  58. * не отправляем новое сообщение на обработчик, пока он
  59. * не обработал и не подтвердил предыдущее. Вместо этого
  60. * направляем сообщение на любой свободный обработчик
  61. */
  62. $channel->basic_qos(
  63. null, #размер предварительной выборки - размер окна предварительнйо выборки в октетах, null означает “без определённого ограничения”
  64. 1, #количество предварительных выборок - окна предварительных выборок в рамках целого сообщения
  65. null #глобальный - global=null означает, что настройки QoS должны применяться для получателей, global=true означает, что настройки QoS должны применяться к каналу
  66. );
  67.  
  68. /**
  69. * оповещает о своей заинтересованности в получении
  70. * сообщений из определённой очереди. В таком случае мы
  71. * говорим, что они регистрируют получателя, или устанавливают
  72. * подписку на очередь. Каждый получатель (подписка) имеет
  73. * идентификатор, называемый “тег получателя”.
  74. */
  75. $channel->basic_consume(
  76. 'invoice_queue', #очередь
  77. '', #тег получателя - Идентификатор получателя, валидный в пределах текущего канала. Просто строка
  78. false, #не локальный - TRUE: сервер не будет отправлять сообщения соединениям, которые сам опубликовал
  79. false, #без подтверждения - false: подтверждения включены, true - подтверждения отключены. отправлять соответствующее подтверждение обработчику, как только задача будет выполнена
  80. false, #эксклюзивная - к очереди можно получить доступ только в рамках текущего соединения
  81. false, #не ждать - TRUE: сервер не будет отвечать методу. Клиент не должен ждать ответа
  82. array($this, 'process') #функция обратного вызова - метод, который будет принимать сообщение
  83. );
  84.  
  85. while(count($channel->callbacks)) {
  86. $this->stdout('Слежу за входящими сообщениями' . PHP_EOL);
  87. $channel->wait();
  88. }
  89.  
  90. $channel->close();
  91. $connection->close();
  92. }
  93.  
  94. public function process(AMQPMessage $msg)
  95. {
  96. $this->generatePdf()->sendEmail();
  97. $this->stdout($msg->delivery_info['delivery_tag'] . PHP_EOL);
  98. /**
  99. * Если получатель умирает, не отправив подтверждения, брокер
  100. * AMQP пошлёт сообщение другому получателю. Если свободных
  101. * на данный момент нет - брокер подождёт до тех пор, пока
  102. * освободится хотя-бы один зарегистрированный получатель
  103. * на эту очередь, прежде чем попытаться заново доставить
  104. * сообщение
  105. */
  106. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  107. }
  108.  
  109. /**
  110. * Генерирует PDF файл с накладной
  111. *
  112. * @return \console\controllers\YadController
  113. * @throws \Exception
  114. */
  115. private function generatePdf()
  116. {
  117. $this->stdout('generatePdf' . PHP_EOL);
  118. /**
  119. * Симулируем время обработки PDF. Это занимает от 2 до 5 секунд
  120. */
  121. sleep(random_int(2, 5));
  122. return $this;
  123. }
  124.  
  125. /**
  126. * Отправляет письмо
  127. *
  128. * @return \console\controllers\YadController
  129. * @throws \Exception
  130. */
  131. private function sendEmail()
  132. {
  133. $this->stdout('sendEmail' . PHP_EOL);
  134. /**
  135. * Симулируем время отправки письма. Занимает 1-3 секунды
  136. */
  137. sleep(random_int(1,3));
  138. return $this;
  139. }
Add Comment
Please, Sign In to add comment