LordRal

IDM API

Apr 26th, 2022
1,109
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.68 KB | None | 0 0
  1. import typing
  2.  
  3. import requests
  4.  
  5.  
  6. class IDMException(Exception):
  7.     ...
  8.  
  9.  
  10. class IDMAPIException(IDMException):
  11.  
  12.     def __init__(self, code, detail, **kw):
  13.         self.code = code
  14.         self.detail = detail
  15.         self.kwargs = kw
  16.  
  17.     def __str__(self):
  18.         return f"[{self.code}] {self.detail}"
  19.  
  20.  
  21. class IDMAPICallbackException(IDMException):
  22.     CALLBACK_CODES = {
  23.         1: "Пустой запрос",
  24.         2: "Неизвестный тип сигнала",
  25.         3: "Пара пользователь/секрет не найдены",
  26.         4: "Беседа не привязана",
  27.         5: "Пользователь заблокирован",
  28.         6: "Дежурный не активен",
  29.         7: "Пользователь не активен",
  30.         10: "Не удалось связать беседу"
  31.     }
  32.  
  33.     def __init__(
  34.             self,
  35.             response: str,
  36.             error_code: typing.Optional[int] = None,
  37.             error_message: typing.Optional[str] = None,
  38.             **kw
  39.     ):
  40.         self.error_type = response
  41.         self.error_code = error_code
  42.         self.error_message = error_message
  43.         self.kwargs = kw
  44.  
  45.     def __str__(self):
  46.         if self.error_type == 'error':
  47.             if self.error_code in self.CALLBACK_CODES:
  48.                 return f"[{self.error_code}] {self.CALLBACK_CODES[self.error_code]}"
  49.             elif self.error_message:
  50.                 return f"[{self.error_code}] {self.error_message}"
  51.             else:
  52.                 return f"[{self.error_code}] Нe известная ошибка"
  53.         else:
  54.             if self.error_message:
  55.                 return f"VK [{self.error_code}] {self.error_message}"
  56.             else:
  57.                 return f"VK [{self.error_code}] Нe известная ошибка"
  58.  
  59.  
  60. class IDMAPI:
  61.     """
  62.    api = IDMAPI('https://idmduty.ru', 'IDMLP(id;secret)')
  63.  
  64.    try:
  65.        cfg = api.get_lp_info('token')
  66.        cfg['config']['secret_code']
  67.    except IDMAPIException as ex:
  68.        print(ex)
  69.  
  70.    try:
  71.        api.send_my_signal(..., {"id": event['id'], "is_win_platform": True})
  72.    except IDMAPICallbackException as ex:
  73.        print(ex)
  74.    """
  75.    
  76.     _session: typing.Optional[requests.Session]
  77.  
  78.     def __init__(self, base_domain: str, header: str):
  79.         self.base_domain = base_domain
  80.         self.header_info = header
  81.         self._session = None
  82.  
  83.         self._get_lp_info_link = f"{self.base_domain}/api/dutys/get_lp_info/"
  84.         self._save_lp_info_link = f"{self.base_domain}/api/dutys/save_lp_info/"
  85.         self._callback_link = f"{self.base_domain}/callback/"
  86.  
  87.     @property
  88.     def session(self) -> requests.Session:
  89.         if not self._session:
  90.             self._session = requests.Session()
  91.         return self._session
  92.  
  93.     def get_lp_info(self, access_token: str) -> dict:
  94.         response = self.session.post(
  95.             self._get_lp_info_link,
  96.             json={'access_token': access_token},
  97.             headers={'User-Agent': self.header_info}
  98.         ).json()
  99.         if 'error' in response:
  100.             raise IDMAPIException(**response['error'])
  101.         return response['response']
  102.  
  103.     def send_request(self, url: str, data: dict) -> dict:
  104.         response = self.session.post(url, json=data, headers={'User-Agent': self.header_info})
  105.         response_json = response.json()
  106.         if 'error' in response_json:
  107.             raise IDMAPIException(**response_json['error'])
  108.         if response_json['response'] in ('error', 'vk_error',):
  109.             raise IDMAPICallbackException(**response_json)
  110.         return response_json
  111.  
  112.     async def callback(self, data: dict) -> dict:
  113.         return self.send_request(self._callback_link, data)
  114.  
  115.     async def ping(self):
  116.         return await self.callback({
  117.             "method": "ping",
  118.             "message": {},
  119.             "object": {}
  120.         })
  121.  
  122.     def _send_signal(
  123.             self,
  124.             method: str,
  125.             from_id: int,
  126.             peer_id: int,
  127.             conversation_message_id: int,
  128.             date: int,
  129.             text: str,
  130.             vk_message: typing.Optional[dict] = None
  131.     ):
  132.         return self.callback({
  133.             "method": method,
  134.             "message": {
  135.                 "conversation_message_id": conversation_message_id,
  136.                 "from_id": from_id,
  137.                 "date": date,
  138.                 "text": text,
  139.                 "peer_id": peer_id
  140.             },
  141.             "object": {
  142.                 "chat": None,
  143.                 "from_id": from_id,
  144.                 "value": text,
  145.                 "conversation_message_id": conversation_message_id
  146.             },
  147.             "vkmessage": vk_message
  148.         })
  149.  
  150.     def send_my_signal(
  151.             self,
  152.             from_id: int,
  153.             peer_id: int,
  154.             conversation_message_id: int,
  155.             date: int,
  156.             text: str,
  157.             vk_message: typing.Optional[dict] = None
  158.     ):
  159.         return self._send_signal(
  160.             'lpSendMySignal',
  161.             from_id=from_id, peer_id=peer_id,
  162.             conversation_message_id=conversation_message_id,
  163.             date=date, text=text, vk_message=vk_message
  164.         )
  165.  
  166.     def send_signal(
  167.             self,
  168.             from_id: int,
  169.             peer_id: int,
  170.             conversation_message_id: int,
  171.             date: int,
  172.             text: str,
  173.             vk_message: typing.Optional[dict] = None
  174.     ):
  175.         return self._send_signal(
  176.             'lpSendSignal',
  177.             from_id=from_id, peer_id=peer_id,
  178.             conversation_message_id=conversation_message_id,
  179.             date=date, text=text, vk_message=vk_message
  180.         )
  181.  
Advertisement
Add Comment
Please, Sign In to add comment