Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import logging
- import os
- import re
- import sys
- import json
- from dotenv import load_dotenv
- from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
- from telegram.error import TelegramError
- from telegram.ext import ApplicationBuilder, CallbackContext, CommandHandler, CallbackQueryHandler, MessageHandler, filters
- from spam_tokens import REGULAR_TOKENS, CRITICAL_TOKENS
- TOKEN = os.getenv('ANTISPAM_TOKEN')
- TARGET_CHAT = os.getenv('TARGET_GROUP_ID')
- DEBUG_CHAT = os.getenv('DEBUG_CHAT_ID')
- PRIMARY_ADMIN = os.getenv('PRIMARY_ADMIN')
- BACKUP_ADMIN = os.getenv('BACKUP_ADMIN')
- class DeleteCallbackData:
- def __init__(self, chat_id, message_id, user_id, update_message_id):
- self.ci = chat_id
- self.mi = message_id
- self.ui = user_id
- self.umi = update_message_id
- class ManualEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, DeleteCallbackData):
- return obj.__dict__
- return json.JSONEncoder.default(self, obj)
- class AutoDeleteCallbackData:
- def __init__(self, chat_id, message_id, user_id):
- self.ci = chat_id
- self.mi = message_id
- self.ui = user_id
- class AutoEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, AutoDeleteCallbackData):
- return obj.__dict__
- return json.JSONEncoder.default(self, obj)
- async def handle_new_member(update: Update, context: CallbackContext):
- new_users = update.message.new_chat_members
- for user in new_users:
- db_users.insert({'user_id': user.id, 'date_joined': str(datetime.datetime.now())})
- async def report_manually(update: Update, context: CallbackContext):
- if update.message.reply_to_message:
- reply_to_message = update.message.reply_to_message
- numeric_chat_id = reply_to_message.chat.id
- chat_id = str(numeric_chat_id).replace("-100", "")
- message_id = reply_to_message.message_id
- user = reply_to_message.from_user
- if user.last_name is not None:
- user_display_name = f"{user.first_name} {user.last_name}"
- elif user.last_name is None:
- user_display_name = f"{user.first_name}"
- user_link = f"https://t.me/{user.username}"
- link = f"https://t.me/c/{chat_id}/{message_id}"
- callback_data = DeleteCallbackData(chat_id, message_id, user.id, update.message.message_id)
- callback_data_serialized = json.dumps(callback_data, cls=ManualEncoder)
- keyboard = InlineKeyboardMarkup([
- [InlineKeyboardButton("Удалить", callback_data=callback_data_serialized)]
- ])
- # some variables
- if reply_to_message.text is not None:
- # some logic like the following
- # await context.bot.send_message(chat_id=TARGET_CHAT, text=text_message_content,
- # disable_web_page_preview=True,parse_mode="HTML",reply_markup=reply_markup)
- else:
- return
- async def button_delete(update: Update, context: CallbackContext):
- query = update.callback_query
- await query.answer()
- data_string = query.data
- callback_data = json.loads(data_string)
- ci = callback_data.get('ci', 'DefaultCI')
- mi = callback_data.get('mi', 0)
- ui = callback_data.get('ui', 0)
- umi = callback_data.get('umi', 0)
- message_id = mi
- command_id = umi
- chat_id_temp = ci
- chat_id=f"-100{chat_id_temp}"
- try:
- # Attempt to delete message
- await context.bot.delete_message(chat_id=chat_id, message_id=command_id)
- await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
- user_id = ui
- # Attempt to ban the chat member
- await context.bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
- user = query.from_user
- if user.last_name is not None:
- user_display_name = f"{user.first_name} {user.last_name}"
- elif user.last_name is None:
- user_display_name = f"{user.first_name}"
- user_link = f"https://t.me/{user.username}"
- await query.message.reply_html(f"<a href='{user_link}'><b>{user_display_name}</b></a> забанил пользователя с ID {user_id}", disable_web_page_preview=True)
- await query.edit_message_reply_markup(None)
- except TelegramError as e:
- # Handle error, send a custom message to the user if an error occurs
- error_message = f"Возникла ошибка: {str(e)}"
- await query.message.reply_html(error_message, disable_web_page_preview=True)
- await query.edit_message_reply_markup(None)
- async def check_automatically(update: Update, context: CallbackContext):
- message = update.message
- numeric_chat_id = message.chat.id
- chat_id = str(numeric_chat_id).replace("-100", "")
- message_id = message.message_id
- user = message.from_user
- if user.last_name is not None:
- user_display_name = f"{user.first_name} {user.last_name}"
- elif user.last_name is None:
- user_display_name = f"{user.first_name}"
- user_link = f"https://t.me/{user.username}"
- link = f"https://t.me/c/{chat_id}/{message_id}"
- if message.text is not None:
- words = message.text
- elif message.text is None:
- words = message.caption
- reg_pattern = '|'.join(map(re.escape, REGULAR_TOKENS))
- crit_pattern = '|'.join(map(re.escape, CRITICAL_TOKENS))
- regular_patterns = re.findall(reg_pattern, words)
- num_regular = len(regular_patterns)
- critical_patterns = re.findall(crit_pattern, words)
- num_critical = len(critical_patterns)
- if num_critical > 0 or num_regular > 2:
- verdict = f"<b>Критические токены:</b> {num_regular}\n<b>Обычные токены:</b> {num_critical}"
- auto_callback_data = AutoDeleteCallbackData(chat_id, message_id, user.id)
- auto_callback_data_serialized = json.dumps(auto_callback_data, cls=AutoEncoder)
- keyboard = [
- [InlineKeyboardButton("Удалить", callback_data=auto_callback_data_serialized),
- InlineKeyboardButton("Игнорировать", callback_data='Declined')]
- ]
- reply_markup = InlineKeyboardMarkup(keyboard)
- # some logic like the following
- # await context.bot.send_message(chat_id=TARGET_CHAT, text=text_message_content,
- # disable_web_page_preview=True,parse_mode="HTML",reply_markup=reply_markup)
- async def auto_button_delete(update: Update, context: CallbackContext):
- query = update.callback_query
- await query.answer()
- data_string = query.data
- callback_data = json.loads(data_string)
- ci = callback_data.get('ci', 'DefaultCI')
- mi = callback_data.get('mi', 0)
- ui = callback_data.get('ui', 0)
- umi = callback_data.get('umi', 0)
- message_id = mi
- chat_id_temp = ci
- chat_id=f"-100{chat_id_temp}"
- try:
- # Attempt to delete message
- await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
- user_id = ui
- # Attempt to ban the chat member
- await context.bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
- user = query.from_user
- if user.last_name is not None:
- user_display_name = f"{user.first_name} {user.last_name}"
- elif user.last_name is None:
- user_display_name = f"{user.first_name}"
- user_link = f"https://t.me/{user.username}"
- await query.message.reply_html(f"<a href='{user_link}'><b>{user_display_name}</b></a> забанил пользователя с ID {user_id}", disable_web_page_preview=True)
- await query.edit_message_reply_markup(None)
- except TelegramError as e:
- # Handle error, send a custom message to the user if an error occurs
- error_message = f"Возникла ошибка: {str(e)}"
- await query.message.reply_html(error_message, disable_web_page_preview=True)
- await query.edit_message_reply_markup(None)
- async def auto_ignore_button(update: Update, context: CallbackContext):
- query = update.callback_query
- await query.answer()
- try:
- await query.edit_message_reply_markup(None)
- except TelegramError as e:
- # Handle error, send a custom message to the user if an error occurs
- error_message = f"Возникла ошибка: {str(e)}"
- await query.message.reply_html(error_message, disable_web_page_preview=True)
- await query.edit_message_reply_markup(None)
- def main():
- print("I'm working")
- application = ApplicationBuilder().token(TOKEN).arbitrary_callback_data(True).build()
- application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, handle_new_member))
- #application.add_handler(CallbackQueryHandler(auto_button_delete, pattern="{chat_id}, {message_id}, {user.id}"))
- application.add_handler(CallbackQueryHandler(auto_ignore_button, pattern="Declined"))
- application.add_handler(CallbackQueryHandler(button_delete, pattern=))
- #application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, check_automatically))
- application.add_handler(CommandHandler("ban", report_manually))
- application.run_polling()
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement