Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from discord import Embed
- from discord.ext import commands
- import logging
- import pathlib
- import re
- from __main__ import path, dbPool
- import emoji
- ##### Exceptions #####
- class PollsBaseException(commands.CommandError):
- """Base exception for all poll exceptions"""
- def __init__(self, guildId: int, channelId: int):
- self.guildId = guildId
- self.channelId = channelId
- class ChannelNotInGuildException(PollsBaseException):
- """Exception that occurs when the channel specified by the caller does not match with the context guild"""
- def __init__(self, guildId: int, channelId: int, matchedChannelId: int):
- super().__init__(guildId, channelId)
- self.matchedChannelId = matchedChannelId
- class NoOptionsException(PollsBaseException):
- """Exception that occurs when the caller does not specify any options"""
- def __init__(self, guildId: int, channelId: int):
- super().__init__(guildId, channelId)
- ##### Classes #####
- class PollOption:
- """Dataclass defining a single pollOption"""
- def __init__(self, option: str, reaction: str, responses: int, responders: list):
- self.option = option
- self.reaction = reaction
- self.responses = responses
- self.responders = responders
- class Poll:
- """Class defining the Poll object"""
- def __init__(self):
- self.guildId: int = None
- self.channelId: int = None
- self.messageId: int = None
- self.messageTitle: str = None
- self.verbose: bool = False
- self.options: list = []
- @classmethod
- async def from_db(cls, bot: commands.Bot, pollId: int):
- """Creates a Poll object from the db using the poll id"""
- # Creates Poll object using classmethod placeholder
- poll = cls()
- async with dbPool.acquire() as conn:
- # Query the polls database
- pollQueryResults = await conn.fetch_row(f'SELECT * FROM polls WHERE id = \'{pollId}\';') # Returns Record object of row
- # Initializes poll using the values from the database & option object
- pollMessage = await bot.get_message(pollQueryResults['message_id'])
- poll.guildId = pollMessage.guild.id
- poll.channelId = pollMessage.channel
- poll.messageId = pollQueryResults['message_id']
- poll.messageTitle = pollQueryResults['title']
- poll.verbose = bool(pollQueryResults['verbose'])
- poll.options = []
- # Query the poll_options database
- pollOptionsQueryResults = await conn.fetch(f'SELECT * FROM poll_options WHERE fk = \'{pollId}\'') # Returns list of Records
- for pollOptionsRecord in pollOptionsQueryResults:
- optionObject = PollOption(pollOptionsRecord['option'], pollOptionsRecord['reaction'], pollOptionsRecord['responses'], [])
- # Query the poll_responders database for each option
- pollRespondersQueryResults = await conn.fetch(f'SELECT * FROM poll_responders WHERE fk = \'{pollOptionsRecord["id"]}\';') # Returns list of Records
- # Add the responder id's to the options object
- for pollRespondersRecord in pollRespondersQueryResults:
- optionObject.responders.append(pollRespondersRecord['responder_id'])
- # Add the options object to the poll object
- poll.options.append(optionObject)
- return poll
- async def update_db(self):
- """Updates the database using the poll's data"""
- async with dbPool.acquire() as conn:
- async with conn.transaction():
- dbGuildId = await conn.fetchval(f'SELECT id FROM guilds WHERE guild_id = \'{self.guildId}\';')
- await conn.execute(f'UPDATE polls SET message_id = \'{self.messageId}\', title = \'{self.messageTitle}\', verbose = \'{int(self.verbose)}\' WHERE fk = \'{dbGuildId}\';')
- for option in self.options:
- await conn.execute(f'UPDATE poll_options SET option = \'{option.option}\', reaction = \'{option.reaction}\', responses = \'{option.responses}\';')
- dbOptionId = await conn.fetchval(f'SELECT id FROM poll_options WHERE fk = SELECT id FROM polls WHERE fk = \'{dbGuildId}\';')
- responderRecords = await conn.fetch(f'SELECT responder_id FROM poll_responders WHERE fk = \'{dbOptionId}\';')
- responderIdList = []
- for responderRecord in responderRecords:
- responderIdList.append(responderRecord['responder_id'])
- for responderId in option.responders:
- if not responderId in responderIdList:
- await conn.execute(f'INSERT INTO poll_responders(fk, responder_id) VALUES (\'{dbOptionId}\',\'{responderId}\');')
- async def to_db(self):
- """Inserts a new poll into the database"""
- async with dbPool.acquire() as conn:
- async with conn.transaction():
- dbGuildId = await conn.fetchval(f'SELECT id FROM guilds WHERE guild_id = \'{self.guildId}\';')
- await conn.execute(f'INSERT INTO polls(fk, message_id, title, verbose) VALUES(\'{dbGuildId}\', \'{self.messageId}\', \'{self.messageTitle}\', \'{int(self.verbose)}\');')
- dbPollId = await conn.fetchval(f'SELECT id FROM polls WHERE message_id = \'{self.messageId}\'')
- for option in self.options:
- await conn.execute(f'INSERT INTO poll_options(fk, option, reaction, responses) VALUES(\'{dbPollId}\', \'{option.option}\', \'{option.reaction}\', \'{option.responses}\');')
- dbOptionId = await conn.fetchval(f'SELECT id FROM poll_options WHERE fk = \'{dbPollId}\' AND option = \'{option.option}\';')
- for responder in option.responders:
- await conn.execute(f'INSERT INTO pol_responders(fk, responder_id) VALUES(\'{dbOptionId}\', \'{responder}\');')
- @classmethod
- def from_input(cls, bot: commands.Bot, ctx: commands.Context, input: str): ##################################### <- gets called by pollCommand
- """Creates a Poll object from user input and invocation context"""
- # Creates object from classmethod placeholder
- poll = cls()
- print(input) # debugging
- # Verbosity
- verboseMatch = re.search(r'verbose=(true)', input)
- if verboseMatch != None:
- if verboseMatch.group(1) == 'true':
- poll.verbose = True
- # Guild id
- poll.guildId = ctx.guild.id
- # Channel id
- poll.channelId = ctx.channel.id
- chIdMatch = re.search(r'channelid=([0123456789]*)', input)
- if chIdMatch != None:
- matchedId = int(chIdMatch.group(1))
- matchedGuild = bot.get_channel(matchedId).guild
- if ctx.guild == matchedGuild:
- poll.guildId = ctx.guild.id
- poll.channelId = matchedId
- else:
- raise ChannelNotInGuildException(ctx.guild.id, ctx.channel.id, matchedId)
- # Reactions & options
- optMatch = re.search(r'options=\[(.*)\]', input)
- if optMatch != None:
- if optMatch.group(1) != '':
- optMatchList = re.findall(r'([^;]*?)\|([^;]*)', optMatch.group(1))
- for option, reaction in optMatchList:
- print(emoji.demojize(reaction))
- poll.options.append(PollOption(option, emoji.demojize(reaction), 0, []))
- else:
- raise NoOptionsException(ctx.guild.id, ctx.channel.id)
- else:
- raise NoOptionsException(ctx.guild.id, ctx.channel.id)
- # Message title
- poll.messageTitle = 'No title specified'
- titleMatch = re.search(r'title=\'(.*)\'', input)
- if titleMatch != None:
- if titleMatch.group(1) != '':
- poll.messageTitle = titleMatch.group(1)
- return poll
- class PollsCog(commands.Cog):
- """The Cog object that gets loaded by the bot"""
- def __init__(self, bot):
- self.bot = bot
- async def constructEmbed(self, bot: commands.Bot, poll: Poll):
- embed = Embed(title=poll.messageTitle)
- for option in poll.options:
- responseStr = '-----'
- if option.responses != 0:
- responseStr = ''
- if poll.verbose:
- for responderId in option.responders:
- responseStr += f'{await bot.get_user(responderId).display_name}\n'
- else:
- responseStr = str(poll.option.responses)
- embed.add_field(name=option.option, value=responseStr, inline=False)
- return embed
- @commands.command(name='poll')
- async def pollCommand(self, ctx: commands.Context, *, args: str):
- print(dir(ctx), args)
- poll = Poll.from_input(self.bot, ctx, args) ##################################### <- here
- pollEmbed = await self.constructEmbed(self.bot, poll)
- msgChannel = self.bot.get_channel(poll.channelId)
- message = await msgChannel.send(embed=pollEmbed)
- poll.messageId = message.id
- # Add corresponding reactions to the message
- for option in poll.options:
- await message.add_reaction(option.reaction)
- # Commit the created poll to the database
- await poll.to_db()
- @commands.Cog.listener()
- async def on_raw_reaction_add(self, reaction):
- """Called whenever a reaction is added to a message"""
- # Match the message being reacted to to the database
- async with dbPool.acquire() as conn:
- dbPollId = await conn.fetchval(f'SELECT id FROM polls WHERE message_id = \'{reaction.message.id}\';')
- if not dbPollId is None:
- # Create poll from message id
- poll = await Poll.from_db(self.bot, dbPollId)
- for option in poll.options:
- if option.reaction == reaction.emoji:
- option.responses += 1
- option.responders.append(reaction.user_id)
- await poll.update_db()
- @commands.Cog.listener()
- async def on_raw_reaction_remove(self, reaction, user):
- """Called whenever a reaction is removed from a message"""
- # Match the message the reaction was removed from to the database
- async with dbPool.acquire() as conn:
- dbPollId = await conn.fetchval(f'SELECT id FROM polls WHERE message_id = \'{reaction.message.id}\';')
- if not dbPollId is None:
- # Create poll from message id
- poll = await Poll.from_db(self.bot, dbPollId)
- for option in poll.options:
- if option.reaction == reaction.emoji:
- option.responses -= 1
- option.responders.remove(reaction.user_id)
- await poll.update_db()
- @pollCommand.error
- async def pollCommandErrorhandler (self, ctx, error):
- if isinstance(error, commands.MissingRequiredArgument):
- logging.info(f'{error.__class__.__name__}: {error}')
- await ctx.send(f'```ERROR: At least one argument needs to be specified for this command```')
- else:
- logging.error(error)
- raise error
- def setup(bot):
- bot.add_cog(PollsCog(bot))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement