-
-
Save twiecki/2021613 to your computer and use it in GitHub Desktop.
function for censored distrbutions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Cython wrapper for the fast-dm code by Voss & Voss | |
# (C) by Thomas Wiecki ([email protected]), 2010 | |
# GPLv2 or later. | |
from __future__ import division | |
import numpy as np | |
cimport numpy as np | |
cimport cython | |
# Define data type | |
DTYPE = np.double | |
ctypedef np.double_t DTYPE_t | |
ctypedef np.int_t DTYPE_int | |
# Define external functions of fast-dm | |
cdef extern from "fast-dm.h": | |
cdef struct F_calculator: | |
pass | |
void F_start (F_calculator*, int) | |
int F_get_N (F_calculator*) | |
double F_get_z (F_calculator*, int) | |
double F_get_val (F_calculator*, double, double) | |
void F_delete (F_calculator*) | |
cdef F_calculator* F_new(double*) | |
void set_precision (double) | |
@cython.boundscheck(False) # turn of bounds-checking for entire function | |
def cdf(double v, double V, double a, double z, double Z, double t, double T, double precision=3., int N=500, double time=2, np.ndarray[DTYPE_t, ndim=1] output=None): | |
"""Compute the CDF of the drift diffusion model using the method | |
and implementation of fast-dm by Voss&Voss. | |
""" | |
cdef F_calculator *fc | |
cdef double dt = time/N | |
cdef np.ndarray[DTYPE_t, ndim=1] params = np.empty(6, dtype=np.double) | |
if output is None: | |
output = np.empty((2*N+1), dtype='float') | |
else: | |
assert len(output) == 2*N+1, "output array must have length 2*N+1" | |
set_precision(precision) | |
params[0] = a; params[1]=v; params[2]=t; params[3]=a*Z; | |
params[4] = V; params[5]=T; | |
fc = F_new(<double *> params.data) | |
F_start (fc, 1) | |
for i from 0 <= i <= N: | |
output[N+i] = F_get_val(fc, i*dt, a*z) | |
F_start (fc, 0) | |
for i from 1 <= i <= N: | |
output[N-i] = F_get_val(fc, i*dt, a*z) | |
F_delete (fc) | |
return output | |
@cython.boundscheck(False) # turn of bounds-checking for entire function | |
def censored_like(int l_samples, double l_bound, int u_samples, double u_bound, double v, | |
double V, double a, double z, double Z, double t, double T, double precision=3.): | |
""" | |
likelihood of censored values. | |
the function return the log-likelihood of getting l_samples smaller then l_bound, | |
and u_samples larger the u_bound. | |
P(x < thresh) = F_positive(tresh) - F_negative(thresh) | |
P(x > thresh) = 1 - P(x < thresh) | |
""" | |
cdef F_calculator *fc | |
cdef double l_cdf = 0 #log likelihod of F_pos(x < l_bound) - F_neg(x < l_bound) | |
cdef double u_cdf = 0 | |
cdef double logp | |
cdef double params[6] | |
#init | |
set_precision(precision) | |
params[0] = a; params[1]=v; params[2]=t; params[3]=a*Z; | |
params[4] = V; params[5]=T; | |
fc = F_new(params) | |
#get upper boundary cdf | |
F_start (fc, 1) | |
if l_samples > 0: | |
l_cdf = F_get_val(fc, l_bound, a*z) | |
if u_samples > 0: | |
u_cdf = F_get_val(fc, u_bound, a*z) | |
#get full cdf using the lower boundary cdf | |
F_start (fc, 0) | |
if l_samples > 0: | |
l_cdf -= F_get_val(fc, l_bound, a*z) | |
l_cdf = np.log(l_cdf) | |
if u_samples > 0: | |
u_cdf -= F_get_val(fc, u_bound, a*z) | |
u_cdf = np.log(1 - u_cdf) | |
logp = (l_samples * l_cdf) + (u_samples * u_cdf) | |
return logp--(09:19:58)--(imri-mbp:~/Documents/third/HDDM -(censored)$ | |
>> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def censored_like(l_samples, l_bound, u_samples, u_bound, v, V, a, z, Z, t, T, precision=3.): | |
return fastdm_cdf.censored_like(l_samples, l_bound, u_samples, u_bound, v, V, a, z, Z, t, T, precision) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TestCensoredLike(): | |
def cdf_by_integration(self, v, V, a, z, Z, t,T, cdf_lb=-5, cdf_ub=5, dt=0.001): | |
"""create a cdf array using numerical integration over the pdf""" | |
# init | |
x = np.arange(cdf_lb, cdf_ub, dt) | |
l_cdf = np.empty(x.shape[0], dtype=np.double) | |
l_cdf[0] = 0 | |
# get pdf values | |
for i in range(x.shape[0]): | |
pdf = hddm.wfpt.full_pdf(x[i], v, V, a, z, Z, t, T, 1e-4) | |
l_cdf[i] = l_cdf[i-1] + pdf | |
# normalize | |
l_cdf /= l_cdf[x.shape[0]-1] | |
return l_cdf, x | |
def eval_censored_like_on_set_of_rts(self, rts, v, V, a, z, Z, t, T): | |
"""evalutate censored_like on set of rts with a set of given parameters""" | |
#cdf by integration | |
i_t = time() | |
l_cdf, x_axis = self.cdf_by_integration(v=v, V=V, a=a, z=z, Z=Z, t=t, T=T, cdf_lb=-7, cdf_ub=7, dt=1e-4) | |
integ_time = time() - i_t | |
params_holder = np.zeros(6); | |
for rt in rts: | |
for i_cond in range(2): | |
#get censored_like using the cdf method | |
i_xpos = np.searchsorted(x_axis, rt) | |
i_xneg = np.searchsorted(x_axis, -rt) | |
p_a = l_cdf[i_xpos] - l_cdf[i_xneg] | |
if i_cond == 1: | |
p_a = 1 - p_a | |
#get it using the censored_like method | |
i_t = time() | |
if i_cond == 0: | |
logp_b = hddm.likelihoods.censored_like(1, rt, 0, 1, | |
v, V, a, z, Z, t, T, | |
precision=3.) | |
else: | |
logp_b = hddm.likelihoods.censored_like(0, 0, 1, rt, | |
v, V, a, z, Z, t, T, | |
precision=3.) | |
dm_time = time() - i_t | |
p_b = np.exp(logp_b) | |
#print results | |
print "*****" | |
print "rt: .%3f" % rt | |
print "fastdm: %.4f (%.3f secs)" % (p_b, dm_time) | |
print "integration: %.4f (%.3f secs)" % (p_a, integ_time) | |
np.testing.assert_almost_equal(p_a, p_b, 2) | |
def run_censored_like_single_include(self, include, large_V = True): | |
"""run a single censored_like test by generating a set of random params. | |
random params are generated using the include argument. | |
if large_V is True then V is much can get larger values than usual. | |
""" | |
if len(include) == 0: | |
print "@@@@@@@@@@@ simple Test @@@@@@@@@@@" | |
else: | |
print "@@@@@@@@@@@ %s Test @@@@@@@@@@@" % include | |
for i in range(10): | |
params = hddm.generate.gen_rand_params(include = include) | |
if large_V: | |
params['V'] *= 5 | |
print "*** test %d ***" % i | |
print params | |
print "!!!!!!!!!!!!!!!!" | |
self.eval_censored_like_on_set_of_rts(np.arange(0, 3,0.3), **params) | |
def test_censored_like(self): | |
""" | |
run censored_like test | |
""" | |
np.random.seed(1) | |
self.run_censored_like_single_include(()) | |
self.run_censored_like_single_include(['T']) | |
self.run_censored_like_single_include(['V']) | |
self.run_censored_like_single_include(['V'], large_V = True) | |
self.run_censored_like_single_include(['Z']) | |
self.run_censored_like_single_include(['T','V']) | |
self.run_censored_like_single_include(['T', 'Z']) | |
self.run_censored_like_single_include(['Z', 'V']) | |
self.run_censored_like_single_include(['Z', 'V', 'T']) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment