function for censored distrbutions
# Cython wrapper for the fast-dm code by Voss & Voss
# (C) by Thomas Wiecki ([email protected]), 2010
# GPLv2 or later.
from __future__ import division
import numpy as np
cimport numpy as np
cimport cython
# Define data type
DTYPE = np.double
ctypedef np.double_t DTYPE_t
ctypedef np.int_t DTYPE_int
# Define external functions of fast-dm
cdef extern from "fast-dm.h":
cdef struct F_calculator:
void F_start (F_calculator*, int)
int F_get_N (F_calculator*)
double F_get_z (F_calculator*, int)
double F_get_val (F_calculator*, double, double)
void F_delete (F_calculator*)
cdef F_calculator* F_new(double*)
void set_precision (double)
@cython.boundscheck(False) # turn of bounds-checking for entire function
def cdf(double v, double V, double a, double z, double Z, double t, double T, double precision=3., int N=500, double time=2, np.ndarray[DTYPE_t, ndim=1] output=None):
"""Compute the CDF of the drift diffusion model using the method
and implementation of fast-dm by Voss&Voss.
cdef F_calculator *fc
cdef double dt = time/N
cdef np.ndarray[DTYPE_t, ndim=1] params = np.empty(6, dtype=np.double)
if output is None:
output = np.empty((2*N+1), dtype='float')
assert len(output) == 2*N+1, "output array must have length 2*N+1"
params[0] = a; params[1]=v; params[2]=t; params[3]=a*Z;
params[4] = V; params[5]=T;
fc = F_new(<double *>
F_start (fc, 1)
for i from 0 <= i <= N:
output[N+i] = F_get_val(fc, i*dt, a*z)
F_start (fc, 0)
for i from 1 <= i <= N:
output[N-i] = F_get_val(fc, i*dt, a*z)
F_delete (fc)
return output
@cython.boundscheck(False) # turn of bounds-checking for entire function
def censored_like(int l_samples, double l_bound, int u_samples, double u_bound, double v,
double V, double a, double z, double Z, double t, double T, double precision=3.):
likelihood of censored values.
the function return the log-likelihood of getting l_samples smaller then l_bound,
and u_samples larger the u_bound.
P(x < thresh) = F_positive(tresh) - F_negative(thresh)
P(x > thresh) = 1 - P(x < thresh)
cdef F_calculator *fc
cdef double l_cdf = 0 #log likelihod of F_pos(x < l_bound) - F_neg(x < l_bound)
cdef double u_cdf = 0
cdef double logp
cdef double params[6]
params[0] = a; params[1]=v; params[2]=t; params[3]=a*Z;
params[4] = V; params[5]=T;
fc = F_new(params)
#get upper boundary cdf
F_start (fc, 1)
if l_samples > 0:
l_cdf = F_get_val(fc, l_bound, a*z)
if u_samples > 0:
u_cdf = F_get_val(fc, u_bound, a*z)
#get full cdf using the lower boundary cdf
F_start (fc, 0)
if l_samples > 0:
l_cdf -= F_get_val(fc, l_bound, a*z)
l_cdf = np.log(l_cdf)
if u_samples > 0:
u_cdf -= F_get_val(fc, u_bound, a*z)
u_cdf = np.log(1 - u_cdf)
logp = (l_samples * l_cdf) + (u_samples * u_cdf)
logp = (l_samples * l_cdf) + (u_samples * u_cdf)
    return logp
def censored_like(l_samples, l_bound, u_samples, u_bound, v, V, a, z, Z, t, T, precision=3.):
return fastdm_cdf.censored_like(l_samples, l_bound, u_samples, u_bound, v, V, a, z, Z, t, T, precision)
class TestCensoredLike():
def cdf_by_integration(self, v, V, a, z, Z, t,T, cdf_lb=-5, cdf_ub=5, dt=0.001):
"""create a cdf array using numerical integration over the pdf"""
# init
x = np.arange(cdf_lb, cdf_ub, dt)
l_cdf = np.empty(x.shape[0], dtype=np.double)
l_cdf[0] = 0
# get pdf values
for i in range(x.shape[0]):
pdf = hddm.wfpt.full_pdf(x[i], v, V, a, z, Z, t, T, 1e-4)
l_cdf[i] = l_cdf[i-1] + pdf
# normalize
l_cdf /= l_cdf[x.shape[0]-1]
return l_cdf, x
def eval_censored_like_on_set_of_rts(self, rts, v, V, a, z, Z, t, T):
"""evalutate censored_like on set of rts with a set of given parameters"""
#cdf by integration
i_t = time()
l_cdf, x_axis = self.cdf_by_integration(v=v, V=V, a=a, z=z, Z=Z, t=t, T=T, cdf_lb=-7, cdf_ub=7, dt=1e-4)
integ_time = time() - i_t
params_holder = np.zeros(6);
for rt in rts:
for i_cond in range(2):
#get censored_like using the cdf method
i_xpos = np.searchsorted(x_axis, rt)
i_xneg = np.searchsorted(x_axis, -rt)
p_a = l_cdf[i_xpos] - l_cdf[i_xneg]
if i_cond == 1:
p_a = 1 - p_a
#get it using the censored_like method
i_t = time()
if i_cond == 0:
logp_b = hddm.likelihoods.censored_like(1, rt, 0, 1,
v, V, a, z, Z, t, T,
logp_b = hddm.likelihoods.censored_like(0, 0, 1, rt,
v, V, a, z, Z, t, T,
dm_time = time() - i_t
p_b = np.exp(logp_b)
#print results
print "*****"
print "rt: .%3f" % rt
print "fastdm: %.4f (%.3f secs)" % (p_b, dm_time)
print "integration: %.4f (%.3f secs)" % (p_a, integ_time)
np.testing.assert_almost_equal(p_a, p_b, 2)
def run_censored_like_single_include(self, include, large_V = True):
"""run a single censored_like test by generating a set of random params.
random params are generated using the include argument.
if large_V is True then V is much can get larger values than usual.
if len(include) == 0:
print "@@@@@@@@@@@ simple Test @@@@@@@@@@@"
print "@@@@@@@@@@@ %s Test @@@@@@@@@@@" % include
for i in range(10):
params = hddm.generate.gen_rand_params(include = include)
if large_V:
params['V'] *= 5
print "*** test %d ***" % i
print params
print "!!!!!!!!!!!!!!!!"
self.eval_censored_like_on_set_of_rts(np.arange(0, 3,0.3), **params)
def test_censored_like(self):
run censored_like test
self.run_censored_like_single_include(['V'], large_V = True)
self.run_censored_like_single_include(['T', 'Z'])
self.run_censored_like_single_include(['Z', 'V'])
self.run_censored_like_single_include(['Z', 'V', 'T'])
