 # Reservoir Sampling

Feb 2nd, 2014
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
1. # -*- coding: utf-8 -*-
2. """
3. Created on Fri Jan 31 13:34:25 2014
4.
5. @author:Paul Hofstra
6.
7. http://www.cs.umd.edu/~samir/498/vitter.pdf
8. Random Sampling with a Reservoir
9. by JEFFREY SCOTT VITTER
10. ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985.
11.
12.
13. """
14. from __future__ import division
15.
16. from itertools import islice, count, izip
17. from random import randrange, random
18. from math import sqrt
19. from operator import mul
20. from time import clock
21. from collections import Counter
22.
23. def f(s, k, t):
24.     """probabillity function for skip - formula 3.2 from Vitter
25.       t: number of processed records
26.       k: reservoir size
27.       s: number of records to skip
28.       t >= k
29.    """
30.     if s < k:
31.         mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1)
32.         return reduce(mul, ((t - k + i) / (t + 1 + i)
33.             for i in xrange(start,s+1)), mult)
34.     else:
35.         return reduce(mul, ((t - i) / (t + s - i)
36.             for i in xrange(k)), k / (t + s + 1))
37.
38. def algorithm_R(seq, k):
39.     'take a random sample with k items fom seq'
40.     it = iter(seq)
41.     reservoir = list(islice(it, k))
42.     for t, item in izip(count(k), it):
43.         j = randrange(0, t + 1)
44.         if j < k:
45.             reservoir[j] = item
46.     return reservoir
47.
48. def algorithm_X(seq, k):
49.     'take a random sample with k items fom seq'
50.     it = iter(seq)
51.     reservoir = list(islice(it, k))
52.     t = k
53.     num = 0
54.     while 1:
55.         V = random()
56.         S, t, num = (0, t + 1, num + 1)
57.         quot = num / t
58.         while quot > V:
59.             S, t, num = (S + 1, t + 1, num + 1)
60.             quot *= num / t
61.         next(islice(it, S, S), None)
62.         next_item = next(it, None)
63.         if next_item:
64.             reservoir[randrange(0, k)] = next_item
65.         else:
66.             break
67.     return reservoir
68.
69. def algorithm_Z(seq, k, T=22):
70.     """take a random sample with k items fom seq
71.    """
72.     it = iter(seq)
73.     reservoir = list(islice(it, k))
74.     t = k
75.     tresh = T * k
76.     num = 0
77.     while t <= tresh:
78.         V = random()
79.         S, t, num = (0, t + 1, num + 1)
80.         quot = num / t
81.         while quot > V:
82.             S, t, num = (S + 1, t + 1, num + 1)
83.             quot *= num / t
84.         next(islice(it, S, S), None) # skip S records
85.         next_item = next(it, None)
86.         if next_item:
87.             reservoir[randrange(0, k)] = next_item
88.         else:
89.             break
90.     W = random() ** (-1/k)
91.     term = t - k + 1
92.     while 1:      # items in sequence
93.         while 1:  # rejection sampling loop
94.             U = random()
95.             X = t * (W - 1)
96.             S = int(X)
97.             # first try if h(S) / cg(S) > random
98.             lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k)
99.             rhs = (((t + X) / (term + S)) * term) / t
100.             if lhs <= rhs:
101.                 W = rhs / lhs
102.                 break
103.             # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random
104.             y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X)
105.             denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1)
106.             for numer in xrange(t + S, numer_lim - 1, -1):
107.                 y *= numer / denom
108.                 denom -= 1
109.             W = random() ** (-1/k)
110.             if y ** (1 / k) <= (t + X) / t:
111.                 break
112.         next(islice(it, S, S), None)
113.         next_item = next(it, None)
114.         if next_item:
115.             reservoir[randrange(0, k)] = next_item
116.             t += S + 1
117.             term += S + 1
118.         else:
119.             break
120.     return reservoir
121.
122. def algorithm_Z_naive(seq, k, T=5):
123.     it = iter(seq)
124.     reservoir = list(islice(it, k))
125.     t = k
126.     # do first k*T steps with algorithm R
127.     if T > 0:
128.         for t, item in islice(izip(count(k), it), k*T):
129.             j = randrange(0, t + 1) # 0 <= j <= i !!
130.             if j < k:
131.                 reservoir[j] = item
132.         t += 1
133.     while 1:      # loop over records in sequence
134.         while 1:  # rejection sampling loop
135.             X = t * (random() ** (-1/k) - 1) # use inverse of G
136.             S = int(X)
137.             # first try if h(S) / cg(X) >= random
138.             cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k
139.             rg = random() * cgs
140.             hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1)
141.             if rg <= hs:
142.                 break
143.             # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random
144.             if rg <= f(S, k, t):
145.                 break
146.         next(islice(it, S, S), None)
147.         next_item = next(it, None)
148.         if next_item:
149.             reservoir[randrange(0, k)] = next_item
150.             t += S + 1
151.         else:
152.             break # no more records
153.     return reservoir
154.
155. def big_range(M):
156.     'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647'
157.     return islice(count(), M)
158.
159. def test_moment(k, counts):
160.     'Test if the distribution in counts (i, n) is uniform'
161.     mini = min(i for i,n in counts.iteritems())
162.     maxi = max(i for i,n in counts.iteritems())
163.     exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1)
164.     N = sum(n for i, n in counts.iteritems())
165.     t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N
166.     s = 1 / sqrt(N)
167.     form = "actual:{:6.3f} expected:{:6.3f} tolerance:{:6.3f}"
168.     assert abs(t - exp) < s, form.format(t, exp, s)
169.
170. def test(method, k, M, time=10):
171.     """check if all numbers have equal chance to be chosen
172.       The records are [0, 1, 2, ....]
173.       method: method for Reservoir Sampling
174.       k: reservoir size
175.       M; total number of records
176.       time: length of the test in seconds
177.       Test if moments 1-4 of resulting distribution is uniform
178.       Make sure that T parameter is set low enough to test algo Z, otherwise
179.          only the startup methods are tested
180.    """
181.     counts = Counter()
182.     t0 = clock()
183.     for i in xrange(100):
184.         counts.update(method(big_range(M), k))
185.     t = clock() - t0
186.     N = int(time * 100 / t + 0.5)
187.     for _ in xrange(N):
188.         counts.update(method(big_range(M), k))
189.     print "Test", method.__name__,
190.     test_moment(1, counts)
191.     test_moment(2, counts)
192.     test_moment(3, counts)
193.     test_moment(4, counts)
194.     print "OK"
195.
196. def do_nothing(seq, k):
197.     'pass through the data and do nothing'
198.     it = iter(seq)
199.     reservoir = list(islice(it, k))
200.     for record in it:
201.         pass
202.     return reservoir
203.
204. def benchmark(M=10000000, k=10):
205.     print "Benchmark - number of records =", M, ", k =", k
206.     for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]:
207.         t0 = clock()
208.         if M > 10000000 and method == algorithm_R:
209.             continue
210.         if M > 100000000 and method == algorithm_X:
211.             continue
212.         method(big_range(M), k)
213.         print " {:20s} time:{:10.3f} secs".format(method.__name__, clock() - t0)
214.
215. benchmark(M=10000000, k=10)
216. test(algorithm_Z_naive, 10, 100, 100)