Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- require_once(__DIR__.'/../vendor/autoload.php');
- require_once(__DIR__.'/../db_update_class.php'); // Подключаем класс для работы с базой данных
- define("RABBITMQ_HOST", "ip");
- define("RABBITMQ_PORT", 5672);
- define("RABBITMQ_USERNAME", "логин");
- define("RABBITMQ_PASSWORD", "пароль");
- define("RABBITMQ_QUEUE_NAME", "bets_need_update");
- // флаг остановки
- $shallStopWorking = false;
- // сигнал об остановке от supervisord
- pcntl_signal(SIGTERM, function () use (&$shallStopWorking) {
- echo "Received SIGTERM\n";
- $shallStopWorking = true;
- });
- // обработчик для ctrl+c
- pcntl_signal(SIGINT, function () use (&$shallStopWorking) {
- echo "Received SIGINT\n";
- $shallStopWorking = true;
- });
- // echo "Started\n";
- while (!$shallStopWorking) {
- // обрабатываем задания из очереди, считаем статистику чего-либо,
- // или делаем ещё что-то очень важное
- # Чтение очереди
- while (true) {
- # Проверка соединения
- $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
- RABBITMQ_HOST,
- RABBITMQ_PORT,
- RABBITMQ_USERNAME,
- RABBITMQ_PASSWORD
- );
- $channel = $connection->channel();
- # Create the queue if it does not already exist.
- $channel->queue_declare(
- $queue = RABBITMQ_QUEUE_NAME, #queue name - Имя очереди может содержать до 255 байт UTF-8 символов
- $passive = false, #passive - может использоваться для проверки того, инициирован ли обмен, без того, чтобы изменять состояние сервера
- $durable = true, #durable - убедимся, что RabbitMQ никогда не потеряет очередь при падении - очередь переживёт перезагрузку брокера
- $exclusive = false, #exclusive - используется только одним соединением, и очередь будет удалена при закрытии соединения
- $auto_delete = false, #autodelete - очередь удаляется, когда отписывается последний подписчик
- $nowait = false,
- $arguments = null,
- $ticket = null
- );
- echo "Жду сообщения\n";
- $callback = function ($message) {
- $data = json_decode($message->body, true);
- if ($data) {
- $db_update = new BetsUpdate();
- $db_update->worker_bet_info($data["binaryteam_betid"], $data["user_api_key"]);
- }
- echo "Получил задание # ".json_encode($data, true)."_Время: ".date("Y-m-d\ H:i:s", time())."\n";
- // отправляем подтверждение получения сообщения
- /** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
- // $channel = $message->delivery_info['channel'];
- // $channel->basic_ack($message->delivery_info['delivery_tag']);
- };
- try {
- $channel->basic_qos(null, 1, null);
- $channel->basic_consume(RABBITMQ_QUEUE_NAME, '', false, true, false, false, $callback);
- // тут происходит магия бесконечной обработки опубликованных сообщений
- // и бесконечного ожидания публикации новых сообщений
- while (count($channel->callbacks)) {
- $channel->wait();
- }
- $channel->close();
- $connection->close();
- } catch (Exception $e) {
- //echo "Сработало исключение - {$e->getMessage()} \n";
- // echo "Заданий нет \n";
- }
- }
- pcntl_signal_dispatch(); // обработаем сигналы в конце итерации
- }
- echo "Finished\n";
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement