Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import datetime
- import logging
- import os
- import re
- import time
- from os.path import dirname, join
- import aiogram
- from aiogram import Bot, Dispatcher, executor, filters, types
- from aiogram.contrib.fsm_storage.memory import MemoryStorage
- from aiogram.contrib.middlewares.logging import LoggingMiddleware
- from aiogram.dispatcher.filters import BoundFilter
- from aiogram.dispatcher.filters.state import State, StatesGroup
- from aiogram.utils.exceptions import Throttled
- from dotenv import load_dotenv
- import api
- import buttons
- import utils
- logging.basicConfig(level=logging.INFO)
- loop = asyncio.get_event_loop()
- dotenv_path = join(dirname(__file__), '.env')
- load_dotenv(dotenv_path)
- storage = MemoryStorage()
- bot = Bot(token=os.getenv('TOKEN'), parse_mode=types.ParseMode.HTML)
- dp = Dispatcher(bot, storage=storage)
- dp.middleware.setup(LoggingMiddleware())
- class MyFilter(BoundFilter):
- key = 'is_admin'
- def __init__(self, is_admin):
- self.is_admin = is_admin
- async def check(self, c: types.CallbackQuery):
- print(c.data)
- return c.data.startswith('bot')
- dp.filters_factory.bind(MyFilter)
- class User(StatesGroup):
- main = State() # Will be represented in storage as 'Form:name'
- token = State() # Will be represented in storage as 'Form:age'
- @dp.callback_query_handler(state='*', is_admin=True)
- async def get_stats(c):
- print(1)
- parts = buttons.bot_btn.parse(c.data)
- print(parts)
- tmp_token = parts['token']
- try:
- tmp_bot = Bot(token=tmp_token)
- webhook_info = await tmp_bot.get_webhook_info()
- bot_info = await tmp_bot.get_me()
- db_info = await api.Users.get_bot_info(c.from_user.id, tmp_token)
- if db_info:
- await bot.edit_message_text(
- chat_id=c.message.chat.id,
- message_id=c.message.message_id,
- text=f'Информация о боте @{bot_info.username} (#ID{bot_info.id})\nСообщений в очереди: <b>{webhook_info.pending_update_count}</b>\nУстановленный предел: <b>{db_info["requests_limit"]}</b>'
- )
- else:
- await api.Service.del_bot(tmp_token)
- await bot.edit_message_reply_markup(
- chat_id=c.message.chat.id,
- message_id=c.message.message_id,
- reply_markup=buttons.Menu.bots_list(c.from_user.id)
- )
- except aiogram.exceptions.Unauthorized:
- await api.Service.del_bot(tmp_token)
- await bot.edit_message_reply_markup(
- chat_id=c.message.chat.id,
- message_id=c.message.message_id,
- reply_markup=buttons.Menu.bots_list(c.from_user.id)
- )
- except Exception as e:
- logging.error(e)
- finally:
- await tmp_bot.close()
- @dp.message_handler(lambda msg: msg.text.lower() == '❌в начало❌', state=User.token)
- @dp.message_handler(commands=['cancel'], state=User.token)
- @dp.message_handler(commands=['start'])
- async def start(msg):
- try:
- await dp.throttle('{}:{}'.format(msg.chat.id, msg.text), rate=1)
- except Throttled:
- pass
- else:
- if not await api.Users.is_new(msg.from_user.id):
- try:
- await api.Users.register(msg.from_user.id, msg.from_user.first_name)
- except Exception as e:
- print(e)
- await bot.send_message(
- msg.chat.id,
- 'Главное меню',
- reply_markup=await buttons.Menu.main()
- )
- await User.main.set()
- @dp.message_handler(lambda msg: msg.text.lower() == '❓помощь❓', state=User.main)
- async def help(msg):
- try:
- await dp.throttle('{}:{}'.format(msg.chat.id, msg.text), rate=1)
- except Throttled:
- pass
- else:
- await bot.send_message(
- msg.chat.id,
- 'Используйте кнопку <b>📌Добавить нового бота📌</b>, чтобы добавить бота в мониторинг. Сразу после этого начнется ежеминутная проверка состояния вашего бота. \nВ случае превышения установленного лимита запросов в очереди (по умолчанию 100), ваш бот напишет вам в личные сообщения об этом.'
- )
- @dp.message_handler(lambda msg: msg.text.lower() == '📌добавить нового бота📌', state=User.main)
- async def bot_cmd_new(msg):
- try:
- await dp.throttle('{}:{}'.format(msg.chat.id, msg.text), rate=1)
- except Throttled:
- pass
- else:
- await bot.send_message(
- msg.chat.id,
- 'Введите токен бота или перешлите сообщение от @BotFather с токеном.',
- reply_markup=await buttons.Menu.cancel()
- )
- await User.token.set()
- @dp.message_handler(lambda msg: msg.text.lower() == '🤖мои боты🤖', state=User.main)
- async def my_bots(msg):
- try:
- await dp.throttle('{}:{}'.format(msg.chat.id, msg.text), rate=1)
- except Throttled:
- pass
- else:
- if await api.Users.count_bots(msg.from_user.id) != 0:
- await bot.send_message(
- msg.chat.id,
- 'Список ваших ботов',
- reply_markup=await buttons.Menu.bots_list(msg.from_user.id)
- )
- else:
- await bot.send_message(
- msg.chat.id,
- 'Вы не добавили ни одного бота. \nИспользуйте кнопку <i>📌Добавить нового бота📌</i>'
- )
- @dp.message_handler(state=User.token)
- async def check_token(msg):
- try:
- await dp.throttle('{}:{}'.format(msg.chat.id, msg.text), rate=1)
- except Throttled:
- pass
- else:
- token_raw = re.search('(\d{0,9}\:[a-zA-Z0-9_\-]{35})', msg.text)
- if token_raw:
- token = token_raw.group(0)
- checking_result = await utils.get_info(token=token, method='getMe')
- else:
- checking_result = False
- if checking_result:
- await bot.send_message(
- msg.chat.id,
- f'Бот @{checking_result["username"]} прошел проверку и был добавлен в ваш список',
- reply_markup=await buttons.Menu.main()
- )
- await api.Users.new_bot(msg.from_user.id, token, checking_result)
- await User.main.set()
- else:
- await bot.send_message(
- msg.chat.id,
- f'Токен не прошел проверку или не был найден. Введите правильный токен или перешлите сообщение от @BotFather с токеном.',
- reply_markup=await buttons.Menu.cancel()
- )
- if __name__ == '__main__':
- executor.start_polling(dp, loop=loop, skip_updates=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement