Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- A guide to creating a socket based IRC bot in Python. This guide will teach you to create a class that connects to and interacts with IRC data.
- <!--more-->
- <h2><strong>Disclaimer:</strong></h2>
- I do not discourage the use of a 3rd party library such as Twisted for IRC, If you do wish to create one with sockets this guide will help.
-
- <h2><strong>Introduction:</strong></h2>
- IRC is a well-known simple socket based chat system. Lots of people use it day-to-day for numerous reasons. Most languages have socket wrappers / libraries which makes it very easy to create programs that interact with IRC. This guide will be creating a class, You will need to know how to create these before reading this guide. I will be littering the examples with comments so that you can get a good grasp of what is going on (hopefully). I will also be obeying PEP8's indentation and line length standard, Python is all about the readability ;)
- Here is a very useful <a href="https://tools.ietf.org/html/rfc1459#section-4.1" target="_blank">link to the RFC</a> if you want to find out more about IRC.
- <h2></h2>
- <h2>A little about the IRC syntax:</h2>
- Generally all IRC servers today follow a standard syntax, with the exception of a few different commands. The data you get from IRC will be newline (\r\n) separated messages. Each message type will have different data. For example, PRIVMSG will look something like this:
- <pre class="lang:python highlight:0 decode:true">:nickname!~host@ipaddress PRIVMSG #channel :message here\r\n</pre>
- JOIN will look like:
- <pre class="lang:python highlight:0 decode:true">:nickname!~host@ipaddress JOIN #channel\r\n</pre>
- Im sure you can see a pattern forming there, generally the data is made up of things like nickname and host, message type, channel name etc. There are exceptions to this style so don't rely on data always looking like that.
- Things like PING or ERROR have a different syntax. PING will look like <span class="lang:python highlight:0 decode:true crayon-inline">PING :server.name\r\n</span> and ERROR will look something like <span class="lang:python highlight:0 decode:true crayon-inline">ERROR :Closing Link:\r\n</span>
- The data you send to IRC is a little simpler. Ill cover the standard ones. Each one should end with \r\n. You can also chain them together like this. eg:
- <pre class="lang:python highlight:0 decode:true ">PRIVMSG #channel :bye!\r\nPART #channel\r\n</pre>
- <ul>
- <li>PRIVMSG - Sending a channel message - <span class="lang:python highlight:0 decode:true crayon-inline">PRIVMSG #channel :My message here\r\n</span>
- <ul>
- <li>If channel is a nickname it will be sent in PM (Private Message)</li>
- </ul>
- </li>
- <li>JOIN - Join a channel - <span class="lang:python highlight:0 decode:true crayon-inline ">JOIN #channel\r\n</span></li>
- <li>PART - Leave a channel - <span class="lang:python highlight:0 decode:true crayon-inline">PART #channel\r\n</span></li>
- <li>NICK - How you specify your nickname. This should be sent once when you connect to specify what nick you want and again at any time you want to change your nickname - <span class="lang:python highlight:0 decode:true crayon-inline">NICK nickname\r\n</span></li>
- <li>QUIT - Totally leaves IRC - <span class="lang:python highlight:0 decode:true crayon-inline">QUIT :reason why you quit\r\n</span></li>
- </ul>
- There are a lot more messages you can send, the RFC will cover all of them. I also suggest you print the data the bot receives so you can see what is happening, this makes it easy to get familiar with what IRC will send you.
- <h2></h2>
- <h2><strong>Creating the class:</strong></h2>
- Okay as I've said we will be using a class. We will call this class Connection.
-
- <h3><strong>__init__()</strong></h3>
- The __init__ constructor will need to take a few params. Lets cover these.
- <ul>
- <li><strong>server - </strong>The server the bot will connect to. (We will use irc.freenode.net for this guide)</li>
- <li><strong>port - </strong>The port it will connect on. (usually 6667, It might be different on other servers)</li>
- <li><strong>nickname - </strong>The nickname for the bot.</li>
- <li><strong>realname - </strong>This shows up when someone uses /whois nickname. This can be anything you want. More than one word.</li>
- <li><strong>password - </strong>The password for the server. (Defaults to None)</li>
- </ul>
- We only need to set these variables and __init__() is done. You can do anything you want after these vars are set, such as creating log files or whatever you want.
- <pre class="lang:python decode:true">#!/usr/bin/env python3
- import time
- import socket
- import random
- class Connection:
- def __init__(self, server, port, nickname, realname, password=None):
- self.server = server
- self.port = port
- self.nickname = nickname
- self.realname = realname
- self.password = password</pre>
-
- <h3><strong>send()</strong></h3>
- By default the socket will have a .send() method but I wrap around this to make things easier to use. The params are
- <ul>
- <li><strong>data - </strong>The data to be sent.</li>
- <li><strong>end - </strong>The thing to append to the end of data. (defaults to \r\n)</li>
- </ul>
- <pre class="lang:python decode:true"> def send(self, data, end='\r\n'):
- # print('[out] {}'.format(data), end=end)
- # Encode and send the data.
- self.socket.send('{}{}'.format(data, end).encode('utf-8'))
- return None</pre>
- <h3><strong>connect()</strong></h3>
- After we get the variables set we should connect. This method will create the socket instance, we will use the default values for the socket. This method returns True if the connection was a success and False otherwise. After we connect we need to identify the bot by sending NICK / USER messages. params:
- <ul>
- <li><strong>attempts - </strong>The number of times it will try to connect if it fails. (Defaults to 10)</li>
- <li><strong>delay - </strong>The delay between each attempt. This will be multiplied by the attempt number + 1. (Defaults to 10)</li>
- </ul>
- <pre class="lang:python decode:true"> def connect(self, attempts=10, delay=10):
- self.socket = socket.socket()
- for i in range(attempts):
- # Wrap the connect in a try/except so it can try again if it
- # fails.
- try:
- self.socket.connect((self.server, self.port))
- except OSError:
- # If there was an error connecting sleep.
- # This delay will increase each time it fails.
- time.sleep((i+1)*delay)
- else:
- # Identify the bot with the server. PASS / NICK / USER
- # PASS password
- # NICK nickname
- # USER nickname user host :realname
- # For user and host we will just use the nickname.
- # If you have set a password send it first.
- if self.password is not None:
- self.send('PASS {}'.format(password))
- self.send('NICK {}'.format(self.nickname))
- self.send('USER {0} {0} {0} :{1}'.format(self.nickname,
- self.realname))
- return True
- return False</pre>
-
- <h3><strong>receive_loop()</strong></h3>
- This is the loop that does all the heavy lifting. Receiving and parsing the data. It will then call a function according to the data it receives. I explain this system in the code.
- <pre class="scroll:true lang:python decode:true"> def receive_loop(self):
- # Create a variable to append data to.
- data = b''
- # Set the timeout to 300 seconds. If the bot hasn't received data
- # in 300 seconds it is disconnected. This is used as a fallback
- # if the recv == b'': line fails to work.
- self.socket.settimeout(300)
- while True:
- try:
- # Append the received data to the recv variable.
- # I found 1024 works well for the amount of bytes received.
- recv = self.socket.recv(1024)
- # Check if the data is blank, this will happen if the bot
- # has been disconnected.
- if recv == b'':
- raise ConnectionResetError
- # Append the data that was just received to the data variable.
- data += recv
- except (OSError, ConnectionResetError, TimeoutError):
- # The bot has been disconnected for some reason.
- # Wait for 30 seconds then reconnect.
- time.sleep(30)
- self.connect()
- continue
- else:
- # If no error has been encountered loop through each line
- # of the data. If some of it doesn't end with \r\n
- # it will stay in the data variable until more data comes in.
- # Somtimes IRC sends half a line then the next .recv()
- # it will send the rest. This handles that.
- while b'\r\n' in data:
- line, data = data.split(b'\r\n', 1)
- print( '[in] {}'.format(line) )
- # This is where we decode and handle the data.
- # I prefer to split the data by space and handle each
- # segment. You will see why I do this once we start
- # receiving data.
- split = line.decode('utf-8').split(' ')
- # IRC sends certain message 'types' like PRIVMSG JOIN PART
- # etc. All of these apart for PING and ERROR are in the
- # second index of the split list if we split the line
- # by a space. PING and ERROR are always in the first.
- # We check the first index to see if it matches any
- # of the functions this class has. This creates a
- # system where each time the bot sees a certain message
- # type it will call the related function.
- # (I prefix it with r_ since some of the IRC messages
- # are numbers)
- first = getattr(self, 'r_{}'.format(split[0]), None)
- if first is not None:
- # The class either has r_PING or r_ERROR defined.
- # Call it with the rest of the data.
- # A line such as "PING :server" will end up calling
- # r_PING(':server')
- first(*split[1:])
- # Check for the second index. (PRIVMSG JOIN PART etc.)
- second = getattr(self, 'r_{}'.format(split[1]), None)
- if second is not None:
- # We send all but the second index in the params.
- # A line such as
- # :nickname!~host@ipaddress PRIVMSG #channel :message
- # will end up calling
- # r_PRIVMSG(':nickname!~host@ipaddress', '#channel', ':message')
- # Yes the message and nickname are prefixed with :
- # but it is very easy to remove that in the function
- # that gets called.
- second(*[split[0]] + split[2:])
- return None</pre>
-
- <h3><strong>privmsg()</strong></h3>
- A wrapper of .send() that makes sending messages to channels easier.
- <pre class="lang:python decode:true"> def privmsg(self, channel, message):
- # Sends a message to a channel.
- self.send('PRIVMSG {} :{}'.format(channel, message))
- return None</pre>
-
- <h3><strong>r_PING()</strong></h3>
- This function gets called every time the bot finds a PING message. Sending PONG keeps it alive.
- <pre class="lang:python decode:true"> def r_PING(self, server):
- # Sends PONG as a response. This keeps the bot connected.
- self.send('PONG {}'.format(server))
- return None</pre>
-
- <h3><strong>r_433()</strong></h3>
- Gets called when the nickname is currently in use. I just add _ to the end.
- <pre class="lang:python decode:true"> def r_433(self, *_):
- # Append _ to the nickname if the current nickname is in use.
- self.nickname = '{}_'.format(self.nickname)
- self.send('NICK {}'.format(self.nickname))
- return None</pre>
-
- Now that we have this code created we have 2 choices of how we want to continue. You can leave it as that so you have a reusable simple base for any IRC program you want or build the bot into this class. I recommend the first, leaving this class as it currently is.
-
- <h2><strong>Creating another class - Extending classes - The bot class:</strong></h2>
-
- <pre class="lang:python decode:true">class Bot(Connection):
- def __init__(self, server, port, nickname, password=None):
- Connection.__init__(self, server, port, nickname, password)
- # Attempt to connect.
- connected = self.connect()
- # If the bot is connected start the receive loop.
- if connected:
- self.receive_loop()</pre>
-
- <h3><strong>r_001()</strong></h3>
- This is the first message the server sends. Perfect for sending JOIN (joining a channel). Usually I would send JOIN after the end of MOTD message, however, some servers do not send this. All servers send 001.
- <pre class="lang:python decode:true"> def r_001(self, *_):
- # 001 is the perfect time to join a channel. All servers send 001
- # so this will always happen.
- self.send('JOIN #Sjc_Bot')
- return None</pre>
-
- <h3></h3>
- This will get called when the bot receives a PRIVMSG, which is any channel message. params:
- <ul>
- <li><strong>host - </strong>The full nickname and host for the person who just spoke.</li>
- <li><strong>channel - </strong>The channel they spoke in.</li>
- <li><strong>*message - </strong>A tuple of the space separated message.</li>
- </ul>
-
- <pre class="lang:python decode:true"> def r_PRIVMSG(self, host, channel, *message):
- # Get the nickname and host.
- nickname, host = host[1:].split('!')
- # If the channel is the bots nickname change the channel
- # to whoever called it. This enables the bot to respond in PM
- # if the user talks to it in PM (Private Message).
- if channel == self.nickname:
- channel = nickname
- # Join the message by space, then remove the :
- # then split it into command and params
- command, *params = ' '.join(message)[1:].split(' ')
- # Feel free to use any command system here. I will just show you a
- # simple one.
- if command == '!hello':
- # Responds to !hello with Hello nickname
- self.privmsg(channel, 'Hello {}!'.format(nickname))
- if command == '!random':
- # Randomizes between params
- if len(params) == 1:
- try:
- number = int(params[0])
- except ValueError:
- number = 10
- output = random.randint(number)
- if len(params) == 2:
- try:
- start = int(params[0])
- end = int(params[1])
- except ValueError:
- start = 0
- end = 10
- output = random.randrange(start, end)
- self.privmsg(channel, str(output))
- return None</pre>
-
-
- <h2><strong>Bringing it together</strong></h2>
- <pre class="lang:python decode:true">#!/usr/bin/env python3
- import time
- import socket
- import random
- class Connection:
- def __init__(self, server, port, nickname, realname, password=None):
- self.server = server
- self.port = port
- self.nickname = nickname
- self.realname = realname
- self.password = password
- def send(self, data, end='\r\n'):
- # print('[out] {}'.format(data), end=end)
- # Encode and send the data.
- self.socket.send('{}{}'.format(data, end).encode('utf-8'))
- return None
- def connect(self, attempts=10, delay=10):
- self.socket = socket.socket()
- for i in range(attempts):
- # Wrap the connect in a try/except so it can try again if it
- # fails.
- try:
- self.socket.connect((self.server, self.port))
- except OSError:
- # If there was an error connecting sleep.
- # This delay will increase each time it fails.
- time.sleep((i+1)*delay)
- else:
- # Identify the bot with the server. PASS / NICK / USER
- # PASS password
- # NICK nickname
- # USER nickname user host :realname
- # For user and host we will just use the nickname.
- # If you have set a password send it first.
- if self.password is not None:
- self.send('PASS {}'.format(password))
- self.send('NICK {}'.format(self.nickname))
- self.send('USER {0} {0} {0} :{1}'.format(self.nickname,
- self.realname))
- return True
- return False
- def receive_loop(self):
- # Create a variable to append data to.
- data = b''
- # Set the timeout to 300 seconds. If the bot hasn't received data
- # in 300 seconds it is disconnected. This is used as a fallback
- # if the recv == b'': line fails to work.
- self.socket.settimeout(300)
- while True:
- try:
- # Append the received data to the recv variable.
- # I found 1024 works well for the amount of bytes received.
- recv = self.socket.recv(1024)
- # Check if the data is blank, this will happen if the bot
- # has been disconnected.
- if recv == b'':
- raise ConnectionResetError
- # Append the data that was just received to the data variable.
- data += recv
- except (OSError, ConnectionResetError, TimeoutError):
- # The bot has been disconnected for some reason.
- # Wait for 30 seconds then reconnect.
- time.sleep(30)
- self.connect()
- continue
- else:
- # If no error has been encountered loop through each line
- # of the data. If some of it doesn't end with \r\n
- # it will stay in the data variable until more data comes in.
- # Somtimes IRC sends half a line then the next .recv()
- # it will send the rest. This handles that.
- while b'\r\n' in data:
- line, data = data.split(b'\r\n', 1)
- print( '[in] {}'.format(line) )
- # This is where we decode and handle the data.
- # I prefer to split the data by space and handle each
- # segment. You will see why I do this once we start
- # receiving data.
- split = line.decode('utf-8').split(' ')
- # IRC sends certain message 'types' like PRIVMSG JOIN PART
- # etc. All of these apart for PING and ERROR are in the
- # second index of the split list if we split the line
- # by a space. PING and ERROR are always in the first.
- # We check the first index to see if it matches any
- # of the functions this class has. This creates a
- # system where each time the bot sees a certain message
- # type it will call the related function.
- # (I prefix it with r_ since some of the IRC messages
- # are numbers)
- first = getattr(self, 'r_{}'.format(split[0]), None)
- if first is not None:
- # The class either has r_PING or r_ERROR defined.
- # Call it with the rest of the data.
- # A line such as "PING :server" will end up calling
- # r_PING(':server')
- first(*split[1:])
- # Check for the second index. (PRIVMSG JOIN PART etc.)
- second = getattr(self, 'r_{}'.format(split[1]), None)
- if second is not None:
- # We send all but the second index in the params.
- # A line such as
- # :nickname!~host@ipaddress PRIVMSG #channel :message
- # will end up calling
- # r_PRIVMSG(':nickname!~host@ipaddress', '#channel', ':message')
- # Yes the message and nickname are prefixed with :
- # but it is very easy to remove that in the function
- # that gets called.
- second(*[split[0]] + split[2:])
- return None
- def privmsg(self, channel, message):
- # Sends a message to a channel.
- self.send('PRIVMSG {} :{}'.format(channel, message))
- return None
- def r_PING(self, server):
- # Sends PONG as a response. This keeps the bot connected.
- self.send('PONG {}'.format(server))
- return None
- def r_433(self, *_):
- # Append _ to the nickname if the current nickname is in use.
- self.nickname = '{}_'.format(self.nickname)
- self.send('NICK {}'.format(self.nickname))
- return None
- class Bot(Connection):
- def __init__(self, server, port, nickname, password=None):
- Connection.__init__(self, server, port, nickname, password)
- # Attempt to connect.
- connected = self.connect()
- # If the bot is connected start the receive loop.
- if connected:
- self.receive_loop()
- def r_001(self, *_):
- # 001 is the perfect time to join a channel. All servers send 001
- # so this will always happen.
- self.send('JOIN #Sjc_Bot')
- return None
- def r_PRIVMSG(self, host, channel, *message):
- # Get the nickname and host.
- nickname, host = host[1:].split('!')
- # If the channel is the bots nickname change the channel
- # to whoever called it. This enables the bot to respond in PM
- # if the user talks to it in PM (Private Message).
- if channel == self.nickname:
- channel = nickname
- # Join the message by space, then remove the :
- # then split it into command and params
- command, *params = ' '.join(message)[1:].split(' ')
- # Feel free to use any command system here. I will just show you a
- # simple one.
- if command == '!hello':
- # Responds to !hello with Hello nickname
- self.privmsg(channel, 'Hello {}!'.format(nickname))
- if command == '!random':
- # Randomizes between params
- if len(params) == 1:
- try:
- number = int(params[0])
- except ValueError:
- number = 10
- output = random.randint(number)
- if len(params) == 2:
- try:
- start = int(params[0])
- end = int(params[1])
- except ValueError:
- start = 0
- end = 10
- output = random.randrange(start, end)
- self.privmsg(channel, str(output))
- return None
- def main():
- Bot('irc.freenode.net', 6667, 'MyBot123532')
- return None
- if __name__ == '__main__':
- main()
- </pre>
-
- <h3>Threading:</h3>
- The bot that you just made is purely single threaded. This means if any commands take a long time to run it will not receive more data until the command is done. This can be easily fixed with this little gem - A threading decorator.
- <pre class="lang:python decode:true ">import threading
- class asthread(object):
- def __init__(self, daemon=False):
- self.daemon = daemon
- def __call__(self, function):
- def inner(*args, **kwargs):
- thread = threading.Thread(target=function, args=args,
- kwargs=kwargs)
- thread.daemon = self.daemon
- thread.start()
- return None
- return inner</pre>
- Then you can add @asthread() to the top of any function and it will be called in a new thread.
- Example:
- <pre class="lang:python decode:true "> @asthread()
- def r_001(self, *_):
- # 001 is the perfect time to join a channel. All servers send 001
- # so this will always happen.
- self.send('JOIN #Sjc_Bot')
- return None</pre>
- The only downside is the return value will not get passed to where it was called from.
- I hope this was helpful :)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement