Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- import json
- import logging
- import os
- import random
- import time
- import traceback
- from collections import defaultdict
- from typing import List
- import requests
- import vk_api
- from django.conf import settings
- from vk_api.bot_longpoll import VkBotEventType, VkBotLongPoll, VkBotMessageEvent
- from api.models import VkAccount, VkChat
- from NetworkDiary import exceptions
- from vk_bot.Bot.logging import logs_init
- from vk_bot.Bot.objects.api_manager import ApiManager
- from vk_bot.Bot.objects.event_manager import EventManager, ThreadWithKill
- from vk_bot.Bot.objects.keyboard import KeyBoard
- from vk_bot.Bot.objects.navigation_user import NavigatingUser
- class VkBot:
- def __init__(self, worker_timeout: int, max_events: int, service_bot=None):
- logs_init('bot.log', 'INFO')
- self.__vk_session__ = vk_api.VkApi(token=settings.VK_BOT_TOKEN)
- self.long_poll = VkBotLongPoll(self.__vk_session__, settings.VK_GROUP_ID)
- self.api = self.__vk_session__.get_api()
- self.__pages__ = {}
- self.__commands__ = {}
- self.__path_ts__ = os.path.join('.dm', 'ts.log')
- self.__ts__ = self.get_ts()
- self.__main_threads__ = []
- self.users = defaultdict()
- self.api_manager = ApiManager()
- self.event_manager = EventManager(worker_timeout, max_events)
- self.worker_timeout = worker_timeout
- self.max_events = max_events
- self.status = False
- self.service_tg_bot = service_bot
- logging.info('vk_bot activated')
- def send_message(self, user_id: int or str,
- text: str = None,
- keyboard: KeyBoard = None,
- user_ids: List = None,
- peer_id: int or str = None,
- lat: float = None, lon: float = None,
- attachment: List = None,
- forward_messages: List = None,
- sticker_id: int = None,
- dont_parse_links: bool = False,
- back=False):
- return self.api_manager.method(
- 'messages.send',
- user_id=None if peer_id else user_id,
- peer_id=peer_id if peer_id else user_id,
- user_ids=','.join(map(str, user_ids)) if user_ids else None,
- message=text,
- lat=lat, lon=lon,
- attachment=','.join(map(str, attachment)) if attachment else None,
- forward_messages=','.join(
- map(str, forward_messages)) if forward_messages else None,
- sticker_id=sticker_id,
- random_id=random.randint(10 ** 8, 10 ** 10),
- group_id=settings.VK_GROUP_ID,
- keyboard=keyboard.get() if keyboard else None,
- dont_parse_links=int(dont_parse_links),
- back=back
- )
- def edit_message(self, user_id: int or str,
- message_id: int,
- text: str = None,
- keyboard: KeyBoard = None,
- lat: float = None, lon: float = None,
- attachment: List = None,
- forward_messages: List = None,
- sticker_id: int = None):
- return self.api_manager.method(
- 'messages.edit',
- peer_id=user_id,
- message_id=message_id,
- message=text,
- lat=lat, lon=lon,
- attachment=','.join(map(str, attachment)) if attachment else None,
- forward_messages=','.join(map(str, forward_messages)) if forward_messages else None,
- sticker_id=sticker_id,
- random_id=random.randint(10 ** 8, 10 ** 10),
- group_id=settings.VK_GROUP_ID,
- keyboard=keyboard.get() if keyboard else None
- )
- def get_ts(self):
- try:
- with open(self.__path_ts__) as file:
- return int(file.read())
- except (BaseException, Exception):
- pass
- def set_ts(self):
- while True:
- try:
- with open(self.__path_ts__, 'w') as file:
- file.write(str(self.__ts__))
- break
- except (BaseException, Exception):
- pass
- def upload_photo(self, path: str, peer_id: int):
- server = self.api_manager.method('photos.getMessagesUploadServer', peer_id=peer_id, back=True)
- data = requests.post(server['upload_url'], files={'photo': open(path, 'rb')}).json()
- photo = self.api_manager.method(
- 'photos.saveMessagesPhoto',
- photo=data['photo'],
- server=data['server'],
- hash=data['hash'],
- back=True
- )[0]
- return 'photo{}_{}'.format(photo['owner_id'], photo['id'])
- def upload_doc(self, path: str, peer_id: int):
- server = self.api_manager.method('docs.getMessagesUploadServer', peer_id=peer_id, back=True)
- data = requests.post(server['upload_url'], files={'file': open(path, 'rb')}).json()
- doc = self.api_manager.method(
- 'docs.save',
- file=data['file'],
- title=path.split('/')[-1],
- back=True
- )['doc']
- return 'doc{}_{}'.format(doc['owner_id'], doc['id'])
- @staticmethod
- def verify_chat(peer_id: int):
- VkChat.objects.get_or_create(peer_id=peer_id)
- def verify_user(self, user_id: int):
- vk_account, _ = VkAccount.objects.get_or_create(user_id=user_id)
- if user_id in self.users:
- return self.users[user_id]
- user = NavigatingUser(user_id, vk_account=vk_account, user=vk_account.active_user)
- self.users[user_id] = user
- return user
- def user_message(self, user: NavigatingUser, event: VkBotMessageEvent, status):
- raw_object = event.raw['object']
- if type(raw_object.get('payload')) == str:
- payload = raw_object.get('payload')
- payload = json.loads(payload) if payload else {'button': None}
- raw_object['payload'] = payload
- if status == 'message':
- self.__pages__[user.page[0]](user, raw_object)
- elif status == 'command':
- command = raw_object['text'].split()[0]
- if command in self.__commands__:
- self.__commands__[command](user, raw_object)
- else:
- self.send_message(user.user_id, "Неизвестная команда 😄")
- else:
- raise TypeError('Unknown status: "{}"'.format(status))
- def chat_message(self, user: NavigatingUser, event: VkBotMessageEvent):
- chat_handler = getattr(self, 'chat_handler', None)
- chat_handler(user, event.raw['object'])
- def event_processing(self):
- while self.status:
- self.event_manager.work(self.event_handler)
- time.sleep(0.05)
- def api_processing(self):
- while self.status:
- self.api_manager.send(self.api)
- time.sleep(0.05)
- def start_polling(self):
- while self.status:
- self.long_poll.ts = self.__ts__ if self.__ts__ is not None else self.long_poll.ts
- logging.info('Long Polling Server starting...')
- for event in self.long_poll.listen():
- self.__ts__ = self.long_poll.ts
- self.event_manager.add_event(event)
- def error_handler(self, func):
- while self.status:
- try:
- func()
- except KeyboardInterrupt:
- print('Exit...')
- self.quit()
- except (BaseException, Exception):
- print(traceback.format_exc())
- if self.service_tg_bot is not None:
- msg = 'VK BOT\n\n{}'.format(traceback.format_exc())
- self.service_tg_bot.send_message(settings.TG_ADMIN_ID, msg)
- def run(self):
- logging.info('Start polling...')
- self.status = True
- self.__main_threads__.append(
- ThreadWithKill(target=self.error_handler, args=[self.start_polling], daemon=True)
- )
- self.__main_threads__.append(
- ThreadWithKill(target=self.error_handler, args=[self.event_processing], daemon=True)
- )
- self.__main_threads__.append(
- ThreadWithKill(target=self.error_handler, args=[self.api_processing], daemon=True)
- )
- for thread in self.__main_threads__:
- thread.start()
- def quit(self):
- self.status = False
- for thread in self.__main_threads__:
- thread.kill()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement