Advertisement
Guest User

Untitled

a guest
Jun 23rd, 2017
66
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.42 KB | None | 0 0
  1. <?php
  2.  
  3. namespace Lib;
  4.  
  5. use PhpAmqpLib\Connection\AMQPConnection;
  6. use PhpAmqpLib\Exception\AMQPTimeoutException;
  7. use PhpAmqpLib\Message\AMQPMessage;
  8.  
  9. /**
  10. * Class Queue
  11. * @package Lib
  12. */
  13. final class Queue
  14. {
  15. /**
  16. * @var AMQPConnection
  17. */
  18. private $connect;
  19.  
  20. public function __construct($connect)
  21. {
  22. $this->connect = $connect;
  23. $this->channel = $this->connect->channel();
  24. }
  25.  
  26. private function loadChannel($channelName)
  27. {
  28. $this->channel->queue_declare(
  29. $channelName, #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
  30. false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
  31. true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
  32. false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
  33. false #autodelete - очередь удаляется, когда отписывается последний подписчик
  34. );
  35. }
  36.  
  37. /**
  38. * Добавить в очередь
  39. *
  40. * @param string $channel
  41. * @param $data
  42. */
  43. public function put(string $channel, $data)
  44. {
  45. $this->loadChannel($channel);
  46. if (is_array($data)) {
  47. $data = 'a' . json_encode($data);
  48. } elseif (is_object($data)) {
  49. $data = 'o' . serialize($data);
  50. } elseif (is_string($data)) {
  51. $data = 's' . $data;
  52. } else {
  53. $data = 'n' . $data;
  54. }
  55. $msg = new AMQPMessage($data, ["delivery_mode" => 2]);
  56. $this->channel->basic_publish(
  57. $msg, #message
  58. '', #exchange
  59. $channel #routing key
  60. );
  61. }
  62.  
  63. /**
  64. * Доставь одно значение из очереди
  65. *
  66. * @param string $channel
  67. * @return null
  68. */
  69. public function pullOne(string $channel)
  70. {
  71. $this->loadChannel($channel);
  72. $while = true;
  73. $result = null;
  74. $count = 0;
  75. $this->channel->basic_qos(null, 1, null);
  76. $this->channel->basic_consume(
  77. $channel, #очередь
  78. '', #тег получателя - Идентификатор получателя, валидный в пределах текущего канала. Просто строка
  79. false, #не локальный - TRUE: сервер не будет отправлять сообщения соединениям, которые сам опубликовал
  80. false, #без подтверждения - отправлять соответствующее подтверждение обработчику, как только задача будет выполнена
  81. false, #эксклюзивная - к очереди можно получить доступ только в рамках текущего соединения
  82. false, #не ждать - TRUE: сервер не будет отвечать методу. Клиент не должен ждать ответа
  83. function ($msg) use (&$while, &$result, &$count) {
  84. if ($count > 0) {
  85. return false;
  86. }
  87. $count++;
  88. $while = false;
  89. $data = $msg->body;
  90. switch (substr($data, 0, 1)) {
  91. case 'a':
  92. $result = json_decode(substr($data, 1), true);
  93. break;
  94. case 'o':
  95. $result = unserialize(substr($data, 1));
  96. break;
  97. case 's':
  98. $result = (string)(substr($data, 1));
  99. break;
  100. case 'n':
  101. $result = (float)(substr($data, 1));
  102. break;
  103. default:
  104. $result = null;
  105. break;
  106. }
  107. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  108. return true;
  109. }
  110. );
  111. $start = microtime(true);
  112. try {
  113. while (count($this->channel->callbacks) && $while && time() - $start < 10) {
  114. $this->channel->wait(null, true, 2);
  115. }
  116. } catch (AMQPTimeoutException $e) {
  117. }
  118. return $result;
  119. }
  120.  
  121. /**
  122. * Обработка очереди callback-ом
  123. *
  124. * @param string $channel
  125. * @param mixed $callback
  126. * @return null
  127. */
  128. public function pull(string $channel, $callback)
  129. {
  130. $this->loadChannel($channel);
  131. $this->channel->basic_consume(
  132. $channel, #очередь
  133. '', #тег получателя - Идентификатор получателя, валидный в пределах текущего канала. Просто строка
  134. false, #не локальный - TRUE: сервер не будет отправлять сообщения соединениям, которые сам опубликовал
  135. false, #без подтверждения - отправлять соответствующее подтверждение обработчику, как только задача будет выполнена
  136. false, #эксклюзивная - к очереди можно получить доступ только в рамках текущего соединения
  137. false, #не ждать - TRUE: сервер не будет отвечать методу. Клиент не должен ждать ответа
  138. function ($msg) use ($callback) {
  139. $data = $msg->body;
  140. switch (substr($data, 0, 1)) {
  141. case 'a':
  142. $result = json_decode(substr($data, 1), true);
  143. break;
  144. case 'o':
  145. $result = unserialize(substr($data, 1));
  146. break;
  147. case 's':
  148. $result = (string)(substr($data, 1));
  149. break;
  150. case 'n':
  151. $result = (float)(substr($data, 1));
  152. break;
  153. default:
  154. $result = null;
  155. break;
  156. }
  157. if ($callback($result)) {
  158. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  159. }
  160. return true;
  161. }
  162. );
  163. try {
  164. while (count($this->channel->callbacks)) {
  165. $this->channel->wait(null, true, 0);
  166. }
  167. } catch (AMQPTimeoutException $e) {
  168. }
  169. }
  170. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement