gur111

thread_pool_server_tests.py

Jun 14th, 2021 (edited)
2,889
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import math
  2. import os
  3. import time
  4. import unittest
  5. import subprocess as sp
  6. import asyncio
  7. import requests_async as requests
  8. from h11 import RemoteProtocolError
  9. from threading import Lock
  10.  
  11. SERVER_PATH = os.path.realpath(os.path.join(os.path.curdir, '..', 'server'))
  12. DYNAMIC_REQ_TIME = 0.2
  13. DEFAULT_PORT = 8080
  14. DEFAULT_THREAD_COUNT = 3
  15. DEFAULT_QUEUE_SIZE = 7
  16. DEFAULT_STATIC_PAGE = 'home.html'
  17. DEFAULT_DYNAMIC_PAGE = 'output.cgi'
  18. SERVER_ADDRESS = 'localhost'  # 'localhost'
  19.  
  20.  
  21. class RequestResult:
  22.     def __init__(self, req_ind, res=None, e=None):
  23.         self.res = res
  24.         self.e = e
  25.         self.req_ind = req_ind
  26.  
  27.     def has_exception(self):
  28.         return self.e is not None
  29.  
  30.     def is_exception_of_type(self, exc_type):
  31.         return isinstance(self.e, exc_type)
  32.  
  33.  
  34. class RequestsTest(unittest.TestCase):
  35.     # noinspection HttpUrlsUsage
  36.     def __init__(self, *args, queue_size=DEFAULT_QUEUE_SIZE, thread_count=DEFAULT_THREAD_COUNT, policy='dt', **kwargs):
  37.         super().__init__(*args, **kwargs)
  38.         self.req_ind_mutex = Lock()
  39.         self.dyn_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/{DEFAULT_DYNAMIC_PAGE}'
  40.         self.static_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/{DEFAULT_STATIC_PAGE}'
  41.         self.not_found_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/not_found'
  42.         self.forbidden_url = f'http://{SERVER_ADDRESS}:{DEFAULT_PORT}/forbidden.cgi'
  43.         self.queue_size = queue_size
  44.         self.max_reqs = self.queue_size
  45.         self.server_path = SERVER_PATH
  46.         self.thread_count = thread_count
  47.         self.policy = policy
  48.         if policy == 'random':
  49.             self.per_drop_size = math.ceil(0.25 * (self.queue_size-self.thread_count))
  50.         elif policy in ['dt', 'dh']:
  51.             self.per_drop_size = 1
  52.         elif policy == 'block':
  53.             self.per_drop_size = 0
  54.  
  55.         self.last_req_index = 0
  56.  
  57.     def setUp(self):
  58.         print('Setting up test:')
  59.         print(f'\tthread: {self.thread_count}')
  60.         print(f'\tqueue size: {self.queue_size}')
  61.         print(f'\tpolicy: {self.policy}')
  62.         os.chdir(os.path.dirname(self.server_path))
  63.         self.server = sp.Popen([self.server_path, f'{DEFAULT_PORT}', f'{self.thread_count}', f'{self.queue_size}', self.policy])
  64.         print(f'Server status is: {self.server.poll()}')
  65.         # input('Confirm open port and hit RETURN')
  66.         # print('')
  67.  
  68.     def tearDown(self) -> None:
  69.         self.server.terminate()
  70.  
  71.     async def make_req(self, url, method='get'):
  72.         try:
  73.             self.req_ind_mutex.acquire()
  74.             req_ind = self.last_req_index + 1
  75.             self.last_req_index += 1
  76.             self.req_ind_mutex.release()
  77.             req_headers = {'req_index': f'{req_ind}'}
  78.             # arrival_time = time.time() * 1000  # in milliseconds
  79.             req_ind = self.last_req_index
  80.             if method == 'get':
  81.                 req = requests.get(url, headers=req_headers)
  82.             elif method == 'post':
  83.                 req = requests.post(url, headers=req_headers)
  84.             elif method == 'delete':
  85.                 req = requests.delete(url, headers=req_headers)
  86.             else:
  87.                 self.fail('Unknown request method')
  88.  
  89.             # print(f'Awaiting request {req_ind}')
  90.             response = await req
  91.             headers = response.headers
  92.             for header in headers:
  93.                 if header.lower().startswith('stat'):
  94.                     headers[header] = headers[header][1:]
  95.  
  96.             # response_time = time.time() * 1000
  97.             # self.assertAlmostEqual(arrival_time, float(response.headers['stat-req-arrival']), delta=min(500 * DYNAMIC_REQ_TIME, (response_time - arrival_time) * 0.2))
  98.         except Exception as e:
  99.             r = RequestResult(req_ind=req_ind, e=e)
  100.             return r
  101.         else:
  102.             r = RequestResult(req_ind=req_ind, res=response)
  103.             return r
  104.  
  105.     async def make_requests(self, url, total_reqs):
  106.         print(f'Requesting url: {self.dyn_url}')
  107.         self.last_req_index = 0
  108.         tasks = []
  109.         fail_expected_tasks = []
  110.         thread_stats = [{'count': 0, 'dyn': 0, 'static': 0} for _ in range(self.thread_count)]
  111.         expected_error_count = total_reqs - self.max_reqs + ((-(total_reqs - self.max_reqs)) % self.per_drop_size)
  112.         expected_average_dispatch = DYNAMIC_REQ_TIME * float(total_reqs - expected_error_count - min(self.thread_count, self.max_reqs)) / self.thread_count / 2
  113.  
  114.         if self.policy == 'random':
  115.             for _ in range(total_reqs):
  116.                 task = asyncio.ensure_future(self.make_req(url))
  117.                 tasks.append(task)
  118.         elif self.policy == 'dt':
  119.             for _ in range(total_reqs - expected_error_count):
  120.                 task = asyncio.ensure_future(self.make_req(url))
  121.                 tasks.append(task)
  122.  
  123.             for _ in range(expected_error_count):
  124.                 task = asyncio.ensure_future(self.make_req(url))
  125.                 fail_expected_tasks.append(task)
  126.  
  127.         elif self.policy == 'dh':
  128.             for _ in range(expected_error_count):
  129.                 task = asyncio.ensure_future(self.make_req(url))
  130.                 fail_expected_tasks.append(task)
  131.  
  132.             for _ in range(total_reqs - expected_error_count):
  133.                 task = asyncio.ensure_future(self.make_req(url))
  134.                 tasks.append(task)
  135.  
  136.         responses = await asyncio.gather(*tasks, *fail_expected_tasks, return_exceptions=True)
  137.  
  138.         responses = sorted(responses, key=lambda x: x.req_ind)
  139.  
  140.         error_count = 0
  141.         total_dispatch = 0
  142.         for res in responses:
  143.             if res.is_exception_of_type(RemoteProtocolError):
  144.                 error_count += 1
  145.                 continue
  146.             elif res.has_exception():
  147.                 raise res.e
  148.  
  149.             res = res.res
  150.             total_dispatch += float(res.headers['stat-req-dispatch'])
  151.             count, dyn, static = int(res.headers['stat-thread-count']), int(res.headers['stat-thread-dynamic']), int(res.headers['stat-thread-static'])
  152.  
  153.             tid = int(res.headers['stat-thread-id'])
  154.             thread_stats[tid]['count'] = max(count, thread_stats[tid]['count'])
  155.             thread_stats[tid]['dyn'] = max(dyn, thread_stats[tid]['dyn'])
  156.             thread_stats[tid]['static'] = max(static, thread_stats[tid]['static'])
  157.             self.assertEqual(count, dyn, f'Mismatch total request and dynamic request. Total: {count}. Dynamic: {dyn}')
  158.             self.assertEqual(static, 0, f'Unexpected static requests count. Expected {0}. Actual {static}')
  159.             # self.assertAlmostEqual(float(res.headers['stat-req-arrival']), arrival_time)
  160.  
  161.         total_count = total_dyn = 0
  162.  
  163.         for stat in thread_stats:
  164.             total_count += stat['count']
  165.             total_dyn += stat['dyn']
  166.  
  167.         print(f'Requests succeeded: {total_count}')
  168.         print(f'Requests failed: {error_count}')
  169.  
  170.         self.assertEqual(total_count, total_reqs - error_count)
  171.         self.assertEqual(total_count, total_dyn)
  172.  
  173.         self.assertEqual(expected_error_count, error_count, f'Unexpected error count. Expected {expected_error_count}. Actual: {error_count}')
  174.  
  175.         self.assertNotEqual(0, total_reqs - error_count, 'No request succeeded')
  176.         average_dispatch = total_dispatch / float(total_reqs - error_count)
  177.         self.assertAlmostEqual(expected_average_dispatch, average_dispatch, delta=max(1, expected_average_dispatch * 0.3),
  178.                                msg=f'Unexpected average dispatch time. Expected: {expected_average_dispatch}. Actual: {average_dispatch}')
  179.  
  180.         # TODO: Fix check. Should check that the correct requests failed for each policy type
  181.         # if self.policy == 'dh':
  182.         #     for res in responses[self.thread_count:self.thread_count + error_count]:
  183.         #         self.assertIsInstance(res.e, RemoteProtocolError)
  184.         # elif self.policy == 'dt':
  185.         #     for res in responses[-error_count:]:
  186.         #         self.assertIsInstance(res.e, RemoteProtocolError)
  187.  
  188.  
  189. class TestDropTailRequests(RequestsTest):
  190.     def __init__(self, *args, **kwargs):
  191.         super().__init__(*args, policy='dt', **kwargs)
  192.  
  193.     def test_drop_single(self):
  194.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 1))
  195.  
  196.     def test_drop_double_queue_size(self):
  197.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + self.queue_size * 2))
  198.  
  199.  
  200. class TestDropHeadRequests(RequestsTest):
  201.     def __init__(self, *args, **kwargs):
  202.         super().__init__(*args, policy='dh', **kwargs)
  203.  
  204.     def test_drop_single(self):
  205.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 1))
  206.  
  207.     def test_drop_double_queue_size(self):
  208.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + self.queue_size * 2))
  209.  
  210.  
  211. class TestDropRandomRequests(RequestsTest):
  212.     def __init__(self, *args, **kwargs):
  213.         super().__init__(*args, queue_size=16, policy='random', **kwargs)
  214.  
  215.     def test_single_drop_random(self):
  216.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 1))
  217.  
  218.     def test_double_drop_random(self):
  219.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs + 2 * int(0.25 * self.queue_size)))
  220.  
  221.     def test_no_drop(self):
  222.         asyncio.run(self.make_requests(self.dyn_url, self.max_reqs))
  223.  
  224.  
  225. class TestMultiThreaded(RequestsTest):
  226.     def __init__(self, *args, **kwargs):
  227.         super().__init__(*args, queue_size=80, **kwargs)
  228.  
  229.     def test_time_full_queue(self):
  230.         start_time = time.time()
  231.         req_count = self.max_reqs
  232.         asyncio.run(self.make_requests(self.dyn_url, req_count))
  233.         run_time = time.time() - start_time
  234.         expected_runtime = math.ceil(req_count / float(self.thread_count)) * DYNAMIC_REQ_TIME
  235.         # This is optimal so it must be greater
  236.         self.assertGreater(run_time, expected_runtime)
  237.         self.assertLess(run_time, expected_runtime * 2)
  238.  
  239.     def test_better_with_more_threads(self):
  240.         start_time = time.time()
  241.         req_count = self.max_reqs
  242.         asyncio.run(self.make_requests(self.dyn_url, req_count))
  243.         few_threads_run_time = time.time() - start_time
  244.  
  245.         self.server.terminate()
  246.         self.thread_count *= 3
  247.         self.setUp()
  248.  
  249.         start_time = time.time()
  250.         asyncio.run(self.make_requests(self.dyn_url, req_count))
  251.         more_threads_run_time = time.time() - start_time
  252.  
  253.         self.assertTrue(2 * more_threads_run_time < few_threads_run_time < 3 * more_threads_run_time, "Performance doesn't scale as expected with amount of threads")
  254.  
  255.  
  256. class TestStatusCodes(RequestsTest):
  257.     def __init__(self, *args, **kwargs):
  258.         super().__init__(*args, thread_count=1, queue_size=1, **kwargs)
  259.  
  260.     async def _make_req(self, url, expected_status, stat_map, method='get'):
  261.         task = asyncio.ensure_future(self.make_req(url, method=method))
  262.  
  263.         res = await asyncio.ensure_future(task)
  264.  
  265.         headers = res.res.headers
  266.  
  267.         for k in stat_map:
  268.             self.assertIn(k, headers)
  269.             if stat_map[k] is not None:
  270.                 self.assertAlmostEqual(float(headers[k]), stat_map[k], delta=0.03, msg=f'Unexpected value for {k}. Expected: {stat_map[k]}. Actual: {headers[k]}')
  271.  
  272.         self.assertEqual(expected_status, res.res.status_code, f'Unexpected status code. Expected: {expected_status}. Actual: {res.res.status_code}')
  273.  
  274.     def test_404(self):
  275.         stat_map = {
  276.             'stat-req-arrival': None,
  277.             'stat-req-dispatch': 0.00004,
  278.             'stat-thread-id': 0,
  279.             'stat-thread-count': 1,
  280.             'stat-thread-static': 0,
  281.             'stat-thread-dynamic': 0
  282.         }
  283.  
  284.         asyncio.run(self._make_req(self.not_found_url, 404, stat_map))
  285.  
  286.     def test_dynamic(self):
  287.         stat_map = {
  288.             'stat-req-arrival': None,
  289.             'stat-req-dispatch': 0.00004,
  290.             'stat-thread-id': 0,
  291.             'stat-thread-count': 1,
  292.             'stat-thread-static': 0,
  293.             'stat-thread-dynamic': 1
  294.         }
  295.         asyncio.run(self._make_req(self.dyn_url, 200, stat_map))
  296.  
  297.     def test_static(self):
  298.         stat_map = {
  299.             'stat-req-arrival': None,
  300.             'stat-req-dispatch': 0.00004,
  301.             'stat-thread-id': 0,
  302.             'stat-thread-count': 1,
  303.             'stat-thread-static': 1,
  304.             'stat-thread-dynamic': 0
  305.         }
  306.         asyncio.run(self._make_req(self.static_url, 200, stat_map))
  307.  
  308.     def test_forbidden(self):
  309.         stat_map = {
  310.             'stat-req-arrival': None,
  311.             'stat-req-dispatch': 0.00004,
  312.             'stat-thread-id': 0,
  313.             'stat-thread-count': 1,
  314.             'stat-thread-static': 0,
  315.             'stat-thread-dynamic': 0
  316.         }
  317.         asyncio.run(self._make_req(self.forbidden_url, 403, stat_map))
  318.  
  319.     def test_post(self):
  320.         stat_map = {
  321.             'stat-req-arrival': None,
  322.             'stat-req-dispatch': 0.00004,
  323.             'stat-thread-id': 0,
  324.             'stat-thread-count': 1,
  325.             'stat-thread-static': 0,
  326.             'stat-thread-dynamic': 0
  327.         }
  328.         asyncio.run(self._make_req(self.static_url, 501, stat_map, method='post'))
  329.  
  330.  
  331. class TestQueueSmallerThan(RequestsTest):
  332.     def __init__(self, *args, **kwargs):
  333.         super().__init__(*args, thread_count=5, queue_size=2, **kwargs)
  334.  
  335.     def test_small_queue_enough_threads(self):
  336.         asyncio.run(self.make_requests(url=self.dyn_url, total_reqs=self.thread_count))
  337.  
  338.  
  339. if __name__ == '__main__':
  340.     unittest.main()
  341.  
RAW Paste Data