Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- def _heartbeat_loop(rabbit_connection):
- """
- Фоновый цикл: периодически вызываем process_data_events(), чтобы
- heartbeat не отваливался при долгих блокирующих операциях в основном потоке.
- """
- while not _GLOBAL_HEARTBEAT_STOP:
- try:
- rabbit_connection.process_data_events()
- except Exception as e:
- logger.error(f"...")
- time.sleep(1)
- def get_rabbit_connection() -> RabbitConnectionWrapper:
- global _GLOBAL_RABBIT_CONNECTION, _GLOBAL_HEARTBEAT_THREAD
- # Если уже есть рабочее соединение — просто возвращаем его
- if _GLOBAL_RABBIT_CONNECTION and not _GLOBAL_RABBIT_CONNECTION.is_closed:
- return _GLOBAL_RABBIT_CONNECTION
- # Иначе создаём новый объект
- _GLOBAL_RABBIT_CONNECTION = RabbitConnectionWrapper()
- # Запускаем фоновый поток, если его ещё нет
- if not _GLOBAL_HEARTBEAT_THREAD:
- _GLOBAL_HEARTBEAT_THREAD = threading.Thread(
- target=_heartbeat_loop,
- args=(_GLOBAL_RABBIT_CONNECTION,),
- daemon=True
- )
- _GLOBAL_HEARTBEAT_THREAD.start()
- return _GLOBAL_RABBIT_CONNECTION
- # Какой вызов был
- def main_cycle():
- queue = RABBIT_QUEUE_EXCHANGE_MESSAGES
- logger.info(f"Starting listen `{queue}`")
- with RabbitConnectionWrapper() as rabbit:
- rabbit.consume(queue, process_message)
- # Какой вызов стал
- def main_cycle():
- queue = RABBIT_QUEUE_EXCHANGE_MESSAGES
- logger.info(f"Starting listen `{queue}`")
- rabbit = get_rabbit_connection() # Берём общее соединение
- rabbit.consume(queue, process_message)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement