Advertisement
Guest User

ringbuffer-deque-comparison

a guest
Jun 14th, 2013
156
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.63 KB | None | 0 0
  1. #! /opt/local/bin/python3.3
  2.  
  3. from collections import deque
  4. from time import time as now
  5.  
  6. class RollingTimeCounter(tuple):
  7.     ''' Counter to measure events over time in a rolling time window.
  8.  
  9.        This counter provides statistics over events that happened within a
  10.        rolling time window. The window is 'rolling' in the sense that the
  11.        measurements are not nulled each X seconds, but continuously updated.
  12.        This is implemented as a circular ring buffer of buckets, each
  13.        representing a small fraction of the time window.
  14.  
  15.        Example: If 10 buckets are used to measure a time window of one second,
  16.            each bucket covers 1/10 seconds (100ms). The counter represents
  17.            events measured during the last 900-1000ms. More buckets increase
  18.            accuracy.
  19.  
  20.        The data structure is optimized for fast and lock-free updates as well
  21.        as constant and low memory usage. Read performance depends on the number
  22.        of buckets.
  23.    '''
  24.  
  25.     def __new__(cls, window=1, buckets=10):
  26.         self = super().__new__(cls, [[0,0] for x in range(buckets)])
  27.         self.window = window
  28.         self.buckets = buckets
  29.         self.tfunc = lambda: int(now()*buckets/window)
  30.         return self
  31.  
  32.     def increment(self, value=1):
  33.         ''' Increment current bucket. '''
  34.         t = self.tfunc()
  35.         c = self[-t % len(self)]
  36.         if c[0] == t:
  37.             c[1] += value
  38.         else:
  39.             c[0] = t
  40.             c[1] = value
  41.  
  42.     def set(self, value):
  43.         ''' Change the value of the current bucket. '''
  44.         t = self.tfunc()
  45.         c = self[-t % len(self)]
  46.         c[0] = t
  47.         c[1] = value
  48.  
  49.     def set_max(self, value):
  50.         ''' Update current bucket if the new value is higher. '''
  51.         t = self.tfunc()
  52.         c = self[-t % len(self)]
  53.         if c[0] == t:
  54.             c[1] = max(c[1], value)
  55.         else:
  56.             c[0] = t
  57.             c[1] = value
  58.  
  59.     def set_min(self, value):
  60.         ''' Update current bucket if the new value is lower. '''
  61.         t = self.tfunc()
  62.         c = self[-t % len(self)]
  63.         if c[0] == t:
  64.             c[1] = min(c[1], value)
  65.         else:
  66.             c[0] = t
  67.             c[1] = value
  68.  
  69.     def get_buckets(self):
  70.         ''' Return the current values of all bucket. '''
  71.         tmin = self.tfunc() - len(self)
  72.         return [v for t,v in self if t > tmin or 0]
  73.  
  74.     def sum(self):
  75.         ''' Return the total number of events during the observed time window.
  76.            (equals: sum(buckets)) '''
  77.         return sum(self.get_buckets())
  78.  
  79.     def rate(self):
  80.         ''' Return the number of events per seconds. If the time window is
  81.            shorter than a second, the rate is interpolated. If it is longer,
  82.            than an average is returned. (equals: sum(buckets) / window) '''
  83.         return self.sum() / self.window
  84.  
  85.     def rate_max(self):
  86.         ''' Return highest event rate (events per second) observed during the
  87.            time window. '''
  88.         return max(self.get_buckets()) * self.buckets / self.window
  89.  
  90.     def rate_min(self):
  91.         ''' Return lowest event rate (events per second) observed during the
  92.            time window. '''
  93.         return min(self.get_buckets()) * self.buckets / self.window
  94.  
  95.  
  96. class EventCounter(object):
  97.  
  98.     def __init__(self, window=1, buckets=10):
  99.         self.window = window
  100.         self.buckets = buckets
  101.         self.maxlen = buckets - 1
  102.         self.dt = window / buckets
  103.         self.bucket = 0
  104.         self.bucket_lifetime = now() + self.dt
  105.         self.queue = deque()
  106.  
  107.     def increment(self, value=1):
  108.         if now() > self.bucket_lifetime:
  109.             if len(self.queue) > self.maxlen:
  110.                 self.queue.popleft()
  111.             self.queue.append(self.bucket)
  112.             self.bucket = 0
  113.             self.bucket_lifetime = now() + self.dt
  114.         self.bucket += value
  115.  
  116.     def sum(self):
  117.         return sum(self.queue)
  118.  
  119.     def rate(self):
  120.         return self.sum() / self.window
  121.  
  122.     def rate_max(self):
  123.         return max(self.queue) * self.buckets / self.window
  124.  
  125.     def rate_min(self):
  126.         return min(self.queue) * self.buckets / self.window
  127.  
  128.  
  129. def test_counter(counter, seconds):
  130.     start_time = now()
  131.     stop_time = start_time + seconds
  132.     while now() < stop_time:
  133.         counter.increment()
  134.     print (counter.rate())
  135.     print (counter.rate_max())
  136.     print (counter.rate_min())
  137.  
  138. if __name__ == '__main__':
  139.     seconds = 3
  140.     print ('RollingTimeCounter:')
  141.     test_counter(RollingTimeCounter(), seconds)
  142.     print ('EventCounter:')
  143.     test_counter(EventCounter(), seconds)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement