Created
October 22, 2021 06:30
-
-
Save russelljjarvis/3a41ea0760c06ae162e7d57aa523d3af to your computer and use it in GitHub Desktop.
uses pyspikes spike distance metric to optimize BluePyOpt+NeuronUnit fille is here https://github.com/russelljjarvis/neuronunit/blob/main/neuronunit/allenapi/allen_data_driven.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Dict, List, Optional, Tuple, Type, Union, Text | |
import pickle | |
import seaborn as sns | |
import os | |
import copy | |
import numpy as np | |
from collections.abc import Iterable | |
import pandas as pd | |
import quantities as pq | |
ALLEN_DURATION = 2000 * pq.ms | |
ALLEN_DELAY = 1000 * pq.ms | |
import bluepyopt as bpop | |
import bluepyopt.ephys as ephys | |
from bluepyopt.parameters import Parameter | |
from sciunit.scores import RelativeDifferenceScore | |
from sciunit import TestSuite | |
from sciunit.scores import ZScore | |
from sciunit.scores.collections import ScoreArray | |
from neuronunit.allenapi import make_allen_tests_from_id | |
from neuronunit.allenapi.make_allen_tests_from_id import * | |
from neuronunit.allenapi.make_allen_tests import AllenTest | |
import streamlit as st | |
import matplotlib.pyplot as plt | |
from neuronunit.optimization.optimization_management import inject_model_soma | |
from neuronunit.optimization.model_parameters import BPO_PARAMS | |
from neuronunit.tests import ( | |
RheobaseTest, | |
CapacitanceTest, | |
RestingPotentialTest, | |
InputResistanceTest, | |
TimeConstantTest, | |
FITest, | |
) | |
import pyspike as spk | |
ALLEN_DURATION = 2000 * pq.ms | |
ALLEN_DELAY = 1000 * pq.ms | |
def opt_setup( | |
specimen_id, | |
model_type, | |
target_num, | |
template_model=None, | |
cached=False, | |
fixed_current=False, | |
score_type=ZScore, | |
efel_filter_iterable=None, | |
): | |
if False: | |
#if cached: | |
with open(str(specimen_id) + "later_allen_NU_tests.p", "rb") as f: | |
suite = pickle.load(f) | |
else: | |
sweep_numbers, data_set, sweeps = make_allen_tests_from_id.allen_id_to_sweeps( | |
specimen_id | |
) | |
( | |
vmm, | |
stimulus, | |
sn, | |
spike_times, | |
) = make_allen_tests_from_id.get_model_parts_sweep_from_spk_cnt( | |
target_num, data_set, sweep_numbers, specimen_id | |
) | |
#t = vmm.times[-1] | |
t = ALLEN_DURATION+ALLEN_DELAY | |
target_model_spkt = spk.SpikeTrain(spike_times, edges=(0,t), is_sorted=True) | |
#if type(efel_filter_iterable) is type(list()): | |
# efel_filter_iterable.append({"tspkt":target_model_spkt}) | |
#else: | |
# efel_filter_iterable["tspkt"]=target_model_spkt | |
#print(efel_filter_iterable) | |
( | |
suite, | |
specimen_id, | |
) = make_allen_tests_from_id.make_suite_known_sweep_from_static_models( | |
vmm, stimulus, specimen_id, efel_filter_iterable=efel_filter_iterable | |
) | |
with open(str(specimen_id) + "later_allen_NU_tests.p", "wb") as f: | |
pickle.dump(suite, f) | |
if "vm_soma" in suite.traces.keys(): | |
target = StaticModel(vm=suite.traces["vm_soma"]) | |
target.vm_soma = suite.traces["vm_soma"] | |
target.spikes = target_model_spkt | |
nu_tests = suite.tests | |
attrs = {k: np.mean(v) for k, v in MODEL_PARAMS[model_type].items()} | |
for t in nu_tests: | |
if t.name == "Spikecount": | |
spk_count = float(t.observation["mean"]) | |
break | |
observation_range = {} | |
observation_range["value"] = spk_count | |
template_model.backend = model_type | |
template_model.allen = True | |
template_model.NU = True | |
if fixed_current: | |
uc = { | |
"amplitude": fixed_current, | |
"duration": ALLEN_DURATION, | |
"delay": ALLEN_DELAY, | |
} | |
target_current = None | |
else: | |
scs = SpikeCountSearch(observation_range) | |
assert template_model is not None | |
target_current = scs.generate_prediction(template_model) | |
template_model.seeded_current = target_current["value"] | |
return template_model, suite, nu_tests, target_current, spk_count, target_model_spkt | |
def wrap_setups( | |
specimen_id, | |
model_type, | |
target_num_spikes, | |
template_model=None, | |
fixed_current=False, | |
cached=False, | |
score_type=ZScore, | |
efel_filter_iterable=None, | |
): | |
#template_model.tspkt = target_spike_times | |
#print(template_model.tspkt) | |
#import pdb | |
#pdb.set_trace() | |
assert template_model is not None | |
[ template_model, suite, nu_tests, target_current, spk_count,tspkt ] = opt_setup( | |
specimen_id, | |
model_type, | |
target_num_spikes, | |
template_model=template_model, | |
fixed_current=False, | |
cached=None, | |
score_type=score_type, | |
efel_filter_iterable=efel_filter_iterable, | |
) | |
target_vm_soma = suite.traces["vm_soma"] | |
#if target_vm_soma is not None: | |
# tspk = threshold_detection(target.vm_soma) | |
# tspkt = spk.SpikeTrain(tspk, edges=(0,t), is_sorted=True) | |
template_model.seeded_current = target_current["value"] | |
template_model.allen = True | |
template_model.NU = True | |
template_model.backend = model_type | |
#efel_filter_iterable['tspkt'] = target_model_spkt | |
template_model.efel_filter_iterable = efel_filter_iterable | |
#template_model.cell_model.params = BPO_PARAMS[model_type] | |
#template_model.tspkt = tspkt | |
#print(template_model.tspkt) | |
cell_evaluator, template_model = opt_setup_two( | |
model_type, | |
suite, | |
nu_tests, | |
target_current, | |
spk_count, | |
template_model=template_model, | |
score_type=score_type, | |
efel_filter_iterable=efel_filter_iterable, | |
tspkt = tspkt | |
) | |
assert cell_evaluator.cell_model is not None | |
cell_evaluator.suite = None | |
cell_evaluator.suite = suite | |
cell_evaluator.cell_model.tspkt = None | |
cell_evaluator.cell_model.tspkt = tspkt | |
return cell_evaluator, template_model, suite, target_current, spk_count | |
class NUFeatureAllenMultiSpike(object): | |
def __init__( | |
self, test, model, cnt, target, spike_obs, print_stuff=False, score_type=None, tspkt = None | |
): | |
self.test = test | |
self.model = model | |
self.spike_obs = spike_obs | |
self.cnt = cnt | |
self.target = target | |
self.score_type = score_type | |
self.score_array = None | |
self.difference_in_spike_counts = 0 | |
self.tspkt = tspkt | |
def calculate_score(self, responses): | |
if not "features" in responses.keys(): | |
return 1000.0 | |
features = responses["features"] | |
if features is None: | |
return 1000.0 | |
self.test.score_type = self.score_type | |
feature_name = self.test.name | |
if feature_name not in features.keys(): | |
return 1000.0 | |
if features[feature_name] is None: | |
return 1000.0 | |
if feature_name == "fresh_model_spkt": | |
spike_trains = [self.tspkt, features[feature_name]] | |
f = spk.isi_profile(spike_trains) | |
t=ALLEN_DURATION+ALLEN_DELAY | |
delta = f(t)*10 | |
if feature_name == "fresh_model_spkt_1": | |
spike_trains = [self.tspkt, features[feature_name]] | |
f = spk.isi_profile(spike_trains) | |
t=ALLEN_DURATION+ALLEN_DELAY | |
delta = f(t)*10 | |
if feature_name == "fresh_model_spkt_2": | |
spike_trains = [self.tspkt, features[feature_name]] | |
f = spk.spike_profile(spike_trains) | |
t=ALLEN_DURATION+ALLEN_DELAY | |
delta = f(t)*10 | |
if feature_name == "fresh_model_spkt_3": | |
spike_trains = [self.tspkt, features[feature_name]] | |
f = spk.spike_profile(spike_trains) | |
t=ALLEN_DURATION+ALLEN_DELAY | |
delta = f(t)*10 | |
#t = self.target.vm_used.times[-1] | |
#fresh_model_spkt = spk.SpikeTrain(spikes, edges=(0,t), is_sorted=True) | |
#print(fresh_model_spkt) | |
#tspk = threshold_detection(target_vm) | |
#dist1 = spk.SpikeTrain(tspk, edges=(0,t), is_sorted=True) | |
#if tspkt is not None: | |
# spike_trains = [fresh_model_spkt,tspkt] | |
# f = spk.isi_profile(spike_trains) | |
# spk_dist_error = f(t) | |
if type(features[self.test.name]) is type(Iterable): | |
features[self.test.name] = np.mean(features[self.test.name]) | |
self.test.observation["mean"] = np.mean(self.test.observation["mean"]) | |
self.test.set_prediction(np.mean(features[self.test.name])) | |
if "Spikecount" == feature_name: | |
delta = np.abs( | |
features[self.test.name] - np.mean(self.test.observation["mean"]) | |
) | |
if np.nan == delta or delta == np.inf: | |
delta = 1000.0 | |
return delta | |
else: | |
if features[feature_name] is None: | |
return 1000.0 | |
prediction = {"value": np.mean(features[self.test.name])} | |
score_gene = self.test.judge(responses["model"], prediction=prediction) | |
if score_gene is not None: | |
if score_gene.log_norm_score is not None: | |
delta = np.abs(float(score_gene.log_norm_score)) | |
else: | |
if score_gene.raw is not None: | |
delta = np.abs(float(score_gene.raw)) | |
else: | |
delta = 1000.0 | |
else: | |
delta = 1000.0 | |
if np.nan == delta or delta == np.inf: | |
delta = 1000.0 | |
return delta | |
def opt_setup_two( | |
model_type, | |
suite, | |
nu_tests, | |
target_current, | |
spk_count, | |
template_model=None, | |
score_type=ZScore, | |
efel_filter_iterable=None, | |
tspkt = None | |
): | |
assert template_model.backend == model_type | |
template_model.params = BPO_PARAMS[model_type] | |
template_model.params_by_names(list(BPO_PARAMS[model_type].keys())) | |
template_model.seeded_current = target_current["value"] | |
template_model.spk_count = spk_count | |
template_model.tspkt = tspkt | |
sweep_protocols = [] | |
protocol = ephys.protocols.NeuronUnitAllenStepProtocol( | |
"multi_spiking", [None], [None] | |
) | |
sweep_protocols.append(protocol) | |
onestep_protocol = ephys.protocols.SequenceProtocol( | |
"multi_spiking_wraper", protocols=sweep_protocols | |
) | |
objectives = [] | |
spike_obs = [] | |
for tt in nu_tests: | |
if "Spikecount" == tt.name: | |
spike_obs.append(tt.observation) | |
spike_obs = sorted(spike_obs, key=lambda k: k["mean"], reverse=True) | |
for cnt, tt in enumerate(nu_tests): | |
feature_name = "%s" % (tt.name) | |
ft = NUFeatureAllenMultiSpike( | |
tt, template_model, cnt, target_current, spike_obs, score_type=score_type, tspkt = tspkt | |
) | |
objective = ephys.objectives.SingletonObjective(feature_name, ft) | |
objectives.append(objective) | |
score_calc = ephys.objectivescalculators.ObjectivesCalculator(objectives) | |
template_model.params_by_names(BPO_PARAMS[template_model.backend].keys()) | |
template_model.efel_filter_iterable = efel_filter_iterable | |
cell_evaluator = ephys.evaluators.CellEvaluator( | |
cell_model=template_model, | |
param_names=list(BPO_PARAMS[template_model.backend].keys()), | |
fitness_protocols={onestep_protocol.name: onestep_protocol}, | |
fitness_calculator=score_calc, | |
sim="euler", | |
) | |
assert cell_evaluator.cell_model is not None | |
return cell_evaluator, template_model | |
def opt_exec( | |
MU, NGEN, mapping_funct, cell_evaluator, mutpb=0.1, cxpb=1, neuronunit=True | |
): | |
optimisation = bpop.optimisations.DEAPOptimisation( | |
evaluator=cell_evaluator, | |
offspring_size=MU, | |
eta=35, # was 35, # was 25 | |
map_function=map, | |
selector_name="IBEA", | |
mutpb=mutpb, | |
cxpb=cxpb, | |
neuronunit=neuronunit, | |
) | |
final_pop, hall_of_fame, logs, hist = optimisation.run(max_ngen=NGEN) | |
return final_pop, hall_of_fame, logs, hist | |
def opt_to_model(hall_of_fame, cell_evaluator, suite, target_current, spk_count): | |
best_ind = hall_of_fame[0] | |
cell_evaluator.evaluate_with_lists(best_ind) | |
model = cell_evaluator.cell_model | |
tests = cell_evaluator.suite.tests | |
scores = [] | |
obs_preds = [] | |
for t in tests: | |
scores.append(t.judge(model, prediction=t.prediction)) | |
obs_preds.append( | |
(t.name, t.observation["mean"], t.prediction["mean"], scores[-1]) | |
) | |
df = pd.DataFrame(obs_preds) | |
opt = model#.model_to_dtc() | |
opt.attrs = { | |
str(k): float(v) for k, v in cell_evaluator.param_dict(best_ind).items() | |
} | |
# for k,v in cell_evaluator.cell_model.attrs.items(): | |
# print(opt.attrs[k],'opt attrs',v,'in cell evaluator') | |
# assert opt.attrs[k] == v | |
print(best_ind, "the gene") | |
target = copy.copy(opt) | |
if "vm_soma" in suite.traces.keys(): | |
target.vm_soma = suite.traces["vm_soma"] | |
opt.seeded_current = target_current["value"] | |
opt.spk_count = spk_count | |
target.seeded_current = target_current["value"] | |
target.spk_count = spk_count | |
target = inject_model_soma(target, solve_for_current=target_current["value"]) | |
opt = inject_model_soma(opt, solve_for_current=target_current["value"]) | |
return opt, target, scores, obs_preds, df | |
def make_allen_hard_coded_complete(): | |
""" | |
Manually specificy 4-5 | |
different passive/static electrical properties | |
over 4 Allen specimen id's. | |
FITest | |
623960880 | |
623893177 | |
471819401 | |
482493761 | |
""" | |
from neuronunit.optimization.optimization_management import TSD | |
## | |
# 623960880 | |
## | |
rt = RheobaseTest(observation={"mean": 70 * qt.pA, "std": 70 * qt.pA}) # yes | |
tc = TimeConstantTest(observation={"mean": 23.8 * qt.ms, "std": 23.8 * qt.ms}) | |
ir = InputResistanceTest(observation={"mean": 241 * qt.MOhm, "std": 241 * qt.MOhm}) | |
rp = RestingPotentialTest(observation={"mean": -65.1 * qt.mV, "std": 65.1 * qt.mV}) | |
# capacitance = ((tc.observation['mean']))/((ir.observation['mean']))#*qt.pF | |
# ct = CapacitanceTest(observation={'mean':capacitance,'std':capacitance}) | |
fislope = FITest( | |
observation={"value": 0.18 * (pq.Hz / pq.pA), "mean": 0.18 * (pq.Hz / pq.pA)} | |
) | |
fislope.score_type = RelativeDifferenceScore | |
allen_tests = [tc, rp, ir, rt] | |
for t in allen_tests: | |
t.score_type = RelativeDifferenceScore | |
allen_suite_623960880 = TestSuite(allen_tests) | |
allen_suite_623960880.name = ( | |
"http://celltypes.brain-map.org/mouse/experiment/electrophysiology/623960880" | |
) | |
## | |
# ID 623893177 | |
## | |
rt = RheobaseTest(observation={"mean": 190 * qt.pA, "std": 190 * qt.pA}) # yes | |
tc = TimeConstantTest(observation={"mean": 27.8 * qt.ms, "std": 27.8 * qt.ms}) | |
ir = InputResistanceTest(observation={"mean": 136 * qt.MOhm, "std": 136 * qt.MOhm}) | |
rp = RestingPotentialTest(observation={"mean": -77.0 * qt.mV, "std": 77.0 * qt.mV}) | |
capacitance = ((tc.observation["mean"])) / ((ir.observation["mean"])) # *qt.pF | |
ct = CapacitanceTest(observation={"mean": capacitance, "std": capacitance}) | |
## | |
fislope = FITest( | |
observation={"value": 0.12 * (pq.Hz / pq.pA), "mean": 0.12 * (pq.Hz / pq.pA)} | |
) | |
fislope.score_type = RelativeDifferenceScore | |
## | |
allen_tests = [tc, rp, ir, rt] | |
for t in allen_tests: | |
t.score_type = RelativeDifferenceScore | |
allen_suite_623893177 = TestSuite(allen_tests) | |
allen_suite_623893177.name = ( | |
"http://celltypes.brain-map.org/mouse/experiment/electrophysiology/623893177" | |
) | |
cells = {} | |
cells["623960880"] = TSD(allen_suite_623960880) | |
cells["623893177"] = TSD(allen_suite_623893177) | |
## | |
# 482493761 | |
## | |
rt = RheobaseTest(observation={"mean": 70 * qt.pA, "std": 70 * qt.pA}) # yes | |
tc = TimeConstantTest(observation={"mean": 24.4 * qt.ms, "std": 24.4 * qt.ms}) | |
ir = InputResistanceTest(observation={"mean": 132 * qt.MOhm, "std": 132 * qt.MOhm}) | |
rp = RestingPotentialTest(observation={"mean": -71.6 * qt.mV, "std": 71.6 * qt.mV}) | |
fislope = FITest( | |
observation={"value": 0.09 * (pq.Hz / pq.pA), "mean": 0.09 * (pq.Hz / pq.pA)} | |
) | |
fislope.score_type = RelativeDifferenceScore | |
allen_tests = [rt, tc, rp, ir] | |
for t in allen_tests: | |
t.score_type = RelativeDifferenceScore | |
# allen_tests[-1].score_type = ZScore | |
allen_suite482493761 = TestSuite(allen_tests) | |
allen_suite482493761.name = ( | |
"http://celltypes.brain-map.org/mouse/experiment/electrophysiology/482493761" | |
) | |
## | |
# 471819401 | |
## | |
rt = RheobaseTest(observation={"mean": 190 * qt.pA, "std": 190 * qt.pA}) # yes | |
tc = TimeConstantTest(observation={"mean": 13.8 * qt.ms, "std": 13.8 * qt.ms}) | |
ir = InputResistanceTest(observation={"mean": 132 * qt.MOhm, "std": 132 * qt.MOhm}) | |
rp = RestingPotentialTest(observation={"mean": -77.5 * qt.mV, "std": 77.5 * qt.mV}) | |
fislope = FITest( | |
observation={"value": 0.18 * (pq.Hz / pq.pA), "mean": 0.18 * (pq.Hz / pq.pA)} | |
) | |
fislope.score_type = RelativeDifferenceScore | |
# F/I Curve Slope 0.18 | |
allen_tests = [rt, tc, rp, ir] | |
for t in allen_tests: | |
t.score_type = RelativeDifferenceScore | |
allen_suite471819401 = TestSuite(allen_tests) | |
allen_suite471819401.name = ( | |
"http://celltypes.brain-map.org/mouse/experiment/electrophysiology/471819401" | |
) | |
list_of_dicts = [] | |
cells["471819401"] = TSD(allen_suite471819401) | |
cells["482493761"] = TSD(allen_suite482493761) | |
for k, v in cells.items(): | |
observations = {} | |
for k1 in cells["623960880"].keys(): | |
vsd = TSD(v) | |
if k1 in vsd.keys(): | |
vsd[k1].observation["mean"] | |
observations[k1] = np.round(vsd[k1].observation["mean"], 2) | |
observations["name"] = k | |
list_of_dicts.append(observations) | |
df = pd.DataFrame(list_of_dicts, index=cells.keys()) | |
df | |
return ( | |
allen_suite_623960880, | |
allen_suite_623893177, | |
allen_suite471819401, | |
allen_suite482493761, | |
cells, | |
df, | |
) | |
def make_allen_hard_coded_limited(): | |
# hard/hand code ephys constraints | |
from quantities import Hz, pA | |
from neuronunit.optimization.optimization_management import TSD | |
import pandas as pd | |
rt = RheobaseTest(observation={"mean": 70 * qt.pA, "std": 70 * qt.pA}) | |
tc = TimeConstantTest(observation={"mean": 24.4 * qt.ms, "std": 24.4 * qt.ms}) | |
ir = InputResistanceTest(observation={"mean": 132 * qt.MOhm, "std": 132 * qt.MOhm}) | |
rp = RestingPotentialTest(observation={"mean": -71.6 * qt.mV, "std": 77.5 * qt.mV}) | |
allen_tests = [rt, tc, rp, ir] | |
for t in allen_tests: | |
t.score_type = RelativeDifferenceScore | |
allen_suite482493761 = TestSuite(allen_tests) | |
allen_suite482493761.name = ( | |
"http://celltypes.brain-map.org/mouse/experiment/electrophysiology/482493761" | |
) | |
rt = RheobaseTest(observation={"mean": 190 * qt.pA, "std": 190 * qt.pA}) | |
tc = TimeConstantTest(observation={"mean": 13.8 * qt.ms, "std": 13.8 * qt.ms}) | |
ir = InputResistanceTest(observation={"mean": 132 * qt.MOhm, "std": 132 * qt.MOhm}) | |
rp = RestingPotentialTest(observation={"mean": -77.5 * qt.mV, "std": 77.5 * qt.mV}) | |
fi = FITest(observation={"mean": 0.09 * Hz / pA}) #'std':77.5*qt.mV}) | |
allen_tests = [rt, tc, rp, ir] # ,fi] | |
for t in allen_tests: | |
t.score_type = RelativeDifferenceScore | |
# allen_tests[-1].score_type = ZScore | |
allen_suite471819401 = TestSuite(allen_tests) | |
allen_suite471819401.name = ( | |
"http://celltypes.brain-map.org/mouse/experiment/electrophysiology/471819401" | |
) | |
list_of_dicts = [] | |
cells = {} | |
cells["471819401"] = TSD(allen_suite471819401) | |
cells["482493761"] = TSD(allen_suite482493761) | |
allen_tests = [rt, fi] | |
for t in allen_tests: | |
t.score_type = RelativeDifferenceScore | |
allen_suite3 = TestSuite(allen_tests) | |
cells["fi_curve"] = TSD(allen_suite3) | |
for k, v in cells.items(): | |
observations = {} | |
for k1 in cells["482493761"].keys(): | |
vsd = TSD(v) | |
if k1 in vsd.keys(): | |
vsd[k1].observation["mean"] | |
observations[k1] = np.round(vsd[k1].observation["mean"], 2) | |
observations["name"] = k | |
list_of_dicts.append(observations) | |
df = pd.DataFrame(list_of_dicts) | |
return allen_suite471819401, allen_suite482493761, df, cells |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment