Guest User

Untitled

a guest
Aug 23rd, 2012
148
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 16.49 KB | None | 0 0
  1. import unittest
  2. import socket
  3.  
  4. import telnet
  5. from constants import *
  6.  
  7. class MockSocket(object):
  8.     pass
  9.  
  10. class HandlerTestsBase(unittest.TestCase):
  11.  
  12.     def setUp(self):
  13.         self.mocksocket = MockSocket()
  14.         self.handler = telnet.Handler(self.mocksocket)
  15.  
  16.     def simulate_read(self, data_to_read):
  17.         """Simulates reading from the remote end. data_to_read is the data that
  18.        the remote end is sending to our socket."""
  19.         self.mocksocket.recv = lambda x : data_to_read
  20.         self.handler.handle_read()
  21.  
  22.     def simulate_write(self, number_of_bytes):
  23.         """Simulates a write to the remote end of up to number of bytes
  24.        sent."""
  25.         self.mocksocket.send = lambda x : number_of_bytes
  26.         self.handler.handle_write()
  27.  
  28.     def simulate_write_read(self, data_to_read):
  29.         """Simulates writing to the socket and then immediately reading from
  30.        it."""
  31.         self.mocksocket.recv = lambda x : data_to_read
  32.         self.handler.handle_read()
  33.  
  34.         self.mocksocket.send = lambda x : 1024
  35.         self.handler.handle_write()
  36.  
  37. class HandlerTests(HandlerTestsBase):
  38.  
  39.     def test_send(self):
  40.         """Tests that the send function correctly queues data to the
  41.        writebuffer."""
  42.         data = "Some data to send."
  43.         self.handler.send(data)
  44.         self.assertEqual(self.handler.writebuffer, data)
  45.         self.handler.send(data)
  46.         self.assertEqual(self.handler.writebuffer, data + data)
  47.  
  48.     def test_handle_write(self):
  49.         """Tests that when we call handle_write we send data from our
  50.        writebuffer to the socket and keeps what couldn't be sent still in the
  51.        writebuffer."""
  52.         data = "Some data to send."
  53.         self.handler.send(data)
  54.         self.simulate_write(5)
  55.         self.assertEqual(self.handler.writebuffer, data[5:])
  56.  
  57.     def test_handle_read(self):
  58.         """Makes sure that when we call handle_read that what we read appears
  59.        in our readbuffer."""
  60.         data = "Some data to recv."
  61.         self.simulate_read(data)
  62.         self.assertEqual(self.handler.readbuffer, data)
  63.  
  64.         self.simulate_read(data)
  65.         self.assertEqual(self.handler.readbuffer, data + data)
  66.  
  67.     def test_read_IAC_IAC(self):
  68.         """Whenever we read IAC IAC we need to just add a single IAC to our
  69.        readbuffer."""
  70.         data = "Some data " + IAC + IAC + " and some more."
  71.         expected = "Some data " + IAC + " and some more."
  72.         self.simulate_read(data)
  73.         self.assertEqual(self.handler.readbuffer, expected)
  74.  
  75.     def test_read_IAC_command(self):
  76.         """Tests handling an IAC command we don't support."""
  77.         data = "Some data " + IAC + chr(1) + "and some more."
  78.         expected = "Some data and some more."
  79.         self.simulate_read(data)
  80.         self.assertEqual(self.handler.readbuffer, expected)
  81.  
  82.     def test_read_double_IAC_IAC(self):
  83.         """Makes sure that when we get IAC IAC IAC IAC that only two IACs are
  84.        in the readbuffer."""
  85.         data = "a" + IAC + IAC + IAC + IAC + "b"
  86.         expected = "a" + IAC + IAC + "b"
  87.         self.simulate_read(data)
  88.         self.assertEqual(self.handler.readbuffer, expected)
  89.  
  90.     def test_read_broken_IAC_IAC(self):
  91.         """Checks that when we receive IAC and then another IAC in another call
  92.        to read that only one is added to the readbuffer."""
  93.         self.simulate_read("a" + IAC)
  94.         self.simulate_read(IAC + "b")
  95.         self.assertEqual(self.handler.readbuffer, "a" + IAC + "b")
  96.  
  97.     def test_read_IAC_DODONTWILLWONT_option(self):
  98.         """Tests the various DO/WONT/WILL/WONT options: that they don't show
  99.        up in our readbuffer and that they also work when broken over multiple
  100.        read calls."""
  101.  
  102.         for option in (DO, DONT, WILL, WONT):
  103.             self.setUp()
  104.             called = [False]
  105.             def _local_option_enabled(option):
  106.                 called[0] = True
  107.             self.handler.local_option_enabled = _local_option_enabled
  108.  
  109.             self.simulate_read("a" + IAC)
  110.             self.simulate_read(option)
  111.             self.simulate_read(chr(1) + "b")
  112.             self.assertEqual(self.handler.readbuffer, "ab")
  113.             self.simulate_read("c" + IAC + option + chr(1) + "d")
  114.             self.assertEqual(self.handler.readbuffer, "abcd")
  115.             self.assertFalse(called[0])
  116.  
  117.  
  118. class TelnetOptionsTests(HandlerTestsBase):
  119.  
  120.     def setUp(self):
  121.         HandlerTestsBase.setUp(self)
  122.         self.local_option_enabled_called = False
  123.         self.local_option_disabled_called = False
  124.         self.remote_option_enabled_called = False
  125.         self.remote_option_disabled_called = False
  126.         self.handler.local_option_enabled = self.local_option_enabled
  127.         self.handler.local_option_disabled = self.local_option_disabled
  128.         self.handler.remote_option_enabled = self.remote_option_enabled
  129.         self.handler.remote_option_disabled = self.remote_option_disabled
  130.  
  131.     def local_option_enabled(self, option):
  132.         self.local_option_enabled_called = True
  133.  
  134.     def local_option_disabled(self, option):
  135.         self.local_option_disabled_called = True
  136.  
  137.     def remote_option_enabled(self, option):
  138.         self.remote_option_enabled_called = True
  139.  
  140.     def remote_option_disabled(self, option):
  141.         self.remote_option_disabled_called = True
  142.  
  143.     def test_enable_local_option(self):
  144.         """Tests enabling a local option and the return value."""
  145.         option = chr(1)
  146.         #Calling enable_local_option the first time should return
  147.         #NEGOTIATING as status and send an IAC WILL option, without
  148.         #calling local_option_enabled yet
  149.         result = self.handler.enable_local_option(option)
  150.         self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
  151.         self.assertEqual(self.handler.writebuffer, IAC + WILL + option)
  152.         self.assertFalse(self.local_option_enabled_called)
  153.  
  154.         #If we try to call enable_local_option again then we should return
  155.         #ALREADY_NEGOTIATING and we need to make sure we don't accidently
  156.         #send two IAC WILLs or call local_option_enabled
  157.         result = self.handler.enable_local_option(option)
  158.         self.assertEqual(result, telnet.handler.Options.ALREADY_NEGOTIATING)
  159.         self.assertEqual(self.handler.writebuffer, IAC + WILL + option)
  160.         self.assertFalse(self.local_option_enabled_called)
  161.  
  162.         #Option should be in our list of negotations but not yet in our list
  163.         #of enabled options
  164.         self.assertIn(option, self.handler.our_negotiations)
  165.         self.assertNotIn(option, self.handler.our_options)
  166.  
  167.     def test_will_do(self):
  168.         """Tests the following:
  169.            --> IAC WILL option
  170.            <-- IAC DO option
  171.            <-- IAC DO option"""
  172.         option = chr(1)
  173.         self.handler.enable_local_option(option)
  174.         self.simulate_write_read(IAC + DO + option)
  175.         #When the hosts answers with DO we need to make sure we called
  176.         #local_option_enabled.
  177.         self.assertTrue(self.local_option_enabled_called)
  178.  
  179.         #And when we enable the option again we should get ALREADY_ENABLED
  180.         #as status, without us sending anything to the remote end
  181.         result = self.handler.enable_local_option(option)
  182.         self.assertEqual(result, telnet.handler.Options.ALREADY_ENABLED)
  183.         self.assertEqual(self.handler.writebuffer, "")
  184.  
  185.         #Option should not be in our list of negotiations but should appear
  186.         #in our list of enabled options
  187.         self.assertNotIn(option, self.handler.our_negotiations)
  188.         self.assertIn(option, self.handler.our_options)
  189.  
  190.         #When we get another DO we should completely ignore it.
  191.         self.simulate_read(IAC + DO + option)
  192.         self.assertEqual(self.handler.writebuffer, "")
  193.         self.assertNotIn(option, self.handler.our_negotiations)
  194.         self.assertIn(option, self.handler.our_options)
  195.  
  196.     def test_will_do_dont(self):
  197.         """Tests the following:
  198.            --> IAC WILL option
  199.            <-- IAC DO OPTION
  200.            <-- IAC DONT option"""
  201.         option = chr(1)
  202.         self.handler.enable_local_option(option)
  203.         self.simulate_write_read(IAC + DO + option)
  204.         #Make sure we didn't accidently call local_option_disabled
  205.         self.assertFalse(self.local_option_disabled_called)
  206.  
  207.         self.simulate_read(IAC + DONT + option)
  208.         #Now, after we received the DONT, we need to check that we did call it
  209.         self.assertTrue(self.local_option_disabled_called)
  210.         #And make sure that we acknowledge that fact by answering with a WONT
  211.         self.assertEqual(self.handler.writebuffer, IAC + WONT + option)
  212.         #Checks that option isn't in our negotation and enabled option lists
  213.         self.assertNotIn(option, self.handler.our_negotiations)
  214.         self.assertNotIn(option, self.handler.our_options)
  215.  
  216.         #Enabling the option again should start negotiations once more
  217.         result = self.handler.enable_local_option(option)
  218.         self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
  219.         self.assertIn(option, self.handler.our_negotiations)
  220.         self.assertNotIn(option, self.handler.our_options)
  221.  
  222.     def test_will_dont(self):
  223.         """Tests the following:
  224.            --> IAC WILL option
  225.            <-- IAC DONT option"""
  226.         option = chr(1)
  227.         self.handler.enable_local_option(option)
  228.         self.simulate_write_read(IAC + DONT + option)
  229.         #Checks that option isn't in our negotation and enabled option lists
  230.         self.assertNotIn(option, self.handler.our_negotiations)
  231.         self.assertNotIn(option, self.handler.our_options)
  232.  
  233.         #Enabling the option again should start negotiations once more
  234.         result = self.handler.enable_local_option(option)
  235.         self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
  236.         self.assertEqual(self.handler.writebuffer, IAC + WILL + option)
  237.  
  238.         #No state changed so none of the local_option_[en|dis]abled should be
  239.         #called
  240.         self.assertFalse(self.local_option_enabled_called)
  241.         self.assertFalse(self.local_option_disabled_called)
  242.  
  243.     def test_enable_remote_option(self):
  244.         """Tests enabling a remote option and the return value."""
  245.         option = chr(1)
  246.         #Calling enable_remote_option the first time should return
  247.         #NEGOTIATING as status and send an IAC DO option, without
  248.         #calling local_option_enabled yet
  249.         result = self.handler.enable_remote_option(option)
  250.         self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
  251.         self.assertEqual(self.handler.writebuffer, IAC + DO + option)
  252.  
  253.         #If we try to call enable_remote_option again then we should return
  254.         #ALREADY_NEGOTIATING and we need to make sure we don't accidently
  255.         #send two IAC DOes or call remote_option_enabled
  256.         result = self.handler.enable_remote_option(option)
  257.         self.assertEqual(result, telnet.handler.Options.ALREADY_NEGOTIATING)
  258.         self.assertEqual(self.handler.writebuffer, IAC + DO + option)
  259.         self.assertIn(option, self.handler.their_negotiations)
  260.         self.assertNotIn(option, self.handler.their_options)
  261.  
  262.     def test_do_will(self):
  263.         """Tests the following:
  264.            --> IAC DO option
  265.            <-- IAC WILL option
  266.            <-- IAC WILL option"""
  267.         option = chr(1)
  268.         self.handler.enable_remote_option(option)
  269.         self.simulate_write_read(IAC + WILL + option)
  270.         #When the hosts answers with a WILL we need to make sure we called
  271.         #remote_option_enabled
  272.         self.assertTrue(self.remote_option_enabled_called)
  273.  
  274.         #And when we enable the option again we should get ALREADY_ENABLED
  275.         #as status, without us sending anything to the remote end
  276.         result = self.handler.enable_remote_option(option)
  277.         self.assertEqual(result, telnet.handler.Options.ALREADY_ENABLED)
  278.         self.assertEqual(self.handler.writebuffer, "")
  279.  
  280.         #Option should not be in our list of negotiations but should appear
  281.         #in our list of enabled options
  282.         self.assertNotIn(option, self.handler.their_negotiations)
  283.         self.assertIn(option, self.handler.their_options)
  284.  
  285.         #When we get another WILL we should completely ignore it.
  286.         self.simulate_read(IAC + WILL + option)
  287.         self.assertEqual(self.handler.writebuffer, "")
  288.         self.assertNotIn(option, self.handler.their_negotiations)
  289.         self.assertIn(option, self.handler.their_options)
  290.  
  291.     def test_do_will_wont(self):
  292.         """Tests the following:
  293.            --> IAC DO option
  294.            <-- IAC WILL option
  295.            <-- IAC WONT option"""
  296.         option = chr(1)
  297.         self.handler.enable_remote_option(option)
  298.         self.simulate_write_read(IAC + WILL + option)
  299.         #Make sure we didn't accidently call remote_option_disabled
  300.         self.assertFalse(self.remote_option_disabled_called)
  301.  
  302.         self.simulate_read(IAC + WONT + option)
  303.         #Now, after we recived the WONT, we need to check that we did call it
  304.         self.assertTrue(self.remote_option_disabled_called)
  305.         #And make sure we acknowledge that fact by answering with a DONT
  306.         self.assertEqual(self.handler.writebuffer, IAC + DONT + option)
  307.         #Checks that option isn't in our negotiation and enabled option lists
  308.         self.assertNotIn(option, self.handler.their_negotiations)
  309.         self.assertNotIn(option, self.handler.their_options)
  310.  
  311.         #Enabling the option again should start negotations once more
  312.         result = self.handler.enable_remote_option(option)
  313.         self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
  314.         self.assertIn(option, self.handler.their_negotiations)
  315.         self.assertNotIn(option, self.handler.their_options)
  316.  
  317.     def test_do_wont(self):
  318.         """Tests the following:
  319.            --> IAC DO option
  320.            <-- IAC WONT option"""
  321.         option = chr(1)
  322.         self.handler.enable_remote_option(option)
  323.         self.simulate_write_read(IAC + WONT + option)
  324.         #Checks that the option isn't in our negotation and enabled option lists
  325.         self.assertNotIn(option, self.handler.their_negotiations)
  326.         self.assertNotIn(option, self.handler.their_options)
  327.  
  328.         #Enabling the option again should start negotations once more
  329.         result = self.handler.enable_remote_option(option)
  330.         self.assertEqual(result, telnet.handler.Options.NEGOTIATING)
  331.         self.assertEqual(self.handler.writebuffer, IAC + DO + option)
  332.  
  333.         #No state changed so none of the local_option_[en|dis]abled should be
  334.         #called
  335.         self.assertFalse(self.remote_option_enabled_called)
  336.         self.assertFalse(self.remote_option_disabled_called)
  337.  
  338.     def test_wont(self):
  339.         """Tests receiving a WONT without us having done anything."""
  340.         option = chr(1)
  341.         self.simulate_read(IAC + WONT + option)
  342.         #When we receive a WONT for an option the remote end already has
  343.         #disabled then we need to ignore it
  344.         self.assertEqual(self.handler.writebuffer, "")
  345.         self.assertFalse(self.remote_option_enabled_called)
  346.         self.assertFalse(self.remote_option_disabled_called)
  347.  
  348.     def test_dont(self):
  349.         """Tests receiving a DONT without us having done anything."""
  350.         option = chr(1)
  351.         self.simulate_read(IAC + DONT + option)
  352.         #When we receive a DONT for an option our end already has disabled
  353.         #then we need to ignore it
  354.         self.assertEqual(self.handler.writebuffer, "")
  355.         self.assertFalse(self.remote_option_enabled_called)
  356.         self.assertFalse(self.remote_option_disabled_called)
  357.  
  358.     def test_will(self):
  359.         """Tests receiving a WILL without us having done anything."""
  360.         option = chr(1)
  361.         self.simulate_read(IAC + WILL + option)
  362.         #When receving a WILL for an option we haven't enabled on their end,
  363.         #we should answer with a DONT
  364.         self.assertEqual(self.handler.writebuffer, IAC + DONT + option)
  365.         self.assertFalse(self.remote_option_enabled_called)
  366.         self.assertFalse(self.remote_option_disabled_called)
  367.  
  368.     def test_do(self):
  369.         """Tests receiving a DO without us having done anything."""
  370.         option = chr(1)
  371.         self.simulate_read(IAC + DO + option)
  372.         #When receving a WILL for an option we haven't enabled on our end,
  373.         #we should answer with a DONT
  374.         self.assertEqual(self.handler.writebuffer, IAC + WONT + option)
  375.         self.assertFalse(self.remote_option_enabled_called)
  376.         self.assertFalse(self.remote_option_disabled_called)
Advertisement
Add Comment
Please, Sign In to add comment