"""A library for executing running calculations.
A running calculation is an object that can be fed one value at a time. This
allows several running calculations to be evaluated side by side over a single
iterator of values, in one pass. This isn't possible with the built-in variants
of most calculations, such as max() and heapq.nlargest(), because each of them
consumes the whole iterator by itself.
"""
from math import sqrt
from heapq import heappush, heappushpop
from functools import partial
class RunningCalc(object):
    """Common base class for running calculations.

    It defines no behavior of its own. The subclasses in this module each
    provide a ``feed(value)`` method and a ``value`` attribute or property.
    """
def apply(iterable, *running_calcs):
    """Run several running calculations on a single iterable of values.

    Every value produced by ``iterable`` is fed, in order, to each of the
    given running calculations. The iterable is traversed exactly once.

    Args:
        iterable: Any iterable of values.
        *running_calcs: Objects exposing a ``feed(value)`` method and a
            ``value`` attribute.

    Returns:
        A tuple holding the ``value`` of each running calculation, in the
        same order as ``running_calcs`` (an empty tuple if none were given).
    """
    # Look the bound methods up once so the inner loop calls them directly.
    feeds = [rcalc.feed for rcalc in running_calcs]
    for value in iterable:
        # Iterate the bound ``feed`` methods, not the calculation objects:
        # the objects themselves are not callable.
        for feed in feeds:
            feed(value)
    return tuple(rcalc.value for rcalc in running_calcs)
class RunningMax(RunningCalc):
    """Keep the largest value fed so far.

    ``value`` is None until the first value has been fed.
    """

    def __init__(self):
        self.value = None

    def feed(self, value):
        """Store ``value`` if nothing is stored yet or if it is larger."""
        best = self.value
        if best is not None and not (value > best):
            # The current maximum still stands.
            return
        self.value = value
class RunningMin(RunningCalc):
    """Keep the smallest value fed so far.

    ``value`` is None until the first value has been fed.
    """

    def __init__(self):
        self.value = None

    def feed(self, value):
        """Store ``value`` if nothing is stored yet or if it is smaller."""
        best = self.value
        if best is not None and not (value < best):
            # The current minimum still stands.
            return
        self.value = value
class RunningCount(RunningCalc):
    """Count how many values have been fed.

    Args:
        initial_value: Starting point of the counter (0 by default).
    """

    def __init__(self, initial_value=0):
        self.value = initial_value

    def feed(self, value):
        """Advance the counter by one; the fed ``value`` itself is ignored."""
        self.value += 1
class RunningSum(RunningCalc):
    """Accumulate the sum of all values fed.

    Args:
        initial_value: Starting point of the accumulator (0 by default).
    """

    def __init__(self, initial_value=0):
        self.value = initial_value

    def feed(self, value):
        """Add ``value`` to the accumulated total."""
        self.value += value
class RunningAverage(RunningCalc):
    """Maintain the arithmetic mean of the values fed so far.

    The mean is updated incrementally, so no running total is stored.
    ``value`` is 0.0 before anything has been fed; ``n`` is the number of
    values seen.
    """

    def __init__(self):
        self.n = 0
        self.value = 0.0

    def feed(self, value):
        """Shift the mean toward ``value`` by 1/n of their difference."""
        seen = self.n + 1
        self.n = seen
        self.value += (value - self.value) / seen
class RunningVariance(RunningCalc):
    """Calculate a running variance using the Welford algorithm.

    Attributes maintained while feeding:
        n: number of values fed so far.
        mean: running mean of the values.
        M2: running sum of squared deviations from the mean.

    ``value`` is an alias of ``populationVariance``.
    """

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self.M2 = 0.0

    def feed(self, value):
        """Fold ``value`` into the running mean and squared-deviation sum."""
        self.n += 1
        delta = value - self.mean
        self.mean += delta / self.n
        # Welford's update deliberately multiplies the deviation from the old
        # mean by the deviation from the *new* mean.
        self.M2 += delta * (value - self.mean)

    @property
    def populationVariance(self):
        """Variance with divisor n; 0 when no values have been fed."""
        return (self.M2 / self.n) if self.n > 0 else 0

    value = populationVariance

    @property
    def sampleVariance(self):
        """Variance with divisor n - 1; 0 when fewer than two values were fed."""
        return (self.M2 / (self.n - 1)) if self.n > 1 else 0
class RunningStandardDeviation(RunningCalc):
    """Calculate a running standard deviation.

    All values are delegated to an internal ``RunningVariance``; the standard
    deviations are the square roots of its variances.

    ``value`` is an alias of ``populationStandardDeviation``.
    """

    def __init__(self):
        self._running_variance = RunningVariance()

    def feed(self, value):
        """Pass ``value`` on to the underlying running variance."""
        self._running_variance.feed(value)

    @property
    def populationStandardDeviation(self):
        """Square root of the population variance."""
        return sqrt(self._running_variance.populationVariance)

    value = populationStandardDeviation

    @property
    def samplepopulationStandardDeviation(self):
        """Square root of the sample variance.

        The name is kept as-is for backward compatibility; prefer the
        ``sampleStandardDeviation`` alias.
        """
        return sqrt(self._running_variance.sampleVariance)

    # Correctly named alias for the property above.
    sampleStandardDeviation = samplepopulationStandardDeviation
class RunningNLargest(RunningCalc):
    """Keep the N largest values fed so far.

    The retained values live in a min-heap (``heap``), so the smallest
    retained value is the one displaced when a larger value arrives.

    Args:
        N: Maximum number of values to retain.
    """

    def __init__(self, N):
        self.heap = []
        self.count = 0
        self.N = N

    def feed(self, value):
        """Offer ``value``; it is retained if it ranks among the N largest."""
        self.count += 1
        if self.count <= self.N:
            # Still filling up: every value is retained.
            heappush(self.heap, value)
        else:
            # Heap is full: push the value and drop the smallest item, which
            # may be the new value itself.
            heappushpop(self.heap, value)

    @property
    def value(self):
        """The retained values as a new list, largest first."""
        return sorted(self.heap, reverse=True)
class RunningNSmallest(RunningNLargest):
    """Keep the N smallest values fed so far.

    Only works on negatable values! Values are stored negated so that the
    N-largest bookkeeping of the parent class can be reused, standing in
    for a max-heap.
    """

    def feed(self, value):
        """Offer ``value`` by handing its negation to the parent class."""
        negated = -value  # note the minus!
        RunningNLargest.feed(self, negated)

    @property
    def value(self):
        """The retained values as a new list, smallest first."""
        restored = [-item for item in self.heap]
        restored.sort()
        return restored