Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- def _step_left_endpoint(obj_func, y, L, J, w, m):
- while y < obj_func(L):
- L -= w
- if m:
- J -= 1
- if J <= 0:
- break
- return L
- def _step_right_endpoint(obj_func, y, R, K, w, m):
- while y < obj_func(R):
- R += w
- if m:
- K -= 1
- if K <= 0:
- break
- return R
- class slice_sampler(object):
- """ Generic slice-sampler.
- Base class for both univariate and multivariate methods
- See http://www.cs.toronto.edu/~radford/ftp/slc-samp.pdf for theory
- and details
- """
- def __init__(self, log_f):
- """ Instantiates a slice sampler
- Parameters
- ----------
- log_f : callable
- A function that returns the log of the function you want to
- sample and accepts a numpy array as an argument (the x)
- """
- if not hasattr(log_f, '__call__'):
- raise TypeError("log_f is not callable")
- ## The function to sample
- self._g = log_f
- def set_function(self, log_f):
- """ Sets the function being sampled
- Parameters
- ----------
- log_f : callable
- A function that returns the log of the function you want to
- sample and accepts a numpy array (or scalar if univariate function)
- as an argument (the x)
- """
- self._g = log_f
- class univariate_slice_sampler(slice_sampler):
- """ Does slice sampling on univariate distributions
- """
- def __init__(self, log_f, adapt_w = False, start_w = 0.1):
- """ Instantiates a slice sampler
- Parameters
- ----------
- log_f : callable
- A function that returns the log of the function you want to
- sample and accepts a scalar as an argument (the x)
- adapt_w : boolean
- Whether to adapt w during sampling. Will work in between samplings
- start_w : float
- Starting value for w, necessary if adapting w during sampling
- """
- super(univariate_slice_sampler, self).__init__(log_f)
- self._w = start_w
- self._adapt_w = adapt_w
- ## Parameter for number of data points going into w
- self._w_n = 1.
- def accept_doubling(self, x0, x1, y, w, interval):
- """ Test for whether a new point, x1, is an acceptable next state,
- when the interval was found by the doubling procedure
- (See Radford paper at http://www.cs.toronto.edu/~radford/ftp/slc-samp.pdf)
- Parameters
- ----------
- x0 : float
- The current point
- x1 : float
- The possible next point
- y : float
- The vertical level defining the slice
- w : float
- Estimte of typical slice size
- interval : Ranger.Range
- The interval found be the doubling procedure
- Returns
- -------
- Whether or not x1 is an acceptable next state
- """
- L_hat = interval.lowerEndpoint()
- R_hat = interval.upperEndpoint()
- D = False
- while (R_hat - L_hat) > 1.1*w:
- M = (L_hat+R_hat)/2.
- if (x0 < M and x1 >= M) or (x0 >= M and x1 < M):
- D = True
- if x1 < M:
- R_hat = M
- else:
- L_hat = M
- if D and (y > self._g(L_hat) and y >= self._g(R_hat)):
- return False
- return True
- def find_interval_doubling(self, x0, y, w, p=None):
- """ Finds an interval around a current point, x0, using a doubling procedure
- (See Radford paper at http://www.cs.toronto.edu/~radford/ftp/slc-samp.pdf)
- Parameters
- ----------
- xo : float
- The current point
- y : float
- The vertical level defining the slice
- w : float
- Estimate of typical slice size
- p : int, optional
- Integer limiting the size of a slice to (2^p)w. If None,
- then interval can grow without bound
- Returns
- -------
- Range containing the interval found
- """
- # Sample initial interval around x0
- U = np.random.random()
- L = x0-w*U
- R = L + w
- # Set limiter
- K = None
- if p:
- K = p
- # Refine interval
- while (y < self._g(L) or y < self._g(R)):
- V = np.random.random()
- if V < 0.5:
- L -= (R-L)
- else:
- R = R + (R-L)
- if p:
- K -= 1
- if K <= 0:
- break
- # Return the inteval
- return Range.closed(L,R)
- def find_interval_step_out(self, x0, y, w, m=None):
- """ Finds an interval around a current point, x0, using the stepping-out
- procedure (See Radford paper at http://www.cs.toronto.edu/~radford/ftp/slc-samp.pdf)
- Parameters
- ----------
- xo : float
- The current point
- y : float
- The vertical level defining the slice
- w : float
- Estimate of typical slice size
- m : int, optional
- Integer, where maximum size of slice should be mw. If None,
- then interval can grow without bound
- Returns
- -------
- Range containing the interval found
- """
- # Sample initial interval around x0
- U = np.random.random()
- L = x0 - w*U
- R = L + w
- # Place limitations on interval if necessary
- V = None
- J = None
- K = None
- if m:
- V = np.random.random()
- J = np.floor(m*V)
- K = (m-1)-J
- # Get left endpoint
- L = _step_left_endpoint(self._g, y, L, J, w, m)
- # Get right endpoint
- R = _step_right_endpoint(self._g, y, R, K, w, m)
- # Return interval
- return Range.closed(L,R)
- def run_sampler(self, x0_start = 0., n_samp = 10000, interval_method='doubling',
- w=0.1, m=None, p=None):
- """ Runs the slice sampler
- Parameters
- ----------
- x0_start : float
- An initial value for x
- n_samp : int
- The number of samples to take
- interval_method : str
- The method for determining the interval at each stage of sampling. Possible values
- are 'doubling', 'stepping'.
- w : float
- Estimate of typical slice size. If adapt_w is true, then this is overriden
- m : int, optional (Only relevant for stepping interval procedure)
- Integer, where maximum size of slice should be mw. If None,
- then interval can grow without bound.
- p : int, optional (Only relevant for doubling interval procedure)
- Integer limiting the size of a slice to (2^p)w. If None,
- then interval can grow without bound
- Returns
- -------
- Generator of samples from the distribution
- Examples
- --------
- >>> from quantgen.stats_utils.slice_sampler import univariate_slice_sampler
- >>> from scipy.stats import norm
- >>> sampler = univariate_slice_sampler(lambda x: norm.logpdf(x, loc=0, scale=5.))
- >>> samples = [x for x in sampler.run_sampler(n_samp=1000, w=2.5)]
- """
- x0 = x0_start
- interval = None
- doubling_used = True
- if interval_method != 'doubling':
- doubling_used = False
- if self._adapt_w:
- w = self._w
- for i in xrange(n_samp):
- # Draw vertical value, y, that defines the horizontal slice
- y = self._g(x0) - np.random.exponential()
- # Find interval around x0 that contains at least a big part of the slice
- if interval_method == 'doubling':
- interval = self.find_interval_doubling(x0,y,w=w,p=p)
- elif interval_method == 'stepping':
- interval = self.find_interval_step_out(x0,y,w=w,m=m)
- else:
- raise ValueError("%s is not an interval method" % interval_method)
- # Draw new point
- x0 = self.sample_by_shrinkage(x0, y, w, interval, doubling_used=doubling_used)
- # Update w if necessary
- if self._adapt_w:
- interval_length = (interval.upperEndpoint()-interval.lowerEndpoint())
- self._w = np.power(self._w,self._w_n/(self._w_n+1.))*
- np.power(interval_length/2.,1./(self._w_n+1.))
- self._w_n += 1.
- w = self._w
- yield float(x0)
- def sample_by_shrinkage(self, x0, y, w, interval, doubling_used = False):
- """ Samples a point from an interval using the shrinkage procedure
- (See Radford paper at http://www.cs.toronto.edu/~radford/ftp/slc-samp.pdf)
- Parameters
- ----------
- x0 : float
- The current point
- y : float
- The vertical level defining the slice
- w : float
- Estimate of typical slice size
- interval : Ranger.Range
- The interval found be the doubling procedure
- doubling_used : boolean
- Whether the doubling procedure was used when defining the interval
- Returns
- -------
- The new point
- """
- # Set up the accept function, based on whether interval was found
- # using doubling
- accept_func = None
- if doubling_used:
- accept_func = lambda x0, x1, y, w, interval:
- self.accept_doubling(x0, x1, y, w, interval)
- else:
- accept_func = lambda x0, x1, y, w, interval: True
- # Set initial interval
- L_bar = interval.lowerEndpoint()
- R_bar = interval.upperEndpoint()
- x1 = L_bar + 0.5*(R_bar-L_bar)
- # Run through shrinkage
- while 1:
- U = np.random.random()
- x1 = L_bar + U*(R_bar-L_bar)
- if y < self._g(x1) and accept_func(x0, x1, y, w, interval):
- break
- if x1 < x0:
- L_bar = x1
- else:
- R_bar = x1
- return x1
- #cython: wraparound=False
- #cython: boundscheck=False
- #cython: cdivision=True
- #cython: nonecheck=False
- from cpython cimport array
- import cython
- import numpy as np
- import ctypes
- cimport numpy as np
- cimport cython
- cimport python_unicode
- from libc.stdlib cimport malloc, free
- from libcpp.vector cimport vector
- cdef extern from "<math.h>":
- cdef double floor(double)
- cdef double log(double)
- cdef double pow(double, double)
- cdef double tgamma(double)
- cdef extern from "stdint.h":
- ctypedef unsigned long long uint64_t
- cdef extern from "gsl/gsl_rng.h":#nogil:
- ctypedef struct gsl_rng_type:
- pass
- ctypedef struct gsl_rng:
- pass
- gsl_rng_type *gsl_rng_mt19937
- gsl_rng *gsl_rng_alloc(gsl_rng_type * T)
- cdef gsl_rng *r = gsl_rng_alloc(gsl_rng_mt19937)
- cdef extern from "gsl/gsl_randist.h" nogil:
- double unif "gsl_rng_uniform"(gsl_rng * r)
- double unif_interval "gsl_ran_flat"(gsl_rng * r,double,double) ## syntax; (seed, lower, upper)
- double exponential "gsl_ran_exponential"(gsl_rng * r,double) ## syntax; (seed, mean) ... mean is 1/rate
- ctypedef double (*func_t)(double)
- cdef class wrapper:
- cdef func_t wrapped
- def __call__(self, value):
- return self.wrapped(value)
- def __unsafe_set(self, ptr):
- self.wrapped = <func_t><void *><size_t>ptr
- cdef double* stepping_out(double x0, double y, double w, int m, func_t f):
- """
- Function for finding an interval around the current point
- using the "stepping out" procedure (Figure 3, Neal (2003))
- Parameters of stepping_out subroutine:
- Input:
- x0 ------------ the current point
- y ------------ logarithm of the vertical level defining the slice
- w ------------ estimate of the typical size of a slice
- m ------------ integer limiting the size of a slice to "m*w"
- (*func_t) ------------ routine to compute g(x) = log(f(x))
- Output:
- interv[2] ------------ the left and right sides of found interval
- """
- cdef double *interv = <double *>malloc(2 * cython.sizeof(double))
- if interv is NULL:
- raise MemoryError()
- cdef double u
- cdef int J, K
- cdef double g_interv[2]
- #Initial guess for the interval
- u = unif_interval(r,0,1)
- interv[0] = x0 - w*u
- interv[1] = interv[0] + w
- #Get numbers of steps tried to left and to right
- if m>0:
- u = unif_interval(r,0,1)
- J = <uint64_t>floor(m*u)
- K = (m-1)-J
- #Initial evaluation of g in the left and right limits of the interval
- g_interv[0]=f(interv[0])
- g_interv[1]=f(interv[1])
- #Step to left until leaving the slice
- while (g_interv[0] >= y):
- interv[0] -= w
- g_interv[0]=f(interv[0])
- if m>0:
- J-=1
- if (J<= 0):
- break
- #Step to right until leaving the slice */
- while (g_interv[1] > y):
- interv[1] += w
- g_interv[1]=f(interv[1])
- if m>0:
- K-=1
- if (K<= 0):
- break
- #http://cython.readthedocs.io/en/latest/src/tutorial/memory_allocation.html
- try:
- return interv
- finally:
- # return the previously allocated memory to the system
- free(interv)
- cdef double* doubling(double x0, double y, double w, int p, func_t f):
- """
- Function for finding an interval around the current point
- using the "doubling" procedure (Figure 4, Neal (2003))
- Input:
- x0 ------------ the current point
- y ------------ logarithm of the vertical level defining the slice
- w ------------ estimate of the typical size of a slice
- p ------------ integer limiting the size of a slice to "2^p*w"
- (*func_t) ------------ routine to compute g(x) = log(f(x))
- Output:
- interv[2] ------------ the left and right sides of found interval
- """
- cdef double* interv = <double *>malloc(2 * cython.sizeof(double))
- if interv is NULL:
- raise MemoryError()
- cdef double u
- cdef int K
- cdef bint now_left
- cdef double g_interv[2]
- #Initial guess for the interval
- u = unif_interval(r,0,1)
- interv[0] = x0 - w*u
- interv[1] = interv[0] + w
- if p>0:
- K = p
- # Initial evaluation of g in the left and right limits of the interval
- g_interv[0]= f(interv[0])
- g_interv[1]= f(interv[1])
- # Perform doubling until both ends are outside the slice
- while ((g_interv[0] > y) or (g_interv[1] > y)):
- u = unif_interval(r,0,1)
- now_left = (u < 0.5)
- if (now_left):
- interv[0] -= (interv[1] - interv[0])
- g_interv[0]=f(interv[0])
- else:
- interv[1] += (interv[1] - interv[0])
- g_interv[1]=f(interv[1])
- if p>0:
- K-=1
- if (K<=0):
- break
- try:
- return interv
- finally:
- # return the previously allocated memory to the system
- free(interv)
- cdef bint accept_doubling(double x0, double x1, double y, double w, np.ndarray[ndim=1, dtype=np.float64_t] interv, func_t f):
- """
- Acceptance test of newly sampled point when the "doubling" procedure has been used to find an
- interval to sample from (Figure 6, Neal (2003))
- Parameters
- Input:
- x0 ------------ the current point
- x1 ------------- the possible next candidate point
- y ------------ logarithm of the vertical level defining the slice
- w ------------ estimate of the typical size of a slice
- interv[2] ------------ the left and right sides of found interval
- (*func_t) ------------ routine to compute g(x) = log(f(x))
- Output:
- accept ------------ True/False indicating whether the point is acceptable or not
- """
- cdef double interv1[2]
- cdef double g_interv1[2]
- cdef bint D
- cdef double w11, mid
- w11 = 1.1*w
- interv1[0] = interv[0]
- interv1[1] = interv[1]
- D = False
- while ( (interv1[1] - interv1[0]) > w11):
- mid = 0.5*(interv1[0] + interv1[1])
- if ((x0 < mid) and (x1 >= mid)) or ((x0 >= mid) and (x1 < mid)):
- D = True
- if (x1 < mid):
- interv1[1] = mid
- g_interv1[1] = f(interv1[1])
- else:
- interv1[0] = mid
- g_interv1[0] = f(interv1[0])
- if (D and (g_interv1[0] < y) and (g_interv1[1] <= y)):
- return False
- return True
- cdef double shrinkage(double x0, double y, double w, np.ndarray[ndim=1, dtype=np.float64_t] interv, bint doubling, func_t f):
- """
- Function to sample a point from the interval while skrinking the interval when the sampled point is
- not acceptable (Figure 5, Neal (2003))
- Input:
- x0 ------------ the current point
- y ------------ logarithm of the vertical level defining the slice
- w ------------ estimate of the typical size of a slice
- interv[2] ------------ the left and right sides of found interval
- (*func_t) ------------ routine to compute g(x) = log(f(x))
- doubling ------------ 0/1 indicating whether doubling was used to find an interval
- Output:
- x1 ------------- newly sampled point
- """
- cdef double u, gx1, x1
- cdef bint accept
- cdef double L_bar, R_bar
- L_bar=interv[0]
- R_bar=interv[1]
- x1 = L_bar + 0.5*(R_bar - L_bar)
- while True:
- u = unif_interval(r,0,1)
- x1 = L_bar + u*(R_bar - L_bar)
- gx1=f(x1)
- if (doubling):
- accept=accept_doubling(x0, x1, y, w, interv, f )
- if ((gx1 > y) and accept):
- break
- if (x1 < x0):
- L_bar = x1
- else:
- R_bar = x1
- else:
- if (gx1 > y):
- break
- if (x1 < x0):
- L_bar = x1
- else:
- R_bar = x1
- return x1
- cdef double log_beta(double x):
- #Log of beta distribution with second argument b=1
- cdef double a=5.
- return log(a)+(a-1.)*log(x)
- cdef wrapper make_wrapper(func_t f):
- cdef wrapper W=wrapper()
- W.wrapped=f
- return W
- def slice_sampler(int n_sample,
- wrapper f,
- int m = 0,
- int p = 0,
- double x0_start=0.0,
- bint adapt_w=False,
- double w_start=0.1,
- char* interval_method ='doubling'):
- """
- Inputs:
- n_sample ------------ Number of sample points from the given distribution
- f ------------ A log of the function you want to sample and accepts a scalar as an argument (the x)
- x0_start ------------ An initial value for x
- adapt_w ------------ Whether to adapt w during sampling. Will work in between samplings
- w_start ------------ Starting value for w, necessary if adapting w during sampling
- p ------------ Integer limiting the size of a slice to (2^p)w. If None, then interval can grow without bound
- m ------------ Integer, where maximum size of slice should be mw. If None, then interval can grow without bound
- interval_method ------------ The method for determining the interval at each stage of sampling. Possible values are 'doubling', 'stepping'.
- """
- cdef unicode s= interval_method.decode('UTF-8', 'strict')
- cdef double x0 = x0_start
- cdef double vertical, w, expon
- cdef double interval_length
- cdef bint doubling_used=True
- if (s!=u'doubling'):
- doubling_used=False
- cdef double w_n=1.
- cdef vector[double] samples #http://cython.readthedocs.io/en/latest/src/userguide/wrapping_CPlusPlus.html
- w=w_start
- cdef Py_ssize_t i
- cdef np.ndarray[ndim=1, dtype=np.float64_t] interv
- cdef np.float64_t[:] view
- for 0<= i <n_sample:
- expon = exponential(r, 1)
- vertical = f.wrapped(x0) - expon
- print x0,f.wrapped(x0),vertical
- if (s=='doubling'):
- view=<np.float64_t[:2]>doubling(x0, vertical, w, p, f.wrapped)
- interv= np.asarray(view)
- print "after:", interv
- elif (s==u'stepping'):
- view=<np.float64_t[:2]>stepping_out(x0, vertical, w, m, f.wrapped)
- interv= np.asarray(view)
- else:
- raise ValueError("%s is not an acceptable interval method for slice sampler"%s )
- print "finish expanding the range.."
- x0=shrinkage(x0, vertical, w, interv, doubling_used, f.wrapped)
- samples.push_back(x0)
- if adapt_w:
- interval_length=interv[1]-interv[0]
- w=pow(w,w_n/(w_n+1))*pow(interval_length/2.,1./(w_n+1.))
- w_n+=1.
- print x0
- return samples
- def run(int n_sample,
- double x0_start=0.01,
- double w_start=2.5):
- wrap_f=make_wrapper(log_beta)
- return slice_sampler(n_sample, wrap_f,x0_start=x0_start, w_start=w_start)
- from distutils.core import setup
- from distutils.extension import Extension
- import numpy
- from Cython.Distutils import build_ext
- extra_compile_args = ['-std=c++11']
- extra_link_args = ['-Wall']
- setup(
- cmdclass = {'build_ext': build_ext},
- ext_modules=[
- Extension("SliceSampler",
- sources=["SliceSampler.pyx"],
- language="c++",
- libraries=["stdc++","gsl", "gslcblas"],
- include_dirs=[numpy.get_include()],
- extra_compile_args=extra_compile_args,
- extra_link_args=extra_link_args)
- ],
- gdb_debug=True)
- import numpy as np
- import pylab as plt
- from SliceSampler import *
- x=run(10000)
- n, bins, patches = plt.hist(x, 50, normed=1, histtype='step', lw=2, color='r', label="Beta")
- plt.show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement