Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import collections
- import functools
- import time
- class RateLimiter(object):
- def __init__(self, max_calls, period=1.0, callback=None):
- self.iplist = {}
- self.calls = collections.deque()
- self.period = period
- self.max_calls = max_calls
- self.callback = callback
- def __call__(self, f):
- @functools.wraps(f)
- def wrapped(*args, **kwargs):
- ip = kwargs.get("ip")
- self.nowip = ip
- with self:
- if not self.exceed:
- return f(*args, **kwargs)
- # raise Exception("频率过快")
- self.callback(ip)
- return wrapped
- def __enter__(self):
- ip = self.nowip
- _r = self.iplist.get(ip,None)
- if not _r:
- self.iplist[ip] = collections.deque()
- if len(self.iplist[ip]) < self.max_calls:
- self.exceed = False
- else:
- self.exceed = True
- def __exit__(self, exc_type, exc_val, exc_tb):
- ip = self.nowip
- self.iplist[ip].append(time.time())
- while self._timespan >= self.period:
- self.iplist[ip].popleft()
- @property
- def _timespan(self):
- ip = self.nowip
- return self.iplist[ip][-1] - self.iplist[ip][0]
- def over_speed_waring(ip):
- print(ip,"速度过快")
- @RateLimiter(3,1.0,over_speed_waring)
- def func_testcall(ip):
- print('run,ip is',ip)
- # time.sleep(1)
- func_testcall(ip=1)
- func_testcall(ip=2)
- func_testcall(ip=2)
- func_testcall(ip=2)
- func_testcall(ip=1)
- func_testcall(ip=2)
- # func_testcall(ip=1)
- func_testcall(ip=2)
- # func_testcall(ip=1)
- func_testcall(ip=2)
Add Comment
Please, Sign In to add comment