Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from constants import *
class Options(object):
    """Result codes returned by the option-enabling requests on Handler.

    The values are plain integers so callers can compare them directly.
    """

    # A request was queued and we are now waiting for the peer's answer.
    NEGOTIATING = 1
    # A request for this option is already pending; nothing new was sent.
    ALREADY_NEGOTIATING = 2
    # The option is already active; nothing was sent.
    ALREADY_ENABLED = 3
class Handler(object):
    """Handle the communication in the telnet protocol.

    Incoming bytes are split into normal output (collected in
    ``readbuffer``) and telnet commands (``IAC ...`` sequences). Option
    negotiations (DO / DONT / WILL / WONT) are tracked in four sets:

    * ``our_options`` / ``their_options``: options currently enabled on
      our side / on the remote side.
    * ``our_negotiations`` / ``their_negotiations``: options for which we
      sent a request and are still waiting for the answer.

    Subclasses can override the ``*_option_enabled`` / ``*_option_disabled``
    hooks to react to option state changes; the defaults do nothing.

    Limitation: only two-byte commands and three-byte option negotiations
    are recognised. Any other command byte following IAC ends the
    sequence, so a subnegotiation payload would be treated as normal
    output.

    NOTE(review): the buffers are ``str`` and incoming data is compared
    element-wise against IAC, which presumably matches Python 2 ``str``
    constants -- confirm against the ``constants`` module.
    """

    def __init__(self, socket):
        """Constructor. Takes a socket object."""
        self.socket = socket
        self.writebuffer = ""
        self.readbuffer = ""
        # Bytes of the telnet command currently being parsed (empty when
        # we are not inside a command).
        self.iacseq = ""
        self.our_options = set()
        self.our_negotiations = set()
        self.their_negotiations = set()
        self.their_options = set()

    def send(self, data):
        """Queue data to be sent into our writebuffer.

        This doesn't actually send anything yet through our socket; see
        handle_write().
        """
        self.writebuffer += data

    def handle_write(self):
        """Try to write as much of our writebuffer to the socket as possible.

        Any data that we couldn't write will still be in our writebuffer
        to be sent at a later time.
        """
        sent = self.socket.send(self.writebuffer)
        self.writebuffer = self.writebuffer[sent:]

    def handle_read(self):
        """Read from our socket, interpreting telnet commands and storing
        normal output into our readbuffer.

        At most 1024 bytes are read per call.
        """
        data = self.socket.recv(1024)
        for c in data:
            if not self.iacseq and c != IAC:
                # This is just normal output.
                self.readbuffer += c
            else:
                # We're either in the middle of a telnet command or we
                # just started one.
                self._handle_iac(c)

    def _handle_iac(self, char):
        """Consume one byte of a telnet command.

        Telnet commands always start with an IAC and are two or three
        bytes: if the command is a DO, DONT, WILL or WONT then three, in
        every other case two.
        """
        if not self.iacseq:
            # This is the start of a new IAC command sequence (char will
            # always be IAC here).
            self.iacseq += char
        elif len(self.iacseq) == 1:
            # We already have the first byte of the command, this is the
            # second byte.
            if char in (WILL, DONT, DO, WONT):
                # A telnet option negotiation needs one more byte.
                self.iacseq += char
            else:
                if char == IAC:
                    # We got IAC IAC: an escaped IAC, which stands for a
                    # single literal IAC in the output.
                    self.readbuffer += IAC
                # Any other two-byte command is simply dropped.
                self.iacseq = ""
        elif len(self.iacseq) == 2:
            # We got two bytes of a command plus the option byte: this is
            # a complete option negotiation. Reset the parser state before
            # dispatching so that a hook which raises (or re-enters the
            # parser) cannot leave us stuck in the middle of a sequence.
            verb = self.iacseq[1]
            self.iacseq = ""
            self._handle_option_negotiation(verb, char)

    def _handle_option_negotiation(self, verb, option):
        """Dispatch a fully received telnet option negotiation.

        verb is one of DO, DONT, WILL or WONT; any other value is ignored.
        """
        if verb == DO:
            self._handle_do(option)
        elif verb == DONT:
            self._handle_dont(option)
        elif verb == WILL:
            self._handle_will(option)
        elif verb == WONT:
            self._handle_wont(option)

    def _handle_do(self, option):
        """Called after we have received an IAC DO <option>."""
        if option in self.our_negotiations:
            # The remote end accepted our earlier IAC WILL <option>.
            self.our_options.add(option)
            self.local_option_enabled(option)
            self.our_negotiations.remove(option)
        elif option not in self.our_options:
            # We never offered this option: refuse it. (A DO for an
            # option that is already enabled needs no answer.)
            self.send(IAC + WONT + option)

    def _handle_dont(self, option):
        """Called after we have received an IAC DONT <option>."""
        if option in self.our_negotiations:
            # The remote end refused our earlier IAC WILL <option>.
            self.our_negotiations.remove(option)
        elif option in self.our_options:
            # The remote end asks us to stop using an enabled option.
            self.our_options.remove(option)
            self.local_option_disabled(option)
            self.send(IAC + WONT + option)

    def _handle_will(self, option):
        """Called after we have received an IAC WILL <option>."""
        if option in self.their_negotiations:
            # The remote end accepted our earlier IAC DO <option>.
            self.their_options.add(option)
            self.remote_option_enabled(option)
            self.their_negotiations.remove(option)
        elif option not in self.their_options:
            # We never asked for this option: refuse it. (A WILL for an
            # option that is already enabled needs no answer.)
            self.send(IAC + DONT + option)

    def _handle_wont(self, option):
        """Called after we have received an IAC WONT <option>."""
        if option in self.their_negotiations:
            # The remote end refused our earlier IAC DO <option>.
            self.their_negotiations.remove(option)
        elif option in self.their_options:
            # The remote end stops using an enabled option.
            self.their_options.remove(option)
            self.remote_option_disabled(option)
            self.send(IAC + DONT + option)

    def local_option_enabled(self, option):
        """Hook: the remote end agreed that we use option on our end.

        The default does nothing; override in a subclass.
        """

    def local_option_disabled(self, option):
        """Hook: the remote end told us to stop using option on our end.

        The default does nothing; override in a subclass.
        """

    def remote_option_enabled(self, option):
        """Hook: the remote end agreed to use option on its end.

        The default does nothing; override in a subclass.
        """

    def remote_option_disabled(self, option):
        """Hook: the remote end stopped using option on its end.

        The default does nothing; override in a subclass.
        """

    def enable_local_option(self, option):
        """Tell the remote end that we want to start using option on our
        end (IAC WILL option).

        If the remote end agrees then local_option_enabled(option) will
        be called.

        Returns Options.ALREADY_ENABLED or Options.ALREADY_NEGOTIATING
        when nothing had to be sent, Options.NEGOTIATING when the request
        was queued.
        """
        if option in self.our_options:
            return Options.ALREADY_ENABLED
        elif option in self.our_negotiations:
            return Options.ALREADY_NEGOTIATING
        else:
            self.send(IAC + WILL + option)
            self.our_negotiations.add(option)
            return Options.NEGOTIATING

    def enable_remote_option(self, option):
        """Tell the remote end that we want them to start using option on
        their end (IAC DO option).

        If the remote end agrees then remote_option_enabled(option) will
        be called.

        Returns Options.ALREADY_ENABLED or Options.ALREADY_NEGOTIATING
        when nothing had to be sent, Options.NEGOTIATING when the request
        was queued.
        """
        if option in self.their_options:
            return Options.ALREADY_ENABLED
        elif option in self.their_negotiations:
            return Options.ALREADY_NEGOTIATING
        else:
            self.send(IAC + DO + option)
            self.their_negotiations.add(option)
            return Options.NEGOTIATING
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement