Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from pathlib import Path
- from typing import Any, Coroutine
- import nats
- import ormsgpack
- import structlog
- from aiogram import Bot
- from aiogram.enums import ParseMode
- from aiogram.exceptions import TelegramForbiddenError, TelegramRetryAfter
- from nats.aio.client import Client
- from nats.js import JetStreamContext
- from nats.js.errors import NotFoundError
- from pydantic import ValidationError
- from structlog.contextvars import bind_contextvars
- from Fluent import FluentDispenser, CustomFluentLocalization
- from MessageTransmission import loader_config, exc, models, enums
- from MessageTransmission.const import *
- logger: structlog.BoundLogger = structlog.stdlib.get_logger(__name__)
- class Initialize:
- jetstream: JetStreamContext
- def __init__(self, config: dict[str, Any]) -> None:
- """ Initial """
- logger.debug('Initializing a message processing object')
- self.config = config
- self.bot = Bot(token=config['application']['key'], parse_mode=ParseMode.HTML)
- self.dispenser = FluentDispenser(
- default_language=config['application']['default_language_code']
- )
- async def start_polling(self) -> None:
- """ Connect to NATS server and return the """
- nats_connect: Client = await nats.connect(
- servers=[self.config['nats_url']]
- )
- logger.debug('Successful connection to NATS service')
- self.jetstream: JetStreamContext = nats_connect.jetstream()
- await self.polling()
- @property
- async def subscriber(self):
- """ Subscribe to NATS server """
- return await self.jetstream.subscribe(
- stream=JETSTREAM_NAME,
- subject=JETSTREAM_SUBJECT
- )
- @property
- async def subscriber_builder(self):
- """ Subscribe builder to NATS server. If connection fails, creates a new stream """
- if self.jetstream is None:
- raise ValueError("Jetstream is not initialized")
- try:
- return await self.subscriber
- except NotFoundError:
- await self.jetstream.add_stream(name=JETSTREAM_NAME, subjects=[JETSTREAM_SUBJECT])
- return await self.subscriber
- def type_processing(
- self,
- type_message: enums.TypeMessage,
- data: dict[str, Any]
- ) -> Any:
- """ Processing a message model based on message type """
- match type_message:
- case enums.TypeMessage.connection:
- return models.MessageConnectModel(**data)
- case enums.TypeMessage.no_payment_system:
- return models.MessageNoPaymentSystemModel(**data)
- case _:
- raise exc.UnknownMessageTypeError(type_message)
- async def polling(self) -> None:
- """ Main message processing loop """
- logger.debug('Starting a message processing loop')
- subscriber = await self.subscriber_builder
- async for msg in subscriber.messages:
- message: dict[str, Any] = ormsgpack.unpackb(msg.data)
- try:
- __model_message = models.BaseMessageModel(**message).model_dump()
- metadata = models.MetadataMessageModel(**__model_message['metadata']).model_dump()
- __type: str = metadata['type_msg']
- if __type not in enums.TypeMessage.__members__:
- raise exc.MessageTypeMissingError(__type)
- callback: dict = __model_message['callback']
- model: Any = self.type_processing(
- type_message=enums.TypeMessage[__type],
- data=__model_message['data_message']
- )
- logger.debug(
- 'Processing message',
- uuid=metadata['uuid'].hex,
- type_message=__type,
- metdata=metadata,
- model=model.model_dump(),
- callback=callback
- )
- l10n: CustomFluentLocalization = self.dispenser.get_localization(language_code=metadata['language_code'])
- await self.bot.send_message(
- chat_id=metadata['chat_id'],
- disable_web_page_preview=True,
- **model.answer(l10n=l10n, callback=callback)
- )
- await msg.ack()
- except TimeoutError:
- logger.warning('Timeout error')
- except TelegramRetryAfter as ex:
- logger.warning(f'Limit exceeded, continue in: {ex.retry_after}')
- await asyncio.sleep(float(ex.retry_after))
- continue
- except TelegramForbiddenError:
- logger.warning('User blocked Bot')
- await msg.ack()
- continue
- except exc.MessageTypeMissingError as err:
- logger.error(f"Error extract a message type: {err} Available types: {list(enums.TypeMessage.__members__.keys())}")
- await msg.ack()
- continue
- except exc.UnknownMessageTypeError as err:
- logger.error(f'Unknown message type: {err.message_type}', exc=err)
- await msg.ack()
- continue
- except ValidationError as err:
- for error in err.errors():
- logger.error('Error in model validation', failed_key=error.get('loc'), info=error, message=message)
- await msg.ack()
- continue
- except Exception as err:
- logger.error('Unexpected error', exc=err, message=message)
- await msg.ack()
- continue
- if __name__ == '__main__':
- config: dict[str, Any] = loader_config()
- message_processing: Initialize = Initialize(config=config)
- asyncio.run(message_processing.start_polling())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement