Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import re
import socket
import sys

# Endpoint dialled by Twitch.twitch_connect(). The client opens a plain
# (non-TLS) TCP socket to this host/port, so everything it sends --
# including the OAuth token -- travels unencrypted.
connect_host = "irc.twitch.tv"
connect_port = 6667
class Twitch:
    """Minimal Twitch chat (IRC) client built on a blocking TCP socket.

    Typical use: call ``twitch_connect(user, key, channel)`` once, then poll
    ``twitch_receive_messages()`` in a loop and answer with ``send_message()``.

    Text is encoded/decoded as UTF-8 exactly once, at the socket boundary, so
    the class works whether the interpreter hands back ``bytes`` (Python 3)
    or byte strings (Python 2) from ``recv()``.
    """

    # Credentials and channel are remembered so that a dropped connection can
    # be re-established by twitch_receive_messages().
    user = ""
    oauth = ""
    channel_reconnect = ""
    # Connected socket, or None while disconnected.
    s = None
    # Bytes of a trailing, not yet CRLF-terminated line from the last recv().
    _pending = b""

    # Server notices this client treats as "login rejected".
    _LOGIN_FAILED_RE = re.compile(
        r'^:(?:testserver\.local|tmi\.twitch\.tv) NOTICE \* :'
        r'(?:Login unsuccessful|Login authentication failed'
        r'|Improperly formatted auth)\r?$',
        re.MULTILINE)

    # A chat line: ":nick!nick@nick.<host> PRIVMSG #channel :text".
    _CHAT_LINE_RE = re.compile(
        r'^:[a-zA-Z0-9_]+![a-zA-Z0-9_]+@[a-zA-Z0-9_]+'
        r'\.(?:tmi\.twitch\.tv|testserver\.local)'
        r' PRIVMSG #[a-zA-Z0-9_]+ :[^\r\n]+\r?$',
        re.MULTILINE)

    # Field extraction. The host part is matched with \S+ (no spaces), so the
    # pattern cannot skip ahead to a second " PRIVMSG " inside the message
    # text and report the wrong channel.
    _PARSE_RE = re.compile(
        r'^:([a-zA-Z0-9_]+)![a-zA-Z0-9_]+@[a-zA-Z0-9_]+\S+'
        r' PRIVMSG (#[a-zA-Z0-9_]+) :([^\r\n]+)',
        re.MULTILINE)

    @staticmethod
    def _to_text(data):
        """Return *data* as text; bytes are decoded as UTF-8, None becomes ''."""
        if data is None:
            return u''
        if isinstance(data, bytes):
            # 'replace' keeps one malformed byte from aborting the whole read.
            return data.decode('utf-8', 'replace')
        return data

    def _send_line(self, line, sock=None):
        """Send one IRC line (CRLF appended) on *sock*, default ``self.s``."""
        payload = line + '\r\n'
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        target = self.s if sock is None else sock
        # sendall() retries until every byte is written; send() may stop early.
        target.sendall(payload)

    def _close_socket(self):
        """Close and forget the current socket, if there is one."""
        if self.s is None:
            return
        try:
            self.s.close()
        except socket.error:
            # Best effort: the socket is being discarded either way.
            pass
        self.s = None

    def twitch_login_status(self, data):
        """Return False if *data* contains a login-rejected notice, else True.

        *data* is the server's reply to the login lines (bytes or text).
        Anything other than a rejection notice, including empty input, counts
        as success.
        """
        return self._LOGIN_FAILED_RE.search(self._to_text(data)) is None

    def twitch_connect(self, user, key, channel):
        """Connect, authenticate as *user* with token *key*, join *channel*.

        The arguments are stored on the instance for later reconnects. Any
        previously open socket is closed first. On success ``self.s`` holds
        the connected socket.

        Exits the process (``sys.exit()``) when the TCP connection cannot be
        opened or the server rejects the login. A socket error while sending
        the login lines or waiting for the reply propagates to the caller,
        after the new socket has been closed.
        """
        self.channel_reconnect = channel  # saved for later reconnects
        self.user = user
        self.oauth = key

        # Drop any previous connection so reconnects do not leak sockets or
        # carry over half a line from the old stream.
        self._close_socket()
        self._pending = b""

        print("Connecting to twitch.tv")
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(0.6)
        try:
            sock.connect((connect_host, connect_port))
        except socket.error:
            sock.close()
            print("Failed to connect to twitch")
            sys.exit()
        print("Connected to " + connect_host + "\n")

        print("Sending Details:\n")
        # The token is masked: it is a credential and must not reach logs.
        print('PASS ********\r\n')
        print('NICK %s\r\n' % user)
        print('USER %s\r\n' % user)
        try:
            # PASS has to precede NICK/USER in IRC connection registration.
            self._send_line('PASS %s' % key, sock)
            self._send_line('NICK %s' % user, sock)
            self._send_line('USER %s' % user, sock)
            reply = sock.recv(1024)
        except socket.error:
            sock.close()
            raise

        if not self.twitch_login_status(reply):
            sock.close()
            print("Authentication failure.")
            sys.exit()
        print("Authentication successful!")
        print("Connected to " + connect_host + ":" + str(connect_port))

        self.s = sock
        self._send_line('JOIN #%s' % channel)
        try:
            # The reply to JOIN is read and discarded.
            sock.recv(1024)
        except socket.error:
            print("ERROR: Socket Error")

    def check_has_message(self, data):
        """Return a match object if any line of *data* is a chat message.

        Returns None otherwise. *data* may be bytes or text and may hold
        several CRLF-separated lines.
        """
        return self._CHAT_LINE_RE.search(self._to_text(data))

    def parse_message(self, data):
        """Parse the first chat line in *data* into a dict.

        Returns ``{'channel': '#name', 'username': ..., 'message': ...}`` with
        text values; the message has no trailing CR/LF.

        Raises:
            IndexError: *data* contains no chat (PRIVMSG) line. The type is
                kept from the earlier ``findall(...)[0]`` implementation.
        """
        match = self._PARSE_RE.search(self._to_text(data))
        if match is None:
            raise IndexError("no PRIVMSG line in %r" % (data,))
        username, channel, message = match.groups()
        return {
            'channel': channel,
            'username': username,
            'message': message,
        }

    def send_message(self, channel, data):
        """Send *data* to *channel* (name without '#') as a ``/me`` action."""

        def one_line(value):
            # CR/LF would end the command and let the text inject further
            # IRC commands, so they are flattened to spaces.
            return self._to_text(value).replace('\r', ' ').replace('\n', ' ')

        self._send_line(
            'PRIVMSG #%s :/me %s' % (one_line(channel), one_line(data)))

    def twitch_receive_messages(self, amount=1024):
        """Read up to *amount* bytes and return the chat messages in them.

        Returns:
            False -- not connected, or nothing could be read (timeout or
                other socket error).
            None  -- the connection was lost and a reconnect was attempted,
                or the data held no complete chat message.
            list  -- one ``parse_message`` dict per chat line received.

        Server PING lines are answered with PONG. A line split across two
        reads is held back until its terminating CRLF arrives.
        """
        if self.s is None:
            return False
        try:
            data = self.s.recv(amount)
        except socket.error:
            # Includes the 0.6 s read timeout set in twitch_connect().
            return False

        if not data:
            print("Lost connection to Twitch, attempting to reconnect...")
            self.twitch_connect(self.user, self.oauth, self.channel_reconnect)
            return None

        lines = (self._pending + data).split(b'\r\n')
        # The last element is '' after a complete line, or a partial line
        # that has to wait for the next read.
        self._pending = lines.pop()

        messages = []
        for raw_line in lines:
            line = self._to_text(raw_line)
            if line.startswith('PING'):
                try:
                    self._send_line('PONG' + line[4:])
                except socket.error:
                    # A dead connection shows up on the next recv().
                    pass
            elif self.check_has_message(line):
                messages.append(self.parse_message(line))

        if not messages:
            return None
        print(data if isinstance(data, str) else self._to_text(data))
        return messages
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement