Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- import re
- from typing import Callable
- from database import Servidor, Dono, db, db_session
- from telebot import TeleBot, types
- from telebot.util import extract_arguments, is_command
- from telebot.apihelper import ApiException
- from utils import extract_auth_host, extract_ip
- db.bind('sqlite', 'ehibot.sqlite3', create_db=True)
- db.generate_mapping(create_tables=True)
- TOKEN = "339048941:AAFvc99VmZSSYSKhtj9HSVW6clKuVfliib4" # type: str
- bot = TeleBot(TOKEN)
- servidoresSalvos = {} # type: dict
- LIMITE_DE_SERVIDORES = 5 # type: int
- LIMITE_DIAS_PADRAO = 2 # type: int
- USER_DATA = {} #type: dict
- logger = logging.getLogger('servidor')
- @bot.message_handler(commands=['novo', 'create'])
- def new_server(msg):
- user_id = msg.from_user.id
- auths = extract_arguments(msg.text)
- auths = extract_auth_host(auths)
- if msg.chat.type != 'private':
- bot.reply_to(msg, 'Para garantir a segurança do seu servidor, precisamos falar secretamente!')
- elif exceed_limit_servers(user_id):
- bot.send_message(msg.chat.id, 'Máximo de %d servidores.' %LIMITE_DE_SERVIDORES)
- elif auths:
- username, password, ip = auths['username'], auths['password'], auths['ip']
- create_server(username, password, ip, user_id)
- bot.send_message(msg.chat.id, 'Servidor criado com sucesso!')
- else:
- bot.send_message(msg.chat.id, 'Tente username:senha@ip')
- def exceed_limit_servers(user_id: int) -> bool:
- with db_session:
- if LIMITE_DE_SERVIDORES <= Servidor.select(lambda s: s.dono.uid == user_id).count():
- return True
- def create_server(username: str, password: str, ip: str, user_id: int) -> None:
- with db_session:
- new_owner(user_id)
- owner = Dono[user_id]
- Servidor(
- ip=ip,
- userssh=username,
- senha=password,
- grupoid=user_id,
- dono=owner,
- limitedias=LIMITE_DIAS_PADRAO
- )
- def new_owner(user_id: int) -> None:
- with db_session:
- if not Dono.exists(uid=user_id):
- Dono(uid=user_id)
- @bot.message_handler(regexp="^(/editar )[0-9]{1,4}$")
- def update_server(msg):
- if msg.chat.type != 'private':
- bot.reply_to(msg.chat.id, 'Precisamos conversar sozinhos para isso.')
- return
- user_id = msg.chat.id
- server_id = extract_arguments(msg.text)
- if not owner_exists(user_id):
- bot.send_message(msg.chat.id, 'Você não possui um cadastro de dono.')
- elif not server_have_owner(server_id, user_id):
- bot.send_message(msg.chat.id, 'Servidor não encontrado!')
- else:
- markup = create_options_button_server(user_id, server_id)
- bot.send_message(msg.chat.id, "Selecione uma opção:", reply_markup=markup)
- def owner_exists(user_id: int) -> bool:
- with db_session: return Dono.exists(uid=user_id)
- def server_have_owner(server_id: int, user_id: int) -> bool:
- with db_session:
- return Servidor.exists(id=server_id, dono=Dono.get(uid=user_id))
- def create_options_button_server(user_id: int, server_id: int) -> Callable[[list, str], types.InlineKeyboardMarkup]:
- texts = ['🖥 IP', '👤 UserSSH', '🔑 Senha', '💬 Titulo', '🕐 LimiteDias', '✘ Cancelar']
- data = str(server_id)
- return create_inline_buttons(texts, data)
- def create_inline_buttons(texts: list, data: str) -> types.InlineKeyboardMarkup:
- markup = types.InlineKeyboardMarkup(row_width=2)
- buttons = [] #do row_width work
- for text in texts:
- #ex. data: titulo15 (tituloID)
- button = types.InlineKeyboardButton(text, callback_data=text.lower()[2:]+data)
- buttons.append(button)
- markup.add(*buttons)
- return markup
- @bot.callback_query_handler(func=lambda call: call.data.startswith('cancelar'))
- def callback_inline_cancel(call):
- delete_message(call.message.chat.id, call.message.message_id)
- @bot.callback_query_handler(func=lambda call: call.data.startswith('ip'))
- def callback_inline_ip(call):
- user_id = call.from_user.id
- server_id = int(call.data.strip('ip'))
- USER_DATA[user_id] = server_id
- bot.send_message(call.message.chat.id, "Envie o endereço IP")
- delete_message(call.message.chat.id, call.message.message_id)
- bot.register_next_step_handler(call.message, step_edit_server_ip)
- def step_edit_server_ip(message):
- if is_command(message.text): return
- delete_message(message.chat.id, message.message_id - 1)
- ip = extract_ip(message.text)
- if not ip:
- bot.send_message(message.chat.id, 'Endereço IP inválido, envie outro:')
- bot.register_next_step_handler(message, step_edit_server_ip)
- return
- server_id = USER_DATA[message.from_user.id]
- with db_session:
- server = Servidor[server_id]
- server.ip = ip
- bot.send_message(message.chat.id, 'Endereço IP atualizado do servidor %d' %USER_DATA[message.from_user.id])
- markup = create_options_button_server(message.from_user.id, USER_DATA[message.from_user.id])
- bot.send_message(message.chat.id, "Selecione uma opção", reply_markup=markup)
- def edit_message(text: str, chat_id: int, message_id: int, markup=None) -> bool:
- try:
- bot.edit_message_text(text, chat_id, message_id, reply_markup=markup)
- return True
- except ApiException:
- return
- def delete_message(chat_id: int, message_id: int) -> bool:
- try:
- bot.delete_message(chat_id, message_id)
- return True
- except ApiException:
- return
- @bot.message_handler(commands=['servidores'])
- def show_info_servers(msg):
- user_id = msg.from_user.id
- chat_id = msg.chat.id
- banner = """
- 📝Titulo: {0}\n
- 🖥IP: {1}
- 🆔ID: {2}\n
- 👤Usuário: {3}
- 🔑Senha: {4}
- 🔘️Status: {5}
- """
- with db_session:
- for server in Servidor.select(lambda s: s.dono.uid == user_id)[:]:
- titulo = server.titulo
- ip = server.ip
- idservidor = server.id
- userssh = server.userssh
- senha = '*****'
- online = str(server.online).replace('False', 'offline').replace('True', 'online')
- bot.send_message(chat_id, banner.format(titulo, ip, idservidor, userssh, senha, online))
- @bot.message_handler(commands=['start'])
- def welcome(msg):
- message = """\
- Eu sou um bot que registra servidores
- no meu banco de dados para que depois
- você possa utilizar esse servidor em seu grupo
- atráves de comandos como /cadastrar etc...
- Para começar me envie as informações do seu host
- Exemplo: username:senha@ip
- Ficaria: alberto:senha123@179.242.73.20"""
- @bot.message_handler(commands=['ajuda', 'help'])
- def send_help(msg):
- message = """<b>✅ Obrigatório</b>
- <b>Precisamos que você insira no seu servidor o
- usuário que você está nos passando no grupo sudo,
- para que seja possível cadastrar usuários no seu servidor</b>
- <b>Tente:</b> <code>sudo usermod -aG sudo nomeuser</code>
- <b>Caso falhe tente linha por linha abaixo:</b>
- <pre>su -
- apt-get install sudo
- nusermod -aG sudo nomeuser
- </pre>
- /atualizar - atualizar um servidor
- /novo - adicionar novo servidor
- /ajuda - imprime essa mensagem
- /servidores - servidores cadastrados
- """
- bot.send_message(msg.chat.id, message, parse_mode='HTML')
- if __name__ == '__main__':
- FORMAT=" %(funcName)s || %(message)s "
- logging.basicConfig(level=logging.DEBUG, format=FORMAT)
- print('Executando bot servidor..')
- bot.skip_pending=True
- bot.polling(none_stop=True)
Add Comment
Please, Sign In to add comment