# -*- coding: utf-8 -*- """ Created on Fri Jan 31 13:34:25 2014 @author:Paul Hofstra http://www.cs.umd.edu/~samir/498/vitter.pdf Random Sampling with a Reservoir by JEFFREY SCOTT VITTER ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985. """ from __future__ import division from itertools import islice, count, izip from random import randrange, random from math import sqrt from operator import mul from time import clock from collections import Counter def f(s, k, t): """probabillity function for skip - formula 3.2 from Vitter t: number of processed records k: reservoir size s: number of records to skip t >= k """ if s < k: mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1) return reduce(mul, ((t - k + i) / (t + 1 + i) for i in xrange(start,s+1)), mult) else: return reduce(mul, ((t - i) / (t + s - i) for i in xrange(k)), k / (t + s + 1)) def algorithm_R(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) for t, item in izip(count(k), it): j = randrange(0, t + 1) if j < k: reservoir[j] = item return reservoir def algorithm_X(seq, k): 'take a random sample with k items fom seq' it = iter(seq) reservoir = list(islice(it, k)) t = k num = 0 while 1: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t next(islice(it, S, S), None) next_item = next(it, None) if next_item: reservoir[randrange(0, k)] = next_item else: break return reservoir def algorithm_Z(seq, k, T=22): """take a random sample with k items fom seq """ it = iter(seq) reservoir = list(islice(it, k)) t = k tresh = T * k num = 0 while t <= tresh: V = random() S, t, num = (0, t + 1, num + 1) quot = num / t while quot > V: S, t, num = (S + 1, t + 1, num + 1) quot *= num / t next(islice(it, S, S), None) # skip S records next_item = next(it, None) if next_item: reservoir[randrange(0, k)] = next_item else: break W = random() ** (-1/k) term = t - k + 1 while 1: # items in sequence while 1: # rejection sampling loop U = random() X = t * (W - 1) S = int(X) # first try if h(S) / cg(S) > random lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k) rhs = (((t + X) / (term + S)) * term) / t if lhs <= rhs: W = rhs / lhs break # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X) denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1) for numer in xrange(t + S, numer_lim - 1, -1): y *= numer / denom denom -= 1 W = random() ** (-1/k) if y ** (1 / k) <= (t + X) / t: break next(islice(it, S, S), None) next_item = next(it, None) if next_item: reservoir[randrange(0, k)] = next_item t += S + 1 term += S + 1 else: break return reservoir def algorithm_Z_naive(seq, k, T=5): it = iter(seq) reservoir = list(islice(it, k)) t = k # do first k*T steps with algorithm R if T > 0: for t, item in islice(izip(count(k), it), k*T): j = randrange(0, t + 1) # 0 <= j <= i !! if j < k: reservoir[j] = item t += 1 while 1: # loop over records in sequence while 1: # rejection sampling loop X = t * (random() ** (-1/k) - 1) # use inverse of G S = int(X) # first try if h(S) / cg(X) >= random cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k rg = random() * cgs hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1) if rg <= hs: break # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random if rg <= f(S, k, t): break next(islice(it, S, S), None) next_item = next(it, None) if next_item: reservoir[randrange(0, k)] = next_item t += S + 1 else: break # no more records return reservoir def big_range(M): 'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647' return islice(count(), M) def test_moment(k, counts): 'Test if the distribution in counts (i, n) is uniform' mini = min(i for i,n in counts.iteritems()) maxi = max(i for i,n in counts.iteritems()) exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1) N = sum(n for i, n in counts.iteritems()) t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N s = 1 / sqrt(N) form = "actual:{:6.3f} expected:{:6.3f} tolerance:{:6.3f}" assert abs(t - exp) < s, form.format(t, exp, s) def test(method, k, M, time=10): """check if all numbers have equal chance to be chosen The records are [0, 1, 2, ....] method: method for Reservoir Sampling k: reservoir size M; total number of records time: length of the test in seconds Test if moments 1-4 of resulting distribution is uniform Make sure that T parameter is set low enough to test algo Z, otherwise only the startup methods are tested """ counts = Counter() t0 = clock() for i in xrange(100): counts.update(method(big_range(M), k)) t = clock() - t0 N = int(time * 100 / t + 0.5) for _ in xrange(N): counts.update(method(big_range(M), k)) print "Test", method.__name__, test_moment(1, counts) test_moment(2, counts) test_moment(3, counts) test_moment(4, counts) print "OK" def do_nothing(seq, k): 'pass through the data and do nothing' it = iter(seq) reservoir = list(islice(it, k)) for record in it: pass return reservoir def benchmark(M=10000000, k=10): print "Benchmark - number of records =", M, ", k =", k for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]: t0 = clock() if M > 10000000 and method == algorithm_R: continue if M > 100000000 and method == algorithm_X: continue method(big_range(M), k) print " {:20s} time:{:10.3f} secs".format(method.__name__, clock() - t0) benchmark(M=10000000, k=10) test(algorithm_Z_naive, 10, 100, 100)