Advertisement
LilChicha174

Untitled

Feb 3rd, 2025
25
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.84 KB | None | 0 0
  1. def _heartbeat_loop(rabbit_connection):
  2. """
  3. Фоновый цикл: периодически вызываем process_data_events(), чтобы
  4. heartbeat не отваливался при долгих блокирующих операциях в основном потоке.
  5. """
  6. while not _GLOBAL_HEARTBEAT_STOP:
  7. try:
  8. rabbit_connection.process_data_events()
  9. except Exception as e:
  10. logger.error(f"...")
  11. time.sleep(1)
  12.  
  13. def get_rabbit_connection() -> RabbitConnectionWrapper:
  14. global _GLOBAL_RABBIT_CONNECTION, _GLOBAL_HEARTBEAT_THREAD
  15.  
  16. # Если уже есть рабочее соединение — просто возвращаем его
  17. if _GLOBAL_RABBIT_CONNECTION and not _GLOBAL_RABBIT_CONNECTION.is_closed:
  18. return _GLOBAL_RABBIT_CONNECTION
  19.  
  20. # Иначе создаём новый объект
  21. _GLOBAL_RABBIT_CONNECTION = RabbitConnectionWrapper()
  22.  
  23. # Запускаем фоновый поток, если его ещё нет
  24. if not _GLOBAL_HEARTBEAT_THREAD:
  25. _GLOBAL_HEARTBEAT_THREAD = threading.Thread(
  26. target=_heartbeat_loop,
  27. args=(_GLOBAL_RABBIT_CONNECTION,),
  28. daemon=True
  29. )
  30. _GLOBAL_HEARTBEAT_THREAD.start()
  31.  
  32. return _GLOBAL_RABBIT_CONNECTION
  33.  
  34. # Какой вызов был
  35.  
  36. def main_cycle():
  37. queue = RABBIT_QUEUE_EXCHANGE_MESSAGES
  38. logger.info(f"Starting listen `{queue}`")
  39. with RabbitConnectionWrapper() as rabbit:
  40. rabbit.consume(queue, process_message)
  41.  
  42.  
  43.  
  44. # Какой вызов стал
  45.  
  46. def main_cycle():
  47. queue = RABBIT_QUEUE_EXCHANGE_MESSAGES
  48. logger.info(f"Starting listen `{queue}`")
  49.  
  50. rabbit = get_rabbit_connection() # Берём общее соединение
  51. rabbit.consume(queue, process_message)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement