Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import re
- import socket
- '''
- IRC Client: Mikey Mike
- Intended to be used for a Twitch.tv IRC bot.
- '''
- class IRC(object):
- def __init__(self, parent, username, password, channel, server='irc.twitch.tv', port=6667):
- """
- :param parent: Parent object (used for debugging)
- :param username: Twitch.tv username
- :param password: Twitch.tv oauth password
- :param channel: IRC channel the bot should join
- :param server: IRC server
- :param port: IRC port
- """
- # Parent object.
- self.parent = parent
- # Twitch IRC login credentials
- self.username = username
- self.password = password
- # IRC connection details
- self.server = server
- self.port = port
- self.channel = channel
- # Socket for IRC client
- self.sock = None
- self.socket_buffer = 2048
- self.connected = False
- self.timeout = 5
- def get_socket(self):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(self.timeout)
- self.sock = sock
- self.parent.log("** Connecting to IRC Server: %s" % self.server)
- try:
- sock.connect((self.server, self.port))
- except:
- self.parent.log('** Failed to Connect to Server: %s, Port: %s.' % (self.server, self.port))
- return None
- sock.settimeout(self.timeout)
- data = 'USER %s\r\n' % self.username
- sock.send(data.encode())
- data = 'PASS %s\r\n' % self.password
- sock.send(data.encode())
- data = 'NICK %s\r\n' % self.username
- sock.send(data.encode())
- if self.check_login_status(sock.recv(1024).decode()):
- self.parent.log('** Successfully Logged into Server: %s' % self.server)
- else:
- self.parent.log('** Unsuccessful Login Attempt on Server: %s' % self.server)
- return None
- self.join_channel(self.channel)
- self.connected = True
- data = 'CAP REQ :twitch.tv/commands\r\n'
- self.sock.send(data.encode())
- return sock
- @staticmethod
- def check_login_status(socket_data):
- if re.match(r'^:(testserver\.local|tmi\.twitch\.tv) NOTICE \* :Login unsuccessful\r\n$', socket_data):
- return False
- else:
- return True
- def join_channel(self, channel):
- self.parent.log("** Joining Channel: %s" % channel)
- try:
- data = 'JOIN %s\r\n' % channel
- self.sock.send(data.encode())
- except Exception:
- self.parent.log('** Failed to Join Channel: %s' % channel)
- self.connected = False
- self.parent.log("** Successfully Joined Channel: %s" % channel)
- def send_chat_message(self, channel, message):
- try:
- data = 'PRIVMSG %s :%s\r\n' % (channel, message)
- self.sock.send(data.encode())
- except Exception:
- self.parent.log('** Error Sending Message to Channel: %s' % channel)
- self.connected = False
- def ping(self):
- data = 'PING \r\n'
- self.sock.send(data.encode())
- def pong(self, data):
- if data[:4] == "PING":
- data = 'PONG \r\n'
- self.sock.send(data.encode())
- @staticmethod
- def incoming_chat(data):
- try:
- data = data.decode()
- except Exception:
- data = data
- if re.match(r'^:[a-zA-Z0-9_]+\![a-zA-Z0-9_]+@[a-zA-Z0-9_]+(\.tmi\.twitch\.tv|\.testserver\.local) PRIVMSG #[a-zA-Z0-9_]+ :.+$', data):
- return True
- else:
- return False
- @staticmethod
- def incoming_whisper(data):
- try:
- data = data.decode()
- except Exception:
- data = data
- if re.match(r'^:[a-zA-Z0-9_]+\![a-zA-Z0-9_]+@[a-zA-Z0-9_]+(\.tmi\.twitch\.tv|\.testserver\.local) WHISPER [a-zA-Z0-9_]+ :.+$', data):
- return True
- else:
- return False
- @staticmethod
- def parse_chat_message(data):
- return {
- 'channel': re.findall(r'^:.+\![a-zA-Z0-9_]+@[a-zA-Z0-9_]+.+ PRIVMSG (.*?) :', data.decode())[0],
- 'username': re.findall(r'^:([a-zA-Z0-9_]+)\!', data.decode())[0],
- 'message': re.findall(r'PRIVMSG #[a-zA-Z0-9_]+ :(.+)', data.decode())[0]
- }
- @staticmethod
- def parse_whisper_message(data):
- return {
- 'channel': re.findall(r'^:.+\![a-zA-Z0-9_]+@[a-zA-Z0-9_]+.+ WHISPER (.*?) :', data.decode())[0],
- 'username': re.findall(r'^:([a-zA-Z0-9_]+)\!', data.decode())[0],
- 'message': re.findall(r'WHISPER [a-zA-Z0-9_]+ :(.+)', data.decode())[0]
- }
- class MessageParser(object):
- @staticmethod
- def is_bot_command(message):
- """
- :param message: Entire chat message
- :return: True if the chat message is a bot command
- """
- if message is None:
- return False
- try:
- if message[0] == "!":
- return True
- else:
- return False
- except Exception:
- return False
- @staticmethod
- def get_bot_command(message):
- """
- :param message: Entire chat message
- :return: A string representing a valid bot command, otherwise None
- """
- if message is None:
- return None
- if '!' not in message:
- return None
- try:
- bot_command = message[message.index('!') + 1:].lower()
- if ' ' in bot_command:
- bot_command = bot_command[:bot_command.index(' ')]
- if '\n' in bot_command:
- bot_command = bot_command[:bot_command.index('\n')]
- except Exception:
- bot_command = None
- return bot_command
Add Comment
Please, Sign In to add comment