Guest User

Untitled

a guest
Apr 6th, 2018
201
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.90 KB | None | 0 0
  1. import asyncio
  2. from collections import deque
  3. from datetime import datetime
  4.  
  5. class RateLimitingSemaphore:
  6.     def __init__(self, qps_limit, loop=None):
  7.         self.loop = loop or asyncio.get_event_loop()
  8.         self.qps_limit = qps_limit
  9.  
  10.         # The number of calls that are queued up, waiting for their turn.
  11.         self.queued_calls = 0
  12.  
  13.         # The times of the last N executions, where N=qps_limit - this should allow us to calculate the QPS within the
  14.         # last ~ second. Note that this also allows us to schedule the first N executions immediately.
  15.         self.call_times = deque()
  16.  
  17.     async def __aenter__(self):
  18.         self.queued_calls += 1
  19.         while True:
  20.             cur_rate = 0
  21.             if len(self.call_times) == self.qps_limit:
  22.                 cur_rate = len(self.call_times) / (self.loop.time() - self.call_times[0])
  23.             if cur_rate < self.qps_limit:
  24.                 break
  25.             interval = 1. / self.qps_limit
  26.             elapsed_time = self.loop.time() - self.call_times[-1]
  27.             await asyncio.sleep(self.queued_calls * interval - elapsed_time)
  28.         self.queued_calls -= 1
  29.  
  30.         if len(self.call_times) == self.qps_limit:
  31.             self.call_times.popleft()
  32.         self.call_times.append(self.loop.time())
  33.  
  34.     async def __aexit__(self, exc_type, exc, tb):
  35.         pass
  36.  
  37.  
  38. async def test(qps):
  39.     executions = 0
  40.     async def io_operation(semaphore):
  41.         async with semaphore:
  42.             nonlocal executions
  43.             executions += 1
  44.  
  45.     semaphore = RateLimitingSemaphore(qps)
  46.     start = datetime.now()
  47.     await asyncio.wait([io_operation(semaphore) for i in range(5*qps)])
  48.     dt = (datetime.now() - start).total_seconds()
  49.     print('Desired QPS:', qps, 'Achieved QPS:', executions / dt)
  50.  
  51. if __name__ == "__main__":
  52.     asyncio.get_event_loop().run_until_complete(test(1))
  53.     asyncio.get_event_loop().close()
Advertisement
Add Comment
Please, Sign In to add comment