Advertisement
aleksv11

consumer

May 31st, 2019
648
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 4.06 KB | None | 0 0
  1. <?php
  2.  
  3. require_once(__DIR__.'/../vendor/autoload.php');
  4. require_once(__DIR__.'/../db_update_class.php'); // Подключаем класс для работы с базой данных
  5.  
  6.  
  7. define("RABBITMQ_HOST", "ip");
  8. define("RABBITMQ_PORT", 5672);
  9. define("RABBITMQ_USERNAME", "логин");
  10. define("RABBITMQ_PASSWORD", "пароль");
  11. define("RABBITMQ_QUEUE_NAME", "bets_need_update");
  12.  
  13.  
  14.  
  15. // флаг остановки
  16. $shallStopWorking = false;
  17.  
  18. // сигнал об остановке от supervisord
  19. pcntl_signal(SIGTERM, function () use (&$shallStopWorking) {
  20.     echo "Received SIGTERM\n";
  21.     $shallStopWorking = true;
  22. });
  23.  
  24. // обработчик для ctrl+c
  25. pcntl_signal(SIGINT,  function () use (&$shallStopWorking) {
  26.     echo "Received SIGINT\n";
  27.     $shallStopWorking = true;
  28. });
  29.  
  30. // echo "Started\n";
  31.  
  32. while (!$shallStopWorking) {
  33.  
  34.     // обрабатываем задания из очереди, считаем статистику чего-либо,
  35.     // или делаем ещё что-то очень важное
  36.  
  37.  
  38.  
  39. # Чтение очереди
  40. while (true) {
  41.     # Проверка соединения
  42.  
  43.     $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
  44.         RABBITMQ_HOST,
  45.         RABBITMQ_PORT,
  46.         RABBITMQ_USERNAME,
  47.         RABBITMQ_PASSWORD
  48.     );
  49.  
  50.     $channel = $connection->channel();
  51.  
  52.     # Create the queue if it does not already exist.
  53.    $channel->queue_declare(
  54.         $queue = RABBITMQ_QUEUE_NAME, #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
  55.        $passive = false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
  56.        $durable = true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
  57.        $exclusive = false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
  58.        $auto_delete = false, #autodelete - очередь удаляется, когда отписывается последний подписчик
  59.        $nowait = false,
  60.         $arguments = null,
  61.         $ticket = null
  62.     );
  63.      
  64.     echo "Жду сообщения\n";
  65.  
  66.     $callback = function ($message) {
  67.        
  68.         $data = json_decode($message->body, true);
  69.  
  70.         if ($data) {
  71.             $db_update = new BetsUpdate();    
  72.             $db_update->worker_bet_info($data["binaryteam_betid"], $data["user_api_key"]);
  73.         }
  74.  
  75.         echo "Получил задание # ".json_encode($data, true)."_Время: ".date("Y-m-d\ H:i:s", time())."\n";
  76.  
  77.         // отправляем подтверждение получения сообщения
  78.         /** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
  79.       //  $channel = $message->delivery_info['channel'];
  80.       //  $channel->basic_ack($message->delivery_info['delivery_tag']);
  81.     };
  82.  
  83.     try {
  84.  
  85.         $channel->basic_qos(null, 1, null);
  86.         $channel->basic_consume(RABBITMQ_QUEUE_NAME, '', false, true, false, false, $callback);
  87.  
  88.         // тут происходит магия бесконечной обработки опубликованных сообщений
  89.         // и бесконечного ожидания публикации новых сообщений
  90.         while (count($channel->callbacks)) {
  91.             $channel->wait();
  92.         }
  93.  
  94.         $channel->close();
  95.         $connection->close();
  96.  
  97.     } catch (Exception $e) {
  98.         //echo "Сработало исключение - {$e->getMessage()} \n";
  99.       //  echo "Заданий нет \n";
  100.        
  101.     }
  102. }
  103.  
  104.     pcntl_signal_dispatch(); // обработаем сигналы в конце итерации
  105. }
  106.  
  107. echo "Finished\n";
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement