Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #! /opt/local/bin/python3.3
- from collections import deque
- from time import time as now
- class RollingTimeCounter(tuple):
- ''' Counter to measure events over time in a rolling time window.
- This counter provides statistics over events that happened within a
- rolling time window. The window is 'rolling' in the sense that the
- measurements are not nulled each X seconds, but continuously updated.
- This is implemented as a circular ring buffer of buckets, each
- representing a small fraction of the time window.
- Example: If 10 buckets are used to measure a time window of one second,
- each bucket covers 1/10 seconds (100ms). The counter represents
- events measured during the last 900-1000ms. More buckets increase
- accuracy.
- The data structure is optimized for fast and lock-free updates as well
- as constant and low memory usage. Read performance depends on the number
- of buckets.
- '''
- def __new__(cls, window=1, buckets=10):
- self = super().__new__(cls, [[0,0] for x in range(buckets)])
- self.window = window
- self.buckets = buckets
- self.tfunc = lambda: int(now()*buckets/window)
- return self
- def increment(self, value=1):
- ''' Increment current bucket. '''
- t = self.tfunc()
- c = self[-t % len(self)]
- if c[0] == t:
- c[1] += value
- else:
- c[0] = t
- c[1] = value
- def set(self, value):
- ''' Change the value of the current bucket. '''
- t = self.tfunc()
- c = self[-t % len(self)]
- c[0] = t
- c[1] = value
- def set_max(self, value):
- ''' Update current bucket if the new value is higher. '''
- t = self.tfunc()
- c = self[-t % len(self)]
- if c[0] == t:
- c[1] = max(c[1], value)
- else:
- c[0] = t
- c[1] = value
- def set_min(self, value):
- ''' Update current bucket if the new value is lower. '''
- t = self.tfunc()
- c = self[-t % len(self)]
- if c[0] == t:
- c[1] = min(c[1], value)
- else:
- c[0] = t
- c[1] = value
- def get_buckets(self):
- ''' Return the current values of all bucket. '''
- tmin = self.tfunc() - len(self)
- return [v for t,v in self if t > tmin or 0]
- def sum(self):
- ''' Return the total number of events during the observed time window.
- (equals: sum(buckets)) '''
- return sum(self.get_buckets())
- def rate(self):
- ''' Return the number of events per seconds. If the time window is
- shorter than a second, the rate is interpolated. If it is longer,
- than an average is returned. (equals: sum(buckets) / window) '''
- return self.sum() / self.window
- def rate_max(self):
- ''' Return highest event rate (events per second) observed during the
- time window. '''
- return max(self.get_buckets()) * self.buckets / self.window
- def rate_min(self):
- ''' Return lowest event rate (events per second) observed during the
- time window. '''
- return min(self.get_buckets()) * self.buckets / self.window
- class EventCounter(object):
- def __init__(self, window=1, buckets=10):
- self.window = window
- self.buckets = buckets
- self.maxlen = buckets - 1
- self.dt = window / buckets
- self.bucket = 0
- self.bucket_lifetime = now() + self.dt
- self.queue = deque()
- def increment(self, value=1):
- if now() > self.bucket_lifetime:
- if len(self.queue) > self.maxlen:
- self.queue.popleft()
- self.queue.append(self.bucket)
- self.bucket = 0
- self.bucket_lifetime = now() + self.dt
- self.bucket += value
- def sum(self):
- return sum(self.queue)
- def rate(self):
- return self.sum() / self.window
- def rate_max(self):
- return max(self.queue) * self.buckets / self.window
- def rate_min(self):
- return min(self.queue) * self.buckets / self.window
- def test_counter(counter, seconds):
- start_time = now()
- stop_time = start_time + seconds
- while now() < stop_time:
- counter.increment()
- print (counter.rate())
- print (counter.rate_max())
- print (counter.rate_min())
- if __name__ == '__main__':
- seconds = 3
- print ('RollingTimeCounter:')
- test_counter(RollingTimeCounter(), seconds)
- print ('EventCounter:')
- test_counter(EventCounter(), seconds)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement