Advertisement
aleksv11

producer

May 31st, 2019
102
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 3.28 KB | None | 0 0
  1. <?php
  2.  
  3. require_once(__DIR__.'/../vendor/autoload.php');
  4. require_once(__DIR__.'/../db_update_class.php'); // Подключаем класс для работы с базой данных
  5.  
  6. #define('AMQP_DEBUG', true);
  7. define("RABBITMQ_HOST", "ip");
  8. define("RABBITMQ_PORT", 5672);
  9. define("RABBITMQ_USERNAME", "логин");
  10. define("RABBITMQ_PASSWORD", "пароль");
  11. define("RABBITMQ_QUEUE_NAME", "bets_need_update");
  12.  
  13. // флаг остановки
  14. $shallStopWorking = false;
  15.  
  16. // сигнал об остановке от supervisord
  17. pcntl_signal(SIGTERM, function () use (&$shallStopWorking) {
  18.     echo "Received SIGTERM\n";
  19.     $shallStopWorking = true;
  20. });
  21.  
  22. // обработчик для ctrl+c
  23. pcntl_signal(SIGINT,  function () use (&$shallStopWorking) {
  24.     echo "Received SIGINT\n";
  25.     $shallStopWorking = true;
  26. });
  27.  
  28. echo "Старт работы демона\n";
  29.  
  30. while (!$shallStopWorking) {
  31.  
  32.     for ($i = 0; $i < 10; $i += 1) sleep(1); // пауза 10 секунд
  33.  
  34.     echo "Publisher - пауза 10 секунд\n";
  35.    
  36.     $db_update = new BetsUpdate();
  37.     $bet_update_rows = $db_update->check_bets_for_update();// массив ставок из бд для обновления результатов
  38.  
  39.     $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
  40.         RABBITMQ_HOST,
  41.         RABBITMQ_PORT,
  42.         RABBITMQ_USERNAME,
  43.         RABBITMQ_PASSWORD
  44.     );
  45.  
  46.     $channel = $connection->channel();
  47.  
  48.     # Создаем очередь, если не существует
  49.    $channel->queue_declare(
  50.         $queue = RABBITMQ_QUEUE_NAME, #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
  51.        $passive = false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
  52.        $durable = true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
  53.        $exclusive = false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
  54.        $auto_delete = false, #autodelete - очередь удаляется, когда отписывается последний подписчик
  55.        $nowait = false,
  56.         $arguments = null,
  57.         $ticket = null
  58.     );
  59.  
  60.     if ($bet_update_rows) {
  61.         foreach ($bet_update_rows as $row) {
  62.             $message = new \PhpAmqpLib\Message\AMQPMessage( json_encode($row, JSON_UNESCAPED_SLASHES), array('delivery_mode' => 2) );
  63.             $channel->basic_publish($message, '', RABBITMQ_QUEUE_NAME); // Добавляет JSON для задачи "bets_need_update"
  64.             echo "Добавил задание в очередь: ".json_encode($row)."\n";
  65.             print 'Job created' . PHP_EOL;
  66.         }
  67.     }
  68.  
  69.     $channel->close();
  70.     $connection->close();
  71.     // обработаем сигналы в конце итерации
  72.     pcntl_signal_dispatch();
  73. }
  74. echo "Завершение работы демона\n";
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement