Advertisement
morington

Untitled

Oct 21st, 2023
753
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.86 KB | None | 0 0
  1. import asyncio
  2. from pathlib import Path
  3. from typing import Any, Coroutine
  4.  
  5. import nats
  6. import ormsgpack
  7. import structlog
  8. from aiogram import Bot
  9. from aiogram.enums import ParseMode
  10. from aiogram.exceptions import TelegramForbiddenError, TelegramRetryAfter
  11. from nats.aio.client import Client
  12. from nats.js import JetStreamContext
  13. from nats.js.errors import NotFoundError
  14. from pydantic import ValidationError
  15. from structlog.contextvars import bind_contextvars
  16.  
  17. from Fluent import FluentDispenser, CustomFluentLocalization
  18. from MessageTransmission import loader_config, exc, models, enums
  19. from MessageTransmission.const import *
  20.  
  21. logger: structlog.BoundLogger = structlog.stdlib.get_logger(__name__)
  22.  
  23.  
  24. class Initialize:
  25.     jetstream: JetStreamContext
  26.  
  27.     def __init__(self, config: dict[str, Any]) -> None:
  28.         """ Initial """
  29.         logger.debug('Initializing a message processing object')
  30.         self.config = config
  31.  
  32.         self.bot = Bot(token=config['application']['key'], parse_mode=ParseMode.HTML)
  33.         self.dispenser = FluentDispenser(
  34.             default_language=config['application']['default_language_code']
  35.         )
  36.  
  37.  
  38.     async def start_polling(self) -> None:
  39.         """ Connect to NATS server and return the """
  40.         nats_connect: Client = await nats.connect(
  41.             servers=[self.config['nats_url']]
  42.         )
  43.         logger.debug('Successful connection to NATS service')
  44.  
  45.         self.jetstream: JetStreamContext = nats_connect.jetstream()
  46.         await self.polling()
  47.  
  48.  
  49.     @property
  50.     async def subscriber(self):
  51.         """ Subscribe to NATS server """
  52.         return await self.jetstream.subscribe(
  53.             stream=JETSTREAM_NAME,
  54.             subject=JETSTREAM_SUBJECT
  55.         )
  56.  
  57.  
  58.     @property
  59.     async def subscriber_builder(self):
  60.         """ Subscribe builder to NATS server. If connection fails, creates a new stream """
  61.         if self.jetstream is None:
  62.             raise ValueError("Jetstream is not initialized")
  63.  
  64.         try:
  65.             return await self.subscriber
  66.         except NotFoundError:
  67.             await self.jetstream.add_stream(name=JETSTREAM_NAME, subjects=[JETSTREAM_SUBJECT])
  68.             return await self.subscriber
  69.  
  70.  
  71.     def type_processing(
  72.             self,
  73.             type_message: enums.TypeMessage,
  74.             data: dict[str, Any]
  75.     ) -> Any:
  76.         """ Processing a message model based on message type """
  77.         match type_message:
  78.             case enums.TypeMessage.connection:
  79.                 return models.MessageConnectModel(**data)
  80.             case enums.TypeMessage.no_payment_system:
  81.                 return models.MessageNoPaymentSystemModel(**data)
  82.             case _:
  83.                 raise exc.UnknownMessageTypeError(type_message)
  84.  
  85.  
  86.     async def polling(self) -> None:
  87.         """ Main message processing loop """
  88.         logger.debug('Starting a message processing loop')
  89.         subscriber = await self.subscriber_builder
  90.  
  91.         async for msg in subscriber.messages:
  92.             message: dict[str, Any] = ormsgpack.unpackb(msg.data)
  93.             try:
  94.                 __model_message = models.BaseMessageModel(**message).model_dump()
  95.  
  96.                 metadata = models.MetadataMessageModel(**__model_message['metadata']).model_dump()
  97.                 __type: str = metadata['type_msg']
  98.  
  99.                 if __type not in enums.TypeMessage.__members__:
  100.                     raise exc.MessageTypeMissingError(__type)
  101.  
  102.                 callback: dict = __model_message['callback']
  103.  
  104.                 model: Any = self.type_processing(
  105.                     type_message=enums.TypeMessage[__type],
  106.                     data=__model_message['data_message']
  107.                 )
  108.                 logger.debug(
  109.                     'Processing message',
  110.                     uuid=metadata['uuid'].hex,
  111.                     type_message=__type,
  112.                     metdata=metadata,
  113.                     model=model.model_dump(),
  114.                     callback=callback
  115.                 )
  116.  
  117.                 l10n: CustomFluentLocalization = self.dispenser.get_localization(language_code=metadata['language_code'])
  118.                 await self.bot.send_message(
  119.                     chat_id=metadata['chat_id'],
  120.                     disable_web_page_preview=True,
  121.                     **model.answer(l10n=l10n, callback=callback)
  122.                 )
  123.                 await msg.ack()
  124.  
  125.             except TimeoutError:
  126.                 logger.warning('Timeout error')
  127.             except TelegramRetryAfter as ex:
  128.                 logger.warning(f'Limit exceeded, continue in: {ex.retry_after}')
  129.                 await asyncio.sleep(float(ex.retry_after))
  130.                 continue
  131.             except TelegramForbiddenError:
  132.                 logger.warning('User blocked Bot')
  133.                 await msg.ack()
  134.                 continue
  135.             except exc.MessageTypeMissingError as err:
  136.                 logger.error(f"Error extract a message type: {err} Available types: {list(enums.TypeMessage.__members__.keys())}")
  137.                 await msg.ack()
  138.                 continue
  139.             except exc.UnknownMessageTypeError as err:
  140.                 logger.error(f'Unknown message type: {err.message_type}', exc=err)
  141.                 await msg.ack()
  142.                 continue
  143.             except ValidationError as err:
  144.                 for error in err.errors():
  145.                     logger.error('Error in model validation', failed_key=error.get('loc'), info=error, message=message)
  146.                 await msg.ack()
  147.                 continue
  148.             except Exception as err:
  149.                 logger.error('Unexpected error', exc=err, message=message)
  150.                 await msg.ack()
  151.                 continue
  152.  
  153.  
  154. if __name__ == '__main__':
  155.     config: dict[str, Any] = loader_config()
  156.     message_processing: Initialize = Initialize(config=config)
  157.  
  158.     asyncio.run(message_processing.start_polling())
  159.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement