Advertisement
AlexJameson

Untitled

May 19th, 2024 (edited)
663
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.25 KB | None | 0 0
  1. #!/usr/bin/env python3
import datetime
import html
import json
import logging
import os
import re
import sys

from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.error import TelegramError
from telegram.ext import ApplicationBuilder, CallbackContext, CommandHandler, CallbackQueryHandler, MessageHandler, filters

from spam_tokens import REGULAR_TOKENS, CRITICAL_TOKENS
  12.  
  13. TOKEN = os.getenv('ANTISPAM_TOKEN')
  14. TARGET_CHAT = os.getenv('TARGET_GROUP_ID')
  15. DEBUG_CHAT = os.getenv('DEBUG_CHAT_ID')
  16. PRIMARY_ADMIN = os.getenv('PRIMARY_ADMIN')
  17. BACKUP_ADMIN = os.getenv('BACKUP_ADMIN')
  18.  
  19. class DeleteCallbackData:
  20.     def __init__(self, chat_id, message_id, user_id, update_message_id):
  21.         self.ci = chat_id
  22.         self.mi = message_id
  23.         self.ui = user_id
  24.         self.umi = update_message_id
  25.  
  26. class ManualEncoder(json.JSONEncoder):
  27.     def default(self, obj):
  28.         if isinstance(obj, DeleteCallbackData):
  29.             return obj.__dict__
  30.         return json.JSONEncoder.default(self, obj)
  31.  
  32. class AutoDeleteCallbackData:
  33.     def __init__(self, chat_id, message_id, user_id):
  34.         self.ci = chat_id
  35.         self.mi = message_id
  36.         self.ui = user_id
  37.  
  38. class AutoEncoder(json.JSONEncoder):
  39.     def default(self, obj):
  40.         if isinstance(obj, AutoDeleteCallbackData):
  41.             return obj.__dict__
  42.         return json.JSONEncoder.default(self, obj)
  43.  
  44. async def handle_new_member(update: Update, context: CallbackContext):
  45.     new_users = update.message.new_chat_members
  46.     for user in new_users:
  47.        db_users.insert({'user_id': user.id, 'date_joined': str(datetime.datetime.now())})
  48.  
  49. async def report_manually(update: Update, context: CallbackContext):  
  50.     if update.message.reply_to_message:
  51.         reply_to_message = update.message.reply_to_message
  52.         numeric_chat_id = reply_to_message.chat.id
  53.         chat_id = str(numeric_chat_id).replace("-100", "")
  54.         message_id = reply_to_message.message_id
  55.         user = reply_to_message.from_user
  56.         if user.last_name is not None:
  57.             user_display_name = f"{user.first_name} {user.last_name}"
  58.         elif user.last_name is None:
  59.             user_display_name = f"{user.first_name}"
  60.         user_link = f"https://t.me/{user.username}"
  61.         link = f"https://t.me/c/{chat_id}/{message_id}"
  62.         callback_data = DeleteCallbackData(chat_id, message_id, user.id, update.message.message_id)
  63.         callback_data_serialized = json.dumps(callback_data, cls=ManualEncoder)
  64.         keyboard = InlineKeyboardMarkup([
  65.             [InlineKeyboardButton("Удалить", callback_data=callback_data_serialized)]
  66.         ])
  67.  
  68.         # some variables
  69.  
  70.         if reply_to_message.text is not None:
  71.             # some logic like the following
  72.             # await context.bot.send_message(chat_id=TARGET_CHAT, text=text_message_content,
  73.             # disable_web_page_preview=True,parse_mode="HTML",reply_markup=reply_markup)
  74.     else:
  75.         return
  76.  
  77. async def button_delete(update: Update, context: CallbackContext):
  78.     query = update.callback_query
  79.     await query.answer()
  80.     data_string = query.data
  81.     callback_data = json.loads(data_string)
  82.     ci = callback_data.get('ci', 'DefaultCI')
  83.     mi = callback_data.get('mi', 0)
  84.     ui = callback_data.get('ui', 0)
  85.     umi = callback_data.get('umi', 0)
  86.  
  87.     message_id = mi
  88.     command_id = umi
  89.     chat_id_temp = ci
  90.     chat_id=f"-100{chat_id_temp}"
  91.     try:
  92.         # Attempt to delete message
  93.         await context.bot.delete_message(chat_id=chat_id, message_id=command_id)
  94.         await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
  95.        
  96.         user_id = ui
  97.         # Attempt to ban the chat member
  98.         await context.bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
  99.  
  100.         user = query.from_user
  101.         if user.last_name is not None:
  102.             user_display_name = f"{user.first_name} {user.last_name}"
  103.         elif user.last_name is None:
  104.             user_display_name = f"{user.first_name}"
  105.         user_link = f"https://t.me/{user.username}"
  106.         await query.message.reply_html(f"<a href='{user_link}'><b>{user_display_name}</b></a> забанил пользователя с ID {user_id}", disable_web_page_preview=True)
  107.         await query.edit_message_reply_markup(None)
  108.  
  109.     except TelegramError as e:
  110.         # Handle error, send a custom message to the user if an error occurs
  111.         error_message = f"Возникла ошибка: {str(e)}"
  112.         await query.message.reply_html(error_message, disable_web_page_preview=True)
  113.         await query.edit_message_reply_markup(None)
  114.  
  115. async def check_automatically(update: Update, context: CallbackContext):
  116.     message = update.message
  117.  
  118.     numeric_chat_id = message.chat.id
  119.     chat_id = str(numeric_chat_id).replace("-100", "")
  120.     message_id = message.message_id
  121.     user = message.from_user
  122.     if user.last_name is not None:
  123.         user_display_name = f"{user.first_name} {user.last_name}"
  124.     elif user.last_name is None:
  125.         user_display_name = f"{user.first_name}"
  126.     user_link = f"https://t.me/{user.username}"
  127.     link = f"https://t.me/c/{chat_id}/{message_id}"
  128.     if message.text is not None:
  129.         words = message.text
  130.            
  131.     elif message.text is None:
  132.         words = message.caption
  133.        
  134.     reg_pattern = '|'.join(map(re.escape, REGULAR_TOKENS))
  135.     crit_pattern = '|'.join(map(re.escape, CRITICAL_TOKENS))
  136.     regular_patterns = re.findall(reg_pattern, words)
  137.     num_regular = len(regular_patterns)
  138.     critical_patterns = re.findall(crit_pattern, words)
  139.     num_critical = len(critical_patterns)
  140.     if num_critical > 0 or num_regular > 2:
  141.         verdict = f"<b>Критические токены:</b> {num_regular}\n<b>Обычные токены:</b> {num_critical}"
  142.         auto_callback_data = AutoDeleteCallbackData(chat_id, message_id, user.id)
  143.         auto_callback_data_serialized = json.dumps(auto_callback_data, cls=AutoEncoder)
  144.         keyboard = [
  145.             [InlineKeyboardButton("Удалить", callback_data=auto_callback_data_serialized),
  146.              InlineKeyboardButton("Игнорировать", callback_data='Declined')]
  147.             ]
  148.         reply_markup = InlineKeyboardMarkup(keyboard)
  149.  
  150.             # some logic like the following
  151.             # await context.bot.send_message(chat_id=TARGET_CHAT, text=text_message_content,
  152.             # disable_web_page_preview=True,parse_mode="HTML",reply_markup=reply_markup)
  153.  
  154. async def auto_button_delete(update: Update, context: CallbackContext):
  155.     query = update.callback_query
  156.     await query.answer()
  157.     data_string = query.data
  158.     callback_data = json.loads(data_string)
  159.     ci = callback_data.get('ci', 'DefaultCI')
  160.     mi = callback_data.get('mi', 0)
  161.     ui = callback_data.get('ui', 0)
  162.     umi = callback_data.get('umi', 0)
  163.  
  164.     message_id = mi
  165.     chat_id_temp = ci
  166.     chat_id=f"-100{chat_id_temp}"
  167.     try:
  168.         # Attempt to delete message
  169.         await context.bot.delete_message(chat_id=chat_id, message_id=message_id)
  170.        
  171.         user_id = ui
  172.         # Attempt to ban the chat member
  173.         await context.bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
  174.  
  175.         user = query.from_user
  176.         if user.last_name is not None:
  177.             user_display_name = f"{user.first_name} {user.last_name}"
  178.         elif user.last_name is None:
  179.             user_display_name = f"{user.first_name}"
  180.         user_link = f"https://t.me/{user.username}"
  181.         await query.message.reply_html(f"<a href='{user_link}'><b>{user_display_name}</b></a> забанил пользователя с ID {user_id}", disable_web_page_preview=True)
  182.         await query.edit_message_reply_markup(None)
  183.  
  184.     except TelegramError as e:
  185.         # Handle error, send a custom message to the user if an error occurs
  186.         error_message = f"Возникла ошибка: {str(e)}"
  187.         await query.message.reply_html(error_message, disable_web_page_preview=True)
  188.         await query.edit_message_reply_markup(None)
  189.  
  190. async def auto_ignore_button(update: Update, context: CallbackContext):
  191.     query = update.callback_query
  192.     await query.answer()
  193.    
  194.     try:
  195.         await query.edit_message_reply_markup(None)
  196.  
  197.     except TelegramError as e:
  198.         # Handle error, send a custom message to the user if an error occurs
  199.         error_message = f"Возникла ошибка: {str(e)}"
  200.         await query.message.reply_html(error_message, disable_web_page_preview=True)
  201.         await query.edit_message_reply_markup(None)
  202.  
  203. def main():
  204.     print("I'm working")
  205.  
  206.     application = ApplicationBuilder().token(TOKEN).arbitrary_callback_data(True).build()
  207.     application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, handle_new_member))
  208.     #application.add_handler(CallbackQueryHandler(auto_button_delete, pattern="{chat_id}, {message_id}, {user.id}"))
  209.     application.add_handler(CallbackQueryHandler(auto_ignore_button, pattern="Declined"))
  210.     application.add_handler(CallbackQueryHandler(button_delete, pattern=))
  211.     #application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, check_automatically))
  212.     application.add_handler(CommandHandler("ban", report_manually))
  213.  
  214.     application.run_polling()
  215.  
  216. if __name__ == '__main__':
  217.     main()
  218.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement