Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- require_once(__DIR__.'/../vendor/autoload.php');
- require_once(__DIR__.'/../db_update_class.php'); // Подключаем класс для работы с базой данных
- #define('AMQP_DEBUG', true);
- define("RABBITMQ_HOST", "ip");
- define("RABBITMQ_PORT", 5672);
- define("RABBITMQ_USERNAME", "логин");
- define("RABBITMQ_PASSWORD", "пароль");
- define("RABBITMQ_QUEUE_NAME", "bets_need_update");
- // флаг остановки
- $shallStopWorking = false;
- // сигнал об остановке от supervisord
- pcntl_signal(SIGTERM, function () use (&$shallStopWorking) {
- echo "Received SIGTERM\n";
- $shallStopWorking = true;
- });
- // обработчик для ctrl+c
- pcntl_signal(SIGINT, function () use (&$shallStopWorking) {
- echo "Received SIGINT\n";
- $shallStopWorking = true;
- });
- echo "Старт работы демона\n";
- while (!$shallStopWorking) {
- for ($i = 0; $i < 10; $i += 1) sleep(1); // пауза 10 секунд
- echo "Publisher - пауза 10 секунд\n";
- $db_update = new BetsUpdate();
- $bet_update_rows = $db_update->check_bets_for_update();// массив ставок из бд для обновления результатов
- $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
- RABBITMQ_HOST,
- RABBITMQ_PORT,
- RABBITMQ_USERNAME,
- RABBITMQ_PASSWORD
- );
- $channel = $connection->channel();
- # Создаем очередь, если не существует
- $channel->queue_declare(
- $queue = RABBITMQ_QUEUE_NAME, #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
- $passive = false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
- $durable = true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
- $exclusive = false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
- $auto_delete = false, #autodelete - очередь удаляется, когда отписывается последний подписчик
- $nowait = false,
- $arguments = null,
- $ticket = null
- );
- if ($bet_update_rows) {
- foreach ($bet_update_rows as $row) {
- $message = new \PhpAmqpLib\Message\AMQPMessage( json_encode($row, JSON_UNESCAPED_SLASHES), array('delivery_mode' => 2) );
- $channel->basic_publish($message, '', RABBITMQ_QUEUE_NAME); // Добавляет JSON для задачи "bets_need_update"
- echo "Добавил задание в очередь: ".json_encode($row)."\n";
- print 'Job created' . PHP_EOL;
- }
- }
- $channel->close();
- $connection->close();
- // обработаем сигналы в конце итерации
- pcntl_signal_dispatch();
- }
- echo "Завершение работы демона\n";
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement