Advertisement
Guest User

Untitled

a guest
Jun 28th, 2017
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.23 KB | None | 0 0
  1. class RateLimiter:
  2. def __init__(self, max_rate: int, period_s: float or int, logger: logging.Logger):
  3. """
  4. Allows `max_rate` per `period_s`.
  5.  
  6. :param max_rate: number of hits allowed per `period_s`
  7. :param period_s: period in seconds
  8. :param logger: logger to use
  9. """
  10.  
  11. assert isinstance(max_rate, int) and max_rate > 0
  12. assert period_s > 0
  13.  
  14. self._max_rate = max_rate
  15. self._period_s = period_s
  16. self._lock = asyncio.Lock()
  17. self._loop = asyncio.get_event_loop()
  18. self._logger = logger
  19. self._timer_handle = None
  20.  
  21. # We'll initially allow `max_rate` to happen in parallel, and then release
  22. # the semaphores as new tasks can be started
  23. self._sema = asyncio.Semaphore(max_rate)
  24.  
  25. # we'll push the task end-time to this queue during `__aexit__`
  26. self._end_time_q = deque()
  27.  
  28. async def __aenter__(self):
  29. await self._sema.acquire()
  30.  
  31. def _release_timer(self):
  32. try:
  33. now = time.time()
  34. sleep_s = 0
  35. while sleep_s == 0 and len(self._end_time_q):
  36. q_ts = self._end_time_q[0]
  37. window_s = now - q_ts
  38. sleep_s = max(0, self._period_s - window_s) # min clipped to zero
  39. if sleep_s == 0:
  40. self._end_time_q.popleft()
  41. self._sema.release()
  42.  
  43. if sleep_s:
  44. self._timer_handle = self._loop.call_later(sleep_s, self._release_timer)
  45. else:
  46. self._timer_handle = None
  47. except:
  48. self._logger.exception("Failed while attempting to release sempahores")
  49. self._timer_handle = self._loop.call_later(1, self._release_timer)
  50. raise
  51.  
  52. async def __aexit__(self, exc_type, exc_val, exc_tb):
  53. # NOTE: even if there's a pending exception we have to assume the call counted
  54.  
  55. # Register finish time before we lock
  56. # It's important that no yields happen before this
  57. # If this fails we'll deadlock one `max_hit` per period
  58. self._end_time_q.append(time.time())
  59.  
  60. if not self._timer_handle:
  61. self._timer_handle = self._loop.call_later(self._period_s, self._release_timer)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement