Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from typing import List
- from urllib import parse
- import asyncio
- import discord
- from discord.ext import commands
- from discord.utils import escape_markdown
- from discord.errors import Forbidden, HTTPException
- import oyaml as yaml # ordered yaml
- from collections import OrderedDict
- import os
- from PIL import Image
- import sys
- import io
- import time
- #from urllib.request import requests
- import shutil
- from threading import Thread
class Bridgebot(commands.Bot):
    """
    The AO2 Discord bridge bot.

    Relays chat between one hub/area of the AO2 server and one Discord
    text channel, and keeps per-user settings plus a cache of image URLs
    on the instance.
    """

    def __init__(self, server, target_chanel, hub_id, area_id):
        """Set up the bridge state; nothing is loaded or connected here.

        Args:
            server: Server object. ``server.config["bridgebot"]`` supplies
                the default avatar, character, emote, position and the
                tick speed.
            target_chanel: Stored as ``self.target_channel``; ``on_ready``
                uses it as the channel name to look up.
            hub_id: Hub id passed along with messages coming from Discord.
            area_id: Area id passed along with messages coming from Discord.
        """
        self.prefix: str = "e!"
        super().__init__(command_prefix=self.prefix)

        # Where the bridge is attached.
        self.server = server
        self.pending_messages: List[list] = []
        self.hub_id: int = hub_id
        self.area_id: int = area_id
        self.target_channel: discord.abc.Messageable = target_chanel

        # True while the bot itself was the last one to post in the channel.
        self.was_my_message = False

        # Two-slot history of the most recent characters/emotes;
        # ``last_index`` is the slot that will be written next.
        self.last_chars: list[str] = [""] * 2
        self.last_emotes: list[str] = [""] * 2
        self.last_index: int = 0

        # Template record for Discord users that have not set anything yet.
        bridge_cfg = self.server.config["bridgebot"]
        self.default_user: dict = {
            "discord_id": -1,
            "avatar_url": bridge_cfg["avatar_url"],
            "char_name": bridge_cfg["character"],
            "emote_name": bridge_cfg["emote"],
            "position": bridge_cfg["pos"],
            "ao_showname": "",
            "footer_url": "",
        }

        # Maps "{char_name}/{emote_name}"-style paths to Discord URLs.
        self.image_URLs: dict = {}
        self.opened: bool = True
        # Maps a Discord user id to that user's settings record.
        self.discord_users: dict = {}

        # Save throttling state.
        self.can_save_data: bool = False
        self.need_to_save: bool = False
        self.need_to_save_url: bool = False
        self.time: float = 0
        self.max_save_time: float = 5.0

        # Timing / pacing state.
        self.start_time_image: float = 0.0
        self.start_time_message: float = 0.0
        self.tickspeed: float = max(0.1, bridge_cfg["tickspeed"])
        self.can_send_message: bool = False

        # Stays False: the import probe below only reports a missing module.
        self.has_pillow: bool = False
        try:
            import PIL
        except ModuleNotFoundError:
            print("module 'Pillow' is not installed... Using not Pillow version of the code.")
async def init(self, token):
    """Start the bot by awaiting ``self.start(token)``.

    Any exception raised while starting is printed and then re-raised
    unchanged so the caller still sees it.
    """
    print("Trying to start the Discord Bridge bot...")
    try:
        await self.start(token)
    except Exception as startup_error:
        print(startup_error)
        raise
async def on_close(self):
    """Save cached data and shut the bot down when the server stops.

    The URL cache and the user data are written first. Clearing
    ``self.opened`` and closing the Discord connection happen in a
    ``finally`` block, so a failed save can no longer leave the bot
    connected with its loop still running. The save error still
    propagates to the caller.
    """
    try:
        self.save_cached_url_data()
        self.save_user_data()
    finally:
        self.opened = False  # Stops the ``while self.opened`` loop.
        await self.close()  # Disconnects the bot from Discord.
def load_user_data(self):
    """Load per-user settings from ``storage/discord_bot/users_data.yaml``.

    When the file is missing a notice naming the path is printed and
    ``self.discord_users`` is left as it is. An empty file, which the
    YAML loader returns as ``None``, is ignored as well so that
    ``self.discord_users`` always stays a mapping.
    """
    path: str = "storage/discord_bot"
    arg: str = f"{path}/users_data.yaml"
    if not os.path.isfile(arg):
        # The f-prefix was missing before, so "{arg}" was printed literally.
        print(f"File not found: {arg}\nLoading default data")
        return
    with open(arg, "r", encoding="utf-8") as stream:
        loaded = yaml.safe_load(stream)
    if loaded is not None:
        self.discord_users = loaded
def save_user_data(self):
    """Write ``self.discord_users`` to ``storage/discord_bot/users_data.yaml``.

    The storage directory is created first when it does not exist, so a
    save on a fresh installation does not fail with ``FileNotFoundError``.
    The file is overwritten on every call.
    """
    path = "storage/discord_bot"
    arg = f"{path}/users_data.yaml"
    os.makedirs(path, exist_ok=True)
    with open(arg, "w", encoding="utf-8") as stream:
        yaml.dump(
            self.discord_users,
            stream,
            default_flow_style=False,
        )
    print("Saved user data successfully.")
def load_cached_url_data(self):
    """Load the image URL cache from ``storage/discord_bot/url_cached_data.yaml``.

    When the file is missing a notice naming the path is printed and
    ``self.image_URLs`` is left as it is. An empty file, which the YAML
    loader returns as ``None``, is ignored as well so that
    ``self.image_URLs`` always stays a mapping.
    """
    path: str = "storage/discord_bot"
    arg: str = f"{path}/url_cached_data.yaml"
    if not os.path.isfile(arg):
        # The f-prefix was missing before, so "{arg}" was printed literally.
        print(f"File not found: {arg}\nLoading default data")
        return
    with open(arg, "r", encoding="utf-8") as stream:
        loaded = yaml.safe_load(stream)
    if loaded is not None:
        self.image_URLs = loaded
def save_cached_url_data(self):
    """Write ``self.image_URLs`` to ``storage/discord_bot/url_cached_data.yaml``.

    The storage directory is created first when it does not exist, so a
    save on a fresh installation does not fail with ``FileNotFoundError``.
    The file is overwritten on every call.
    """
    path: str = "storage/discord_bot"
    arg: str = f"{path}/url_cached_data.yaml"
    os.makedirs(path, exist_ok=True)
    with open(arg, "w", encoding="utf-8") as stream:
        yaml.dump(
            self.image_URLs,
            stream,
            default_flow_style=False,
        )
    print("Saved image url data successfully.")
def get_discord_equivalent(self, showname):
    """Return the user record whose ``ao_showname`` equals ``showname``.

    Records are examined in the iteration order of ``self.discord_users``
    and the first match is returned. ``None`` is returned when no record
    matches.
    """
    matching_records = (
        record
        for record in self.discord_users.values()
        if record.get("ao_showname") == showname
    )
    return next(matching_records, None)
def get_discord_user(self, search_id) -> dict:
    """Return the record stored under ``search_id``.

    When no record exists for that id, one is created through
    ``self.create_discord_user`` and returned instead.
    """
    known_users = self.discord_users
    if search_id in known_users:
        return known_users[search_id]
    return self.create_discord_user(search_id)
def create_discord_user(self, creation_id):
    """Register a new user record under ``creation_id`` and return it.

    The record starts as a copy of ``self.default_user``. Copying matters:
    storing the template itself made every new user, and the template,
    share one dict, so a settings command from one user silently changed
    all of them.
    """
    # A shallow copy is used; the template is treated as a flat mapping.
    created_user_data: dict = dict(self.default_user)
    self.discord_users[creation_id] = created_user_data
    return created_user_data
def queue_message(self, name, message, charname, anim, showname, args, bg):
    """Prepare an AO2 in-character message for Discord and send or queue it.

    Works out an avatar URL and then either an already-known image URL for
    the character emote (``anim_url``) or a local image file path
    (``final_path``). The character and emote are recorded in the two-slot
    history, and the message is either scheduled on ``send_char_message``
    right away or appended to ``self.pending_messages``.

    Args:
        name: Passed through unchanged to ``send_char_message``.
        message: Passed through unchanged to ``send_char_message``.
        charname: Character folder name used to build URLs and file paths.
        anim: Emote name used to build URLs and file paths.
        showname: Looked up with ``get_discord_equivalent`` to find a
            per-user avatar.
        args: Indexable sequence; index 5 is read, so it needs at least
            six items. Forwarded unchanged.
        bg: Used only to build one cache key; forwarded unchanged.
    """
    # NOTE(review): the pasted source lost its indentation; the nesting below
    # is reconstructed from data flow (``final_path`` must exist on every
    # path that reaches the end of the method) - confirm against the original.
    base = None
    avatar_url = None
    anim_url = None
    embed_emotes = False
    # Read but not used again in this method.
    pos = args[5]
    if "base_url" in self.server.config["bridgebot"]:
        base = self.server.config["bridgebot"]["base_url"]
    if "embed_emotes" in self.server.config["bridgebot"]:
        embed_emotes = self.server.config["bridgebot"]["embed_emotes"]
    if base != None:
        # A configured base URL takes priority: build remote asset URLs.
        avatar_url = base + parse.quote("characters/" + charname + "/char_icon.png")
        if embed_emotes:
            anim_url = base + parse.quote(
                "characters/" + charname + "/" + anim + ".png"
            )
    elif self.server.config["bridgebot"]["avatar_url"] != None:
        # No base URL: use the matching user's avatar, else the configured one.
        user_data = self.get_discord_equivalent(showname)
        if user_data != None:
            avatar_url = user_data.get("avatar_url")
        else:
            avatar_url = self.server.config["bridgebot"]["avatar_url"]
    character_path = "characters"
    character_path_cache = "storage/base_data/characters"
    final_path = ""
    # Candidate sprite files, relative to ``character_path``, tried in order.
    file_paths = [ f'/' + charname + "/" + anim + ".png",
    f'/' + charname + "/(a)" + anim + ".gif",
    f'/' + charname + "/(a)/" + anim + ".gif",
    f'/' + charname + "/(b)" + anim + ".gif",
    f'/' + charname + "/(b)/" + anim + ".gif",
    f'/' + charname + "/" + anim + ".gif",
    f'/' + charname + "/" + anim + ".webp"]
    bg_cache_paths = [
    f'{bg}/{anim}.webp'
    ]
    bg_info_path: str = ""
    # NOTE(review): the values set in this loop are never used. ``cached_url``
    # is reset to None right after it and ``bg_info_path`` is not read again
    # in this method.
    for path in bg_cache_paths:
        if self.image_URLs.get(path, None) != None:
            bg_info_path = path
            cached_url = self.image_URLs.get(path, None) # Get the URL from the image_URLs dictionary
            break
    # Keys under which a URL for this emote may be stored in ``self.image_URLs``.
    file_cache_paths = [
    f'{charname}/{anim}.webp',
    f'{charname}/(a){anim}.webp',
    f'{charname}/(b){anim}.webp',
    f'{charname}/{anim}.png',
    f'{charname}/{anim}.gif',
    f'{charname}/(a){anim}.gif',
    f'{charname}/(b){anim}.gif',
    f'{charname}/(a)/{anim}.gif',
    f'{charname}/(b)/{anim}.gif',
    ]
    cached_url = None
    for path in file_cache_paths:
        if self.image_URLs.get(path, None) != None:
            cached_url = self.image_URLs.get(path, None)
            # NOTE(review): ``final_path`` is still "" here, so this message
            # never shows the key that matched.
            print(f"Got the cached path and is: {final_path}")
            break
    if cached_url != None:
        # A known URL replaces any ``anim_url`` built from ``base`` above.
        anim_url = cached_url
    else:
        # No known URL: look for a local file, converted .webp copy first.
        webp_cache_path = f'{character_path_cache}/{charname}/{anim}.webp'
        if os.path.isfile(webp_cache_path):
            final_path = webp_cache_path
        else:
            for path in file_paths:
                if os.path.isfile(f'{character_path}{path}'):
                    final_path = f'{character_path}{path}'
                    break
    # Record character and emote in the current history slot, then flip
    # ``last_index`` between 0 and 1.
    self.last_chars[self.last_index] = charname
    self.last_emotes[self.last_index] = anim
    if self.last_index == 1: self.last_index = 0
    else: self.last_index = 1
    if self.can_send_message == True:
        # Send right away and block further direct sends until the flag is
        # set again elsewhere.
        self.can_send_message = False
        asyncio.ensure_future(self.send_char_message(name, message, avatar_url, anim_url, final_path, args, bg))
    else:
        self.pending_messages.append([name, message, avatar_url, anim_url, final_path, args, bg])
async def on_ready(self):
    """Resolve the target channel, load saved data and run the tick loop.

    After logging in, the first guild is used and the text channel named
    ``self.target_channel`` is looked up. Saved user data and the URL cache
    are loaded, then a loop runs once per ``self.tickspeed`` seconds while
    ``self.opened`` is true. Each tick:

    * sends the oldest pending message, or marks the bot free to send
      directly when the queue is empty;
    * adds the elapsed time to ``self.time`` and, every
      ``self.max_save_time`` seconds, opens a save window and performs one
      outstanding save.

    Fixes over the previous version: the elapsed time had the wrong sign,
    so the save window never opened; the save coroutines were created but
    never awaited; the queue was drained newest-first; and the sleep
    duration could become negative.
    """
    print("Discord Bridge Successfully logged in.")
    print("Username -> " + self.user.display_name)
    print("ID -> " + str(self.user.id))
    # NOTE(review): raises IndexError when the bot is in no guild - confirm
    # whether that case needs handling.
    self.guild = self.guilds[0]
    self.channel = discord.utils.get(
        self.guild.text_channels, name=self.target_channel
    )
    # NOTE(review): this changes the interpreter-wide recursion limit; the
    # reason is not visible here - confirm it is still wanted.
    sys.setrecursionlimit(255)
    self.load_user_data()
    self.load_cached_url_data()
    await self.wait_until_ready()

    last_time: float = time.time()
    while self.opened:  # Cleared by on_close to stop the loop.
        start_time = time.time()
        # Time since the previous tick started; 0 on the first pass.
        delta = start_time - last_time
        last_time = start_time

        if self.pending_messages:
            self.can_send_message = False
            # pop(0) keeps messages in the order they were queued.
            asyncio.ensure_future(
                self.send_char_message(*self.pending_messages.pop(0))
            )
        else:
            self.can_send_message = True

        # Save window: opens once every ``max_save_time`` seconds.
        self.time += delta
        if self.time >= self.max_save_time:
            self.time -= self.max_save_time
            self.can_save_data = True
            if self.need_to_save_url:
                await self.try_to_save_url()
            elif self.need_to_save:
                await self.try_to_save()

        # Sleep for the rest of the tick; never pass a negative delay.
        elapsed = time.time() - start_time
        await asyncio.sleep(max(0.0, self.tickspeed - elapsed))
async def on_message(self, message):  # Discord to AO2
    """Relay a Discord message to AO2, or handle a settings command.

    Messages from bots or webhooks, and messages outside ``self.channel``,
    are ignored. A message that does not start with ``self.prefix`` is
    forwarded through ``self.server.send_discord_chat`` unless it is longer
    than the ``max_chars_ic`` config value (256 when that value is missing
    or not a number). A message that starts with the prefix is treated as
    ``<prefix><command> <value...>`` and updates one field of the sender's
    user record; the sender gets a reply and a save is requested.

    Fixes over the previous version: the footer command called
    ``" ".join()`` without an argument and raised ``TypeError``; the
    "too long" notice always claimed a limit of 256; the help text did not
    list ``char``; and the bare ``except`` is narrowed.
    """
    # Ignore other bots and webhook posts.
    if message.author.bot or message.webhook_id is not None:
        return
    if message.channel != self.channel:
        return
    self.was_my_message = False

    if not message.content.startswith(self.prefix):
        try:
            max_char = int(self.server.config["max_chars_ic"])
        except (KeyError, TypeError, ValueError):
            max_char = 256
        text = message.clean_content
        if len(text) > max_char:
            await self.channel.send(
                f"Your message was too long - it was not received by the client. (The limit is {max_char} characters)"
            )
            return
        if message.attachments:
            text = f'[`Attachment`] {text}'
        self.server.send_discord_chat(
            message.author.display_name,
            escape_markdown(text),
            self.hub_id,
            self.area_id,
            self.get_discord_user(message.author.id),
        )
        return

    # Simple settings commands: command alias -> (record key, label in reply).
    command_fields = {
        "avatar_url": ("avatar_url", "avatar"),
        "avatar": ("avatar_url", "avatar"),
        "char_name": ("char_name", "char"),
        "char": ("char_name", "char"),
        "emote_name": ("emote_name", "emote"),
        "emote": ("emote_name", "emote"),
        "position": ("position", "pos"),
        "pos": ("position", "pos"),
        "ao_showname": ("ao_showname", "showname"),
        "showname": ("ao_showname", "showname"),
        "name": ("ao_showname", "showname"),
        "footer_url": ("footer_url", "footer"),
        "footer": ("footer_url", "footer"),
    }
    text = message.content[len(self.prefix):]
    command = text.split(" ")[0]
    value = " ".join(text.split()[1:])
    user_data = self.get_discord_user(message.author.id)

    target = command_fields.get(command)
    if target is not None:
        field, label = target
        user_data[field] = value
        msg = f"{message.author.mention} changed {label} to ({value})"
    else:
        msg = f"{message.author.mention} your command doesn't exists, u baka.\nYou can use 'avatar', 'char', 'name', 'emote', 'pos', 'showname', 'footer'"
    await message.reply(msg)
    await self.try_to_save()
async def try_to_save(self):
    """Save the user data now if allowed, otherwise remember to do it later.

    While ``self.can_save_data`` is false only ``self.need_to_save`` is set.
    Otherwise the user data is written and both flags are cleared.
    """
    if not self.can_save_data:
        self.need_to_save = True
        return
    self.save_user_data()
    self.can_save_data = False
    self.need_to_save = False
async def try_to_save_url(self):
    """Save the URL cache now if allowed, otherwise remember to do it later.

    While ``self.can_save_data`` is false only ``self.need_to_save_url`` is
    set. Otherwise the URL cache is written and both flags are cleared.
    """
    if not self.can_save_data:
        self.need_to_save_url = True
        return
    self.save_cached_url_data()
    self.can_save_data = False
    self.need_to_save_url = False
async def is_url_image(image_url):
    """Report whether ``image_url`` points at an image.

    Currently a stub: no request is made and every URL is accepted.
    """
    # NOTE(review): there is no ``self`` parameter. If this is meant to be
    # called on an instance, the call would fail - confirm the intended use.
    return True
- #def send_char_message_threaded(self, *args: List[list]):
- # loop = asyncio.new_event_loop()
- # asyncio.set_event_loop(loop)
- # loop.run_until_complete(self.send_char_message(*args))
- # #loop.run_until_complete(self.test_thread())
- # loop.close()
async def test_thread(self):
    """Print a fixed confirmation line; used as a debugging probe."""
    confirmation = "Thread started succesfully!"
    print(confirmation)
async def send_char_message(self, name, message, avatar=None, image=None, image_alt_path=None, args=None, bg=None): # AO2 to Discord
    """Post one AO2 message to the Discord channel through a webhook.

    Steps, in order:

    1. Reuse a channel webhook owned by this bot or named
       ``"AO2_Bridgebot"``, creating one when none exists.
    2. When an image URL is given, attach it as an embed if the bot was not
       the last poster or the two remembered characters/emotes differ. With
       ``args`` present the embed footer is ``"{args[2]}/{args[3]}"``.
    3. Send the text through the webhook.
    4. When no image URL is given, upload the local file at
       ``image_alt_path`` as a reply and store the resulting attachment URL
       in ``self.image_URLs`` under the part of the path after
       ``"characters/"``.

    ``Forbidden``, ``HTTPException`` and any other ``Exception`` are caught
    and printed; nothing is re-raised.

    Args:
        name: Username shown on the webhook post.
        message: Text content of the post.
        avatar: Avatar URL for the webhook post.
        image: URL of the image to embed, or ``None`` to upload a file.
        image_alt_path: Local image file path. Its string methods are called
            unconditionally, so the default ``None`` ends up in the generic
            exception handler.
        args: Indexable sequence; indexes 2 and 3 are read for the footer.
        bg: Not used in this method.
    """
    # NOTE(review): the pasted source lost its indentation; the nesting below
    # is reconstructed from data flow - confirm against the original.
    print("Sended char message")
    self.start_time_message = time.time()
    webhook = None
    embed = None
    try:
        webhooks = await self.channel.webhooks()
        for hook in webhooks:
            if hook.user == self.user or hook.name == "AO2_Bridgebot":
                webhook = hook
                break
        if webhook == None:
            webhook = await self.channel.create_webhook(name="AO2_Bridgebot")
        is_from_cache = image_alt_path.startswith("storage/base_data/")
        send_directly = image_alt_path.endswith(".gif")
        if image != None:
            # Skip the embed when the bot posted last with the same
            # character and emote as the entry before it.
            send_image: bool = (
            self.was_my_message == False
            or self.last_chars[0] != self.last_chars[1]
            or self.last_emotes[0] != self.last_emotes[1]
            )
            if send_image:
                embed = discord.Embed()
                embed.set_image(url=image)
                if args != None:
                    emote_name = args[3]
                    char_name = args[2]
                    # Always empty here, so the icon branch below never runs.
                    current_footer_url: str = ""
                    footer_text = f"{char_name}/{emote_name}"
                    if current_footer_url != None and current_footer_url != "":
                        embed.set_footer(text=footer_text, icon_url=current_footer_url)
                    else: embed.set_footer(text=footer_text)
        webhook_message = await webhook.send(message, username=name, avatar_url=avatar, embed=embed, wait=True)
        print("--- %s seconds (message [Partial]) ---" % (time.time() - self.start_time_message))
        print(
        f'[DiscordBridge] Sending message from "{name}" to "{self.channel.name}"'
        )
        if image == None:
            # No known URL: upload the local file and remember its URL.
            img = None
            is_binary_image: bool = False
            image_binary: io.BytesIO = None
            character_info_path: str = ""
            await self.channel.trigger_typing()
            self.start_time_image = time.time()
            final_image_path: str = ""
            # NOTE(review): ``__init__`` leaves ``has_pillow`` False, so this
            # conversion branch looks unreachable as the file stands - confirm.
            if is_from_cache == False and send_directly == False and self.has_pillow:
                with open(image_alt_path, "rb") as f:
                    img = Image.open(io.BytesIO(f.read()))
                is_binary_image = True
            else: # Send the image directly without converting it
                final_image_path = image_alt_path
            sended_file_message = None
            if is_binary_image:
                with io.BytesIO() as image_binary:
                    img.save(image_binary, "webp", lossless=False, method=6, quality=100, resample=Image.LANCZOS)
                    image_binary.seek(0) # Rewind to the start before uploading
                    print("--- %s seconds (image) ---" % (time.time() - self.start_time_image))
                    # NOTE(review): ``character_info_path`` is still "" here,
                    # so the upload gets an empty filename.
                    sended_file_message = await webhook_message.reply(file=discord.File(fp=image_binary, filename=character_info_path.split("/")[-1]))
                    # Keep a converted copy under storage/base_data.
                    final_image_path = f"storage/base_data/{image_alt_path.rsplit('.', 1)[0]}.webp"
                    if os.path.isdir(os.path.dirname(final_image_path)) == False:
                        os.makedirs(os.path.dirname(final_image_path))
                    # NOTE(review): the stream is not rewound after the upload;
                    # presumably little or nothing is left to copy - confirm.
                    with open(final_image_path, "wb") as f:
                        shutil.copyfileobj(image_binary, f)
                img.close() # Release the image explicitly
            else:
                sended_file_message = await webhook_message.reply(file = discord.File(final_image_path))
            # Raises IndexError when the path has no "characters/" part; the
            # generic handler below catches it.
            character_info_path = final_image_path.split("characters/", 1)[1]
            print(f'character_info_path: {character_info_path}')
            attachment_url = sended_file_message.attachments[0].url # Get attachment url of the image
            # Save URL for later use
            if character_info_path != None and character_info_path != "":
                self.image_URLs[character_info_path] = attachment_url
                await self.try_to_save_url() # Async function
        self.was_my_message = True
        print("--- %s seconds (message [Total]) ---" % (time.time() - self.start_time_message))
    except Forbidden:
        print(
        f'[DiscordBridge] Insufficient permissions - couldnt send char message "{name}: {message}" with avatar "{avatar}" to "{self.channel.name}"'
        )
    except HTTPException:
        print(
        f'[DiscordBridge] HTTP Failure - couldnt send char message "{name}: {message}" with avatar "{avatar}" to "{self.channel.name}"'
        )
    except Exception as ex:
        # This is a workaround to a problem - [Errno 104] Connection reset by peer occurs due to too many calls for this func.
        # Simple solution is to increase the tickspeed config so it waits longer between messages sent.
        print(f"[DiscordBridge] Exception - {ex}")
async def save_binary_image(self, binary_data, path):
    """Placeholder: accepts ``binary_data`` and ``path`` and does nothing."""
    return None
async def save_image(self, image, path):
    """Placeholder: accepts ``image`` and ``path`` and does nothing."""
    return None
Advertisement
Add Comment
Please, Sign In to add comment