Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """
- Created on Fri Jan 31 13:34:25 2014
- @author:Paul Hofstra
- http://www.cs.umd.edu/~samir/498/vitter.pdf
- Random Sampling with a Reservoir
- by JEFFREY SCOTT VITTER
- ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985.
- """
- from __future__ import division
- from itertools import islice, count, izip
- from random import randrange, random
- from math import sqrt
- from operator import mul
- from time import clock
- from collections import Counter
- def f(s, k, t):
- """probabillity function for skip - formula 3.2 from Vitter
- t: number of processed records
- k: reservoir size
- s: number of records to skip
- t >= k
- """
- if s < k:
- mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1)
- return reduce(mul, ((t - k + i) / (t + 1 + i)
- for i in xrange(start,s+1)), mult)
- else:
- return reduce(mul, ((t - i) / (t + s - i)
- for i in xrange(k)), k / (t + s + 1))
- def algorithm_R(seq, k):
- 'take a random sample with k items fom seq'
- it = iter(seq)
- reservoir = list(islice(it, k))
- for t, item in izip(count(k), it):
- j = randrange(0, t + 1)
- if j < k:
- reservoir[j] = item
- return reservoir
- def algorithm_X(seq, k):
- 'take a random sample with k items fom seq'
- it = iter(seq)
- reservoir = list(islice(it, k))
- t = k
- num = 0
- while 1:
- V = random()
- S, t, num = (0, t + 1, num + 1)
- quot = num / t
- while quot > V:
- S, t, num = (S + 1, t + 1, num + 1)
- quot *= num / t
- next(islice(it, S, S), None)
- next_item = next(it, None)
- if next_item:
- reservoir[randrange(0, k)] = next_item
- else:
- break
- return reservoir
- def algorithm_Z(seq, k, T=22):
- """take a random sample with k items fom seq
- """
- it = iter(seq)
- reservoir = list(islice(it, k))
- t = k
- tresh = T * k
- num = 0
- while t <= tresh:
- V = random()
- S, t, num = (0, t + 1, num + 1)
- quot = num / t
- while quot > V:
- S, t, num = (S + 1, t + 1, num + 1)
- quot *= num / t
- next(islice(it, S, S), None) # skip S records
- next_item = next(it, None)
- if next_item:
- reservoir[randrange(0, k)] = next_item
- else:
- break
- W = random() ** (-1/k)
- term = t - k + 1
- while 1: # items in sequence
- while 1: # rejection sampling loop
- U = random()
- X = t * (W - 1)
- S = int(X)
- # first try if h(S) / cg(S) > random
- lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k)
- rhs = (((t + X) / (term + S)) * term) / t
- if lhs <= rhs:
- W = rhs / lhs
- break
- # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random
- y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X)
- denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1)
- for numer in xrange(t + S, numer_lim - 1, -1):
- y *= numer / denom
- denom -= 1
- W = random() ** (-1/k)
- if y ** (1 / k) <= (t + X) / t:
- break
- next(islice(it, S, S), None)
- next_item = next(it, None)
- if next_item:
- reservoir[randrange(0, k)] = next_item
- t += S + 1
- term += S + 1
- else:
- break
- return reservoir
- def algorithm_Z_naive(seq, k, T=5):
- it = iter(seq)
- reservoir = list(islice(it, k))
- t = k
- # do first k*T steps with algorithm R
- if T > 0:
- for t, item in islice(izip(count(k), it), k*T):
- j = randrange(0, t + 1) # 0 <= j <= i !!
- if j < k:
- reservoir[j] = item
- t += 1
- while 1: # loop over records in sequence
- while 1: # rejection sampling loop
- X = t * (random() ** (-1/k) - 1) # use inverse of G
- S = int(X)
- # first try if h(S) / cg(X) >= random
- cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k
- rg = random() * cgs
- hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1)
- if rg <= hs:
- break
- # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random
- if rg <= f(S, k, t):
- break
- next(islice(it, S, S), None)
- next_item = next(it, None)
- if next_item:
- reservoir[randrange(0, k)] = next_item
- t += S + 1
- else:
- break # no more records
- return reservoir
- def big_range(M):
- 'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647'
- return islice(count(), M)
- def test_moment(k, counts):
- 'Test if the distribution in counts (i, n) is uniform'
- mini = min(i for i,n in counts.iteritems())
- maxi = max(i for i,n in counts.iteritems())
- exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1)
- N = sum(n for i, n in counts.iteritems())
- t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N
- s = 1 / sqrt(N)
- form = "actual:{:6.3f} expected:{:6.3f} tolerance:{:6.3f}"
- assert abs(t - exp) < s, form.format(t, exp, s)
- def test(method, k, M, time=10):
- """check if all numbers have equal chance to be chosen
- The records are [0, 1, 2, ....]
- method: method for Reservoir Sampling
- k: reservoir size
- M; total number of records
- time: length of the test in seconds
- Test if moments 1-4 of resulting distribution is uniform
- Make sure that T parameter is set low enough to test algo Z, otherwise
- only the startup methods are tested
- """
- counts = Counter()
- t0 = clock()
- for i in xrange(100):
- counts.update(method(big_range(M), k))
- t = clock() - t0
- N = int(time * 100 / t + 0.5)
- for _ in xrange(N):
- counts.update(method(big_range(M), k))
- print "Test", method.__name__,
- test_moment(1, counts)
- test_moment(2, counts)
- test_moment(3, counts)
- test_moment(4, counts)
- print "OK"
- def do_nothing(seq, k):
- 'pass through the data and do nothing'
- it = iter(seq)
- reservoir = list(islice(it, k))
- for record in it:
- pass
- return reservoir
- def benchmark(M=10000000, k=10):
- print "Benchmark - number of records =", M, ", k =", k
- for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]:
- t0 = clock()
- if M > 10000000 and method == algorithm_R:
- continue
- if M > 100000000 and method == algorithm_X:
- continue
- method(big_range(M), k)
- print " {:20s} time:{:10.3f} secs".format(method.__name__, clock() - t0)
- benchmark(M=10000000, k=10)
- test(algorithm_Z_naive, 10, 100, 100)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement