Last active
December 3, 2015 10:03
-
-
Save robinvanemden/58a152f8b71eb7dac19d to your computer and use it in GitHub Desktop.
Testing sensitivity of LiF to missing data and permutations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
## IPython Reset | |
#from IPython import get_ipython | |
#get_ipython().magic('reset -sf') | |
############################################################################## | |
import numpy as np | |
import matplotlib.pyplot as plt | |
############################################################################## | |
def matrixpush(m, row): | |
if not np.all(np.isfinite(values[:,0])): | |
i = np.count_nonzero(np.logical_not(np.isnan(values[:,0]))) | |
m[i,] = row | |
else: | |
m = np.vstack([m,row]) | |
m = m[1:,] | |
return(m) | |
def getobs( x, max = 5, err=0 ): | |
if (err==0): | |
obsr = -1*pow((x-max),2) | |
else: | |
obsr = -1*pow((x-max),2) + np.random.normal(0,err,1) | |
return obsr; | |
############################################################################## | |
stream = 3000 # Length of stream | |
inttime = 100 # Integration time | |
amplitude = 1.4 # Amlitude LIF | |
learnrate = .004 # Learnrate | |
omega = (2*np.pi)/inttime # Omega | |
x0 = 1.0 # Startvalue | |
############################################################################## | |
p_return = 0.80 # Chance that returns | |
variance = 1 # variance in observations | |
delay_buffer_length = 101 # length of delay and randomizer buffer | |
############################################################################## | |
use_mean = 1 # mean LiF or basic LiF | |
drop_values = 1 # drop values, or not | |
do_random = 1 # return random, or not | |
super_random = 1 # do "realistic" exponential or "real" rnd | |
lif_one = 0 # use batchwise LiF-1, or continuous LiF-2 | |
############################################################################## | |
values = np.zeros((inttime,3)) | |
values.fill(np.nan) | |
delay_buffer = np.empty((0,3), float) | |
track_x0 = [] | |
track_x = [] | |
track_t = [] | |
track_y = [] | |
x = 0.0 | |
t = 0.0 | |
y = 0.0 | |
############################################################################## | |
for i in range(0,stream+delay_buffer_length): | |
# if LiF-2 and buffer full, keep on calculating x0 | |
# if LiF-1 and buffer full, calculate x0 and empty buffer | |
if np.all(np.isfinite(values[:,0])): | |
if use_mean: | |
x0 = np.mean(values[:,1]) | |
x0 = x0 + learnrate * sum( values[:,2] ) | |
else: | |
if lif_one: | |
x0 = x0 + learnrate * sum( values[:,2] ) | |
else: | |
x0 = x0 + learnrate * sum( values[:,2] ) / inttime | |
if lif_one: values.fill(np.nan) | |
# calculate t,x,y | |
t = i | |
x = x0 + amplitude*np.cos(omega * t) | |
y = amplitude*np.cos(omega * t)*getobs(x,5,variance) | |
# log vars | |
track_t = np.append(track_t, t) | |
track_x = np.append(track_x, x) | |
track_y = np.append(track_y, y) | |
track_x0 = np.append(track_x0, x0) | |
row_to_add = np.array([t,x,y]) | |
# return all responses | |
if not drop_values and not do_random: | |
row_to_add = np.array([t,x,y]) | |
y = amplitude*np.cos(omega * t)*getobs(x,5,variance) | |
values = matrixpush(values, row_to_add) | |
# drop some responses | |
if np.random.binomial(1, p_return, 1)==1 and drop_values and not do_random: | |
values = matrixpush(values, row_to_add) | |
# drop some responses, return rest randomized | |
if np.random.binomial(1, p_return, 1)==1 and drop_values and do_random: | |
# after a delay, so fill buffer | |
delay_buffer = np.vstack([row_to_add,delay_buffer]) | |
# this buffer enables to return random | |
if len(delay_buffer)>=delay_buffer_length: | |
# how random do you want it? exponential propably most realistic | |
if super_random: | |
#completely random | |
random_row_nr = np.random.randint(0,len(delay_buffer)-1) | |
else: | |
# use exponential randomizer, ie greater prob recent val | |
random_row_nr = int(3*np.random.standard_exponential(1)) | |
# if number higher than buffer, pick random within buffer | |
if random_row_nr>=len(delay_buffer): | |
random_row_nr=np.random.randint(0,len(delay_buffer)-1) | |
# copy row to a var, and delete row from delay buffer | |
random_row = delay_buffer[random_row_nr,] | |
delay_buffer = np.delete(delay_buffer, random_row_nr, axis=0) | |
# push the randomly picked row to values | |
values = matrixpush(values, random_row) | |
############################################################################## | |
# plot some vars | |
plt.plot(track_x) | |
plt.show() | |
plt.plot(track_x0) | |
plt.show() | |
# print final x0 | |
print(x0) | |
############################################################################## | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment