Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class RateLimiter:
- def __init__(self, max_rate: int, period_s: float or int, logger: logging.Logger):
- """
- Allows `max_rate` per `period_s`.
- :param max_rate: number of hits allowed per `period_s`
- :param period_s: period in seconds
- :param logger: logger to use
- """
- assert isinstance(max_rate, int) and max_rate > 0
- assert period_s > 0
- self._max_rate = max_rate
- self._period_s = period_s
- self._lock = asyncio.Lock()
- self._loop = asyncio.get_event_loop()
- self._logger = logger
- self._timer_handle = None
- # We'll initially allow `max_rate` to happen in parallel, and then release
- # the semaphores as new tasks can be started
- self._sema = asyncio.Semaphore(max_rate)
- # we'll push the task end-time to this queue during `__aexit__`
- self._end_time_q = deque()
- async def __aenter__(self):
- await self._sema.acquire()
- def _release_timer(self):
- try:
- now = time.time()
- sleep_s = 0
- while sleep_s == 0 and len(self._end_time_q):
- q_ts = self._end_time_q[0]
- window_s = now - q_ts
- sleep_s = max(0, self._period_s - window_s) # min clipped to zero
- if sleep_s == 0:
- self._end_time_q.popleft()
- self._sema.release()
- if sleep_s:
- self._timer_handle = self._loop.call_later(sleep_s, self._release_timer)
- else:
- self._timer_handle = None
- except:
- self._logger.exception("Failed while attempting to release sempahores")
- self._timer_handle = self._loop.call_later(1, self._release_timer)
- raise
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- # NOTE: even if there's a pending exception we have to assume the call counted
- # Register finish time before we lock
- # It's important that no yields happen before this
- # If this fails we'll deadlock one `max_hit` per period
- self._end_time_q.append(time.time())
- if not self._timer_handle:
- self._timer_handle = self._loop.call_later(self._period_s, self._release_timer)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement