Advertisement
ploffie

Reservoir Sampling

Feb 2nd, 2014
139
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.05 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Fri Jan 31 13:34:25 2014
  4.  
  5. @author:Paul Hofstra
  6.  
  7. http://www.cs.umd.edu/~samir/498/vitter.pdf
  8. Random Sampling with a Reservoir
  9. by JEFFREY SCOTT VITTER
  10. ACM Transactions on Mathematical Software, Vol. 11, No. 1, March 1985.
  11.  
  12.  
  13. """
  14. from __future__ import division
  15.  
  16. from itertools import islice, count, izip
  17. from random import randrange, random
  18. from math import sqrt
  19. from operator import mul
  20. from time import clock
  21. from collections import Counter    
  22.  
  23. def f(s, k, t):
  24.     """probabillity function for skip - formula 3.2 from Vitter
  25.       t: number of processed records
  26.       k: reservoir size
  27.       s: number of records to skip
  28.       t >= k
  29.    """
  30.     if s < k:
  31.         mult, start = (k / (t - k), 0) if t > k else (k / (t + 1), 1)
  32.         return reduce(mul, ((t - k + i) / (t + 1 + i)
  33.             for i in xrange(start,s+1)), mult)
  34.     else:
  35.         return reduce(mul, ((t - i) / (t + s - i)
  36.             for i in xrange(k)), k / (t + s + 1))
  37.  
  38. def algorithm_R(seq, k):
  39.     'take a random sample with k items fom seq'
  40.     it = iter(seq)
  41.     reservoir = list(islice(it, k))
  42.     for t, item in izip(count(k), it):
  43.         j = randrange(0, t + 1)
  44.         if j < k:
  45.             reservoir[j] = item
  46.     return reservoir
  47.  
  48. def algorithm_X(seq, k):
  49.     'take a random sample with k items fom seq'
  50.     it = iter(seq)
  51.     reservoir = list(islice(it, k))
  52.     t = k
  53.     num = 0
  54.     while 1:
  55.         V = random()
  56.         S, t, num = (0, t + 1, num + 1)
  57.         quot = num / t
  58.         while quot > V:
  59.             S, t, num = (S + 1, t + 1, num + 1)
  60.             quot *= num / t
  61.         next(islice(it, S, S), None)
  62.         next_item = next(it, None)
  63.         if next_item:
  64.             reservoir[randrange(0, k)] = next_item
  65.         else:
  66.             break
  67.     return reservoir
  68.  
  69. def algorithm_Z(seq, k, T=22):
  70.     """take a random sample with k items fom seq
  71.    """
  72.     it = iter(seq)
  73.     reservoir = list(islice(it, k))
  74.     t = k
  75.     tresh = T * k
  76.     num = 0
  77.     while t <= tresh:
  78.         V = random()
  79.         S, t, num = (0, t + 1, num + 1)
  80.         quot = num / t
  81.         while quot > V:
  82.             S, t, num = (S + 1, t + 1, num + 1)
  83.             quot *= num / t
  84.         next(islice(it, S, S), None) # skip S records
  85.         next_item = next(it, None)
  86.         if next_item:
  87.             reservoir[randrange(0, k)] = next_item
  88.         else:
  89.             break
  90.     W = random() ** (-1/k)
  91.     term = t - k + 1
  92.     while 1:      # items in sequence
  93.         while 1:  # rejection sampling loop
  94.             U = random()
  95.             X = t * (W - 1)
  96.             S = int(X)
  97.             # first try if h(S) / cg(S) > random
  98.             lhs = (((U *(((t + 1) / term) ** 2)) * (term + S))/ (t + X)) ** (1 / k)
  99.             rhs = (((t + X) / (term + S)) * term) / t
  100.             if lhs <= rhs:
  101.                 W = rhs / lhs
  102.                 break
  103.             # h(S) / cg(S) <= random - now test if f(S) / cg(S) > random
  104.             y = (((U * (t + 1)) / term) * (t + S + 1)) / (t + X)
  105.             denom, numer_lim = (t, term + S) if k < S else (t - k + S, t + 1)
  106.             for numer in xrange(t + S, numer_lim - 1, -1):
  107.                 y *= numer / denom
  108.                 denom -= 1
  109.             W = random() ** (-1/k)            
  110.             if y ** (1 / k) <= (t + X) / t:
  111.                 break
  112.         next(islice(it, S, S), None)
  113.         next_item = next(it, None)
  114.         if next_item:
  115.             reservoir[randrange(0, k)] = next_item
  116.             t += S + 1
  117.             term += S + 1
  118.         else:
  119.             break        
  120.     return reservoir
  121.  
  122. def algorithm_Z_naive(seq, k, T=5):
  123.     it = iter(seq)
  124.     reservoir = list(islice(it, k))
  125.     t = k
  126.     # do first k*T steps with algorithm R
  127.     if T > 0:
  128.         for t, item in islice(izip(count(k), it), k*T):
  129.             j = randrange(0, t + 1) # 0 <= j <= i !!
  130.             if j < k:
  131.                 reservoir[j] = item
  132.         t += 1
  133.     while 1:      # loop over records in sequence
  134.         while 1:  # rejection sampling loop
  135.             X = t * (random() ** (-1/k) - 1) # use inverse of G
  136.             S = int(X)  
  137.             # first try if h(S) / cg(X) >= random
  138.             cgs = (t + 1) / (t - k + 1) * k / (t + X) * (t / (t + X)) ** k
  139.             rg = random() * cgs
  140.             hs = k / (t + 1) * ((t - k + 1) / (t + S - k + 1)) ** (k + 1)
  141.             if rg <= hs:
  142.                 break
  143.             # h(S) / cg(X) < random - now test if f(S) / cg(X) >= random
  144.             if rg <= f(S, k, t):
  145.                 break
  146.         next(islice(it, S, S), None)
  147.         next_item = next(it, None)
  148.         if next_item:
  149.             reservoir[randrange(0, k)] = next_item
  150.             t += S + 1
  151.         else:
  152.             break # no more records        
  153.     return reservoir
  154.    
  155. def big_range(M):
  156.     'generates sequence 0,1,2,3,4,5, ... , M-1 with M <= 2147483647'
  157.     return islice(count(), M)
  158.    
  159. def test_moment(k, counts):
  160.     'Test if the distribution in counts (i, n) is uniform'
  161.     mini = min(i for i,n in counts.iteritems())
  162.     maxi = max(i for i,n in counts.iteritems())
  163.     exp = sum(((i-mini)/maxi) ** k for i in xrange(mini, maxi+1)) / (maxi + 1)
  164.     N = sum(n for i, n in counts.iteritems())
  165.     t = sum(n * ((i - mini) / maxi) ** k for i, n in counts.iteritems()) / N
  166.     s = 1 / sqrt(N)
  167.     form = "actual:{:6.3f} expected:{:6.3f} tolerance:{:6.3f}"
  168.     assert abs(t - exp) < s, form.format(t, exp, s)
  169.    
  170. def test(method, k, M, time=10):
  171.     """check if all numbers have equal chance to be chosen
  172.       The records are [0, 1, 2, ....]    
  173.       method: method for Reservoir Sampling
  174.       k: reservoir size
  175.       M; total number of records
  176.       time: length of the test in seconds
  177.       Test if moments 1-4 of resulting distribution is uniform
  178.       Make sure that T parameter is set low enough to test algo Z, otherwise
  179.          only the startup methods are tested
  180.    """
  181.     counts = Counter()
  182.     t0 = clock()
  183.     for i in xrange(100):
  184.         counts.update(method(big_range(M), k))    
  185.     t = clock() - t0
  186.     N = int(time * 100 / t + 0.5)
  187.     for _ in xrange(N):
  188.         counts.update(method(big_range(M), k))    
  189.     print "Test", method.__name__,
  190.     test_moment(1, counts)
  191.     test_moment(2, counts)
  192.     test_moment(3, counts)
  193.     test_moment(4, counts)  
  194.     print "OK"
  195.    
  196. def do_nothing(seq, k):
  197.     'pass through the data and do nothing'
  198.     it = iter(seq)
  199.     reservoir = list(islice(it, k))
  200.     for record in it:
  201.         pass
  202.     return reservoir
  203.  
  204. def benchmark(M=10000000, k=10):
  205.     print "Benchmark - number of records =", M, ", k =", k
  206.     for method in [algorithm_R, algorithm_X, algorithm_Z, algorithm_Z_naive, do_nothing]:
  207.         t0 = clock()
  208.         if M > 10000000 and method == algorithm_R:
  209.             continue
  210.         if M > 100000000 and method == algorithm_X:
  211.             continue
  212.         method(big_range(M), k)        
  213.         print " {:20s} time:{:10.3f} secs".format(method.__name__, clock() - t0)
  214.  
  215. benchmark(M=10000000, k=10)
  216. test(algorithm_Z_naive, 10, 100, 100)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement