Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import unittest
- import socket
- import telnet
- from constants import *
- class MockSocket(object):
- pass
- class HandlerTestsBase(unittest.TestCase):
- def setUp(self):
- self.mocksocket = MockSocket()
- self.handler = telnet.Handler(self.mocksocket)
- def simulate_read(self, data_to_read):
- """Simulates reading from the remote end. data_to_read is the data that
- the remote end is sending to our socket."""
- self.mocksocket.recv = lambda x : data_to_read
- self.handler.handle_read()
- def simulate_write(self, number_of_bytes):
- """Simulates a write to the remote end of up to number of bytes
- sent."""
- self.mocksocket.send = lambda x : number_of_bytes
- self.handler.handle_write()
- def simulate_write_read(self, data_to_read):
- """Simulates writing to the socket and then immediately reading from
- it."""
- self.mocksocket.recv = lambda x : data_to_read
- self.handler.handle_read()
- self.mocksocket.send = lambda x : 1024
- self.handler.handle_write()
- class HandlerTests(HandlerTestsBase):
- def test_send(self):
- """Tests that the send function correctly queues data to the
- writebuffer."""
- data = "Some data to send."
- self.handler.send(data)
- self.assertEqual(self.handler.writebuffer, data)
- self.handler.send(data)
- self.assertEqual(self.handler.writebuffer, data + data)
- def test_handle_write(self):
- """Tests that when we call handle_write we send data from our
- writebuffer to the socket and keeps what couldn't be sent still in the
- writebuffer."""
- data = "Some data to send."
- self.handler.send(data)
- self.simulate_write(5)
- self.assertEqual(self.handler.writebuffer, data[5:])
- def test_handle_read(self):
- """Makes sure that when we call handle_read that what we read appears
- in our readbuffer."""
- data = "Some data to recv."
- self.simulate_read(data)
- self.assertEqual(self.handler.readbuffer, data)
- self.simulate_read(data)
- self.assertEqual(self.handler.readbuffer, data + data)
- def test_read_IAC_IAC(self):
- """Whenever we read IAC IAC we need to just add a single IAC to our
- readbuffer."""
- data = "Some data " + IAC + IAC + " and some more."
- expected = "Some data " + IAC + " and some more."
- self.simulate_read(data)
- self.assertEqual(self.handler.readbuffer, expected)
- def test_read_IAC_command(self):
- """Tests handling an IAC command we don't support."""
- data = "Some data " + IAC + chr(1) + "and some more."
- expected = "Some data and some more."
- self.simulate_read(data)
- self.assertEqual(self.handler.readbuffer, expected)
- def test_read_double_IAC_IAC(self):
- """Makes sure that when we get IAC IAC IAC IAC that only two IACs are
- in the readbuffer."""
- data = "a" + IAC + IAC + IAC + IAC + "b"
- expected = "a" + IAC + IAC + "b"
- self.simulate_read(data)
- self.assertEqual(self.handler.readbuffer, expected)
- def test_read_broken_IAC_IAC(self):
- """Checks that when we receive IAC and then another IAC in another call
- to read that only one is added to the readbuffer."""
- self.simulate_read("a" + IAC)
- self.simulate_read(IAC + "b")
- self.assertEqual(self.handler.readbuffer, "a" + IAC + "b")
- def test_read_IAC_DODONTWILLWONT_option(self):
- """Tests the various DO/WONT/WILL/WONT options: that they don't show
- up in our readbuffer and that they also work when broken over multiple
- read calls."""
- for option in (DO, DONT, WILL, WONT):
- self.setUp()
- called = [False]
- def _local_option_enabled(option):
- called[0] = True
- self.handler.local_option_enabled = _local_option_enabled
- self.simulate_read("a" + IAC)
- self.simulate_read(option)
- self.simulate_read(chr(1) + "b")
- self.assertEqual(self.handler.readbuffer, "ab")
- self.simulate_read("c" + IAC + option + chr(1) + "d")
- self.assertEqual(self.handler.readbuffer, "abcd")
- self.assertFalse(called[0])
- class TelnetOptionsTests(HandlerTestsBase):
- def setUp(self):
- HandlerTestsBase.setUp(self)
- self.local_option_enabled_called = False
- self.local_option_disabled_called = False
- self.remote_option_enabled_called = False
- self.remote_option_disabled_called = False
- self.handler.local_option_enabled = self.local_option_enabled
- self.handler.local_option_disabled = self.local_option_disabled
- self.handler.remote_option_enabled = self.remote_option_enabled
- self.handler.remote_option_disabled = self.remote_option_disabled
- def local_option_enabled(self, option):
- self.local_option_enabled_called = True
- def local_option_disabled(self, option):
- self.local_option_disabled_called = True
- def remote_option_enabled(self, option):
- self.remote_option_enabled_called = True
- def remote_option_disabled(self, option):
- self.remote_option_disabled_called = True
- def test_enable_local_option(self):
- """Tests enabling a local option and the return value."""
- option = chr(1)
- #Calling enable_local_option the first time should return
- #NEGOTIATING as status and send an IAC WILL option, without
- #calling local_option_enabled yet
- result = self.handler.enable_local_option(option)
- self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
- self.assertEqual(self.handler.writebuffer, IAC + WILL + option)
- self.assertFalse(self.local_option_enabled_called)
- #If we try to call enable_local_option again then we should return
- #ALREADY_NEGOTIATING and we need to make sure we don't accidently
- #send two IAC WILLs or call local_option_enabled
- result = self.handler.enable_local_option(option)
- self.assertEqual(result, telnet.handler.Options.ALREADY_NEGOTIATING)
- self.assertEqual(self.handler.writebuffer, IAC + WILL + option)
- self.assertFalse(self.local_option_enabled_called)
- #Option should be in our list of negotations but not yet in our list
- #of enabled options
- self.assertIn(option, self.handler.our_negotiations)
- self.assertNotIn(option, self.handler.our_options)
- def test_will_do(self):
- """Tests the following:
- --> IAC WILL option
- <-- IAC DO option
- <-- IAC DO option"""
- option = chr(1)
- self.handler.enable_local_option(option)
- self.simulate_write_read(IAC + DO + option)
- #When the hosts answers with DO we need to make sure we called
- #local_option_enabled.
- self.assertTrue(self.local_option_enabled_called)
- #And when we enable the option again we should get ALREADY_ENABLED
- #as status, without us sending anything to the remote end
- result = self.handler.enable_local_option(option)
- self.assertEqual(result, telnet.handler.Options.ALREADY_ENABLED)
- self.assertEqual(self.handler.writebuffer, "")
- #Option should not be in our list of negotiations but should appear
- #in our list of enabled options
- self.assertNotIn(option, self.handler.our_negotiations)
- self.assertIn(option, self.handler.our_options)
- #When we get another DO we should completely ignore it.
- self.simulate_read(IAC + DO + option)
- self.assertEqual(self.handler.writebuffer, "")
- self.assertNotIn(option, self.handler.our_negotiations)
- self.assertIn(option, self.handler.our_options)
- def test_will_do_dont(self):
- """Tests the following:
- --> IAC WILL option
- <-- IAC DO OPTION
- <-- IAC DONT option"""
- option = chr(1)
- self.handler.enable_local_option(option)
- self.simulate_write_read(IAC + DO + option)
- #Make sure we didn't accidently call local_option_disabled
- self.assertFalse(self.local_option_disabled_called)
- self.simulate_read(IAC + DONT + option)
- #Now, after we received the DONT, we need to check that we did call it
- self.assertTrue(self.local_option_disabled_called)
- #And make sure that we acknowledge that fact by answering with a WONT
- self.assertEqual(self.handler.writebuffer, IAC + WONT + option)
- #Checks that option isn't in our negotation and enabled option lists
- self.assertNotIn(option, self.handler.our_negotiations)
- self.assertNotIn(option, self.handler.our_options)
- #Enabling the option again should start negotiations once more
- result = self.handler.enable_local_option(option)
- self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
- self.assertIn(option, self.handler.our_negotiations)
- self.assertNotIn(option, self.handler.our_options)
- def test_will_dont(self):
- """Tests the following:
- --> IAC WILL option
- <-- IAC DONT option"""
- option = chr(1)
- self.handler.enable_local_option(option)
- self.simulate_write_read(IAC + DONT + option)
- #Checks that option isn't in our negotation and enabled option lists
- self.assertNotIn(option, self.handler.our_negotiations)
- self.assertNotIn(option, self.handler.our_options)
- #Enabling the option again should start negotiations once more
- result = self.handler.enable_local_option(option)
- self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
- self.assertEqual(self.handler.writebuffer, IAC + WILL + option)
- #No state changed so none of the local_option_[en|dis]abled should be
- #called
- self.assertFalse(self.local_option_enabled_called)
- self.assertFalse(self.local_option_disabled_called)
- def test_enable_remote_option(self):
- """Tests enabling a remote option and the return value."""
- option = chr(1)
- #Calling enable_remote_option the first time should return
- #NEGOTIATING as status and send an IAC DO option, without
- #calling local_option_enabled yet
- result = self.handler.enable_remote_option(option)
- self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
- self.assertEqual(self.handler.writebuffer, IAC + DO + option)
- #If we try to call enable_remote_option again then we should return
- #ALREADY_NEGOTIATING and we need to make sure we don't accidently
- #send two IAC DOes or call remote_option_enabled
- result = self.handler.enable_remote_option(option)
- self.assertEqual(result, telnet.handler.Options.ALREADY_NEGOTIATING)
- self.assertEqual(self.handler.writebuffer, IAC + DO + option)
- self.assertIn(option, self.handler.their_negotiations)
- self.assertNotIn(option, self.handler.their_options)
- def test_do_will(self):
- """Tests the following:
- --> IAC DO option
- <-- IAC WILL option
- <-- IAC WILL option"""
- option = chr(1)
- self.handler.enable_remote_option(option)
- self.simulate_write_read(IAC + WILL + option)
- #When the hosts answers with a WILL we need to make sure we called
- #remote_option_enabled
- self.assertTrue(self.remote_option_enabled_called)
- #And when we enable the option again we should get ALREADY_ENABLED
- #as status, without us sending anything to the remote end
- result = self.handler.enable_remote_option(option)
- self.assertEqual(result, telnet.handler.Options.ALREADY_ENABLED)
- self.assertEqual(self.handler.writebuffer, "")
- #Option should not be in our list of negotiations but should appear
- #in our list of enabled options
- self.assertNotIn(option, self.handler.their_negotiations)
- self.assertIn(option, self.handler.their_options)
- #When we get another WILL we should completely ignore it.
- self.simulate_read(IAC + WILL + option)
- self.assertEqual(self.handler.writebuffer, "")
- self.assertNotIn(option, self.handler.their_negotiations)
- self.assertIn(option, self.handler.their_options)
- def test_do_will_wont(self):
- """Tests the following:
- --> IAC DO option
- <-- IAC WILL option
- <-- IAC WONT option"""
- option = chr(1)
- self.handler.enable_remote_option(option)
- self.simulate_write_read(IAC + WILL + option)
- #Make sure we didn't accidently call remote_option_disabled
- self.assertFalse(self.remote_option_disabled_called)
- self.simulate_read(IAC + WONT + option)
- #Now, after we recived the WONT, we need to check that we did call it
- self.assertTrue(self.remote_option_disabled_called)
- #And make sure we acknowledge that fact by answering with a DONT
- self.assertEqual(self.handler.writebuffer, IAC + DONT + option)
- #Checks that option isn't in our negotiation and enabled option lists
- self.assertNotIn(option, self.handler.their_negotiations)
- self.assertNotIn(option, self.handler.their_options)
- #Enabling the option again should start negotations once more
- result = self.handler.enable_remote_option(option)
- self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
- self.assertIn(option, self.handler.their_negotiations)
- self.assertNotIn(option, self.handler.their_options)
- def test_do_wont(self):
- """Tests the following:
- --> IAC DO option
- <-- IAC WONT option"""
- option = chr(1)
- self.handler.enable_remote_option(option)
- self.simulate_write_read(IAC + WONT + option)
- #Checks that the option isn't in our negotation and enabled option lists
- self.assertNotIn(option, self.handler.their_negotiations)
- self.assertNotIn(option, self.handler.their_options)
- #Enabling the option again should start negotations once more
- result = self.handler.enable_remote_option(option)
- self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
- self.assertEqual(self.handler.writebuffer, IAC + DO + option)
- #No state changed so none of the local_option_[en|dis]abled should be
- #called
- self.assertFalse(self.remote_option_enabled_called)
- self.assertFalse(self.remote_option_disabled_called)
- def test_wont(self):
- """Tests receiving a WONT without us having done anything."""
- option = chr(1)
- self.simulate_read(IAC + WONT + option)
- #When we receive a WONT for an option the remote end already has
- #disabled then we need to ignore it
- self.assertEqual(self.handler.writebuffer, "")
- self.assertFalse(self.remote_option_enabled_called)
- self.assertFalse(self.remote_option_disabled_called)
- def test_dont(self):
- """Tests receiving a DONT without us having done anything."""
- option = chr(1)
- self.simulate_read(IAC + DONT + option)
- #When we receive a DONT for an option our end already has disabled
- #then we need to ignore it
- self.assertEqual(self.handler.writebuffer, "")
- self.assertFalse(self.remote_option_enabled_called)
- self.assertFalse(self.remote_option_disabled_called)
- def test_will(self):
- """Tests receiving a WILL without us having done anything."""
- option = chr(1)
- self.simulate_read(IAC + WILL + option)
- #When receving a WILL for an option we haven't enabled on their end,
- #we should answer with a DONT
- self.assertEqual(self.handler.writebuffer, IAC + DONT + option)
- self.assertFalse(self.remote_option_enabled_called)
- self.assertFalse(self.remote_option_disabled_called)
- def test_do(self):
- """Tests receiving a DO without us having done anything."""
- option = chr(1)
- self.simulate_read(IAC + DO + option)
- #When receving a WILL for an option we haven't enabled on our end,
- #we should answer with a DONT
- self.assertEqual(self.handler.writebuffer, IAC + WONT + option)
- self.assertFalse(self.remote_option_enabled_called)
- self.assertFalse(self.remote_option_disabled_called)
Advertisement
Add Comment
Please, Sign In to add comment