Created
March 6, 2015 18:46
-
-
Save matham/16ce6e8c43b32d9a0f28 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from time import sleep | |
from threading import Thread | |
import tables as tb | |
from os.path import exists, isfile | |
from functools import partial | |
from re import match, compile | |
from Queue import Queue | |
from pybarst.core.server import BarstServer | |
from pybarst.ftdi import FTDIChannel | |
from pybarst.ftdi.adc import ADCSettings | |
from pybarst.ftdi.switch import SerializerSettings | |
from pybarst.serial import SerialChannel | |
# MFC connection settings: one (port, mfc_id) entry per controller, listed in
# the order air, a, b. Exactly three entries are required by start_barst.
# NOTE: the value below is a single placeholder entry naming the two fields;
# pass a real three-entry config to run_program / start_barst.
mfc_config = (('port', 'mfc_id'),)

# Flag polled by the reader loops (read_adc / read_mfc); they exit once it is
# set to True.
_finish_program = False
def set_mfc_rate(mfc, mfc_id, to, val):
    """Send a new rate to an MFC and verify the reply it sends back.

    Args:
        mfc: Channel object providing ``write(data, timeout)`` and
            ``read(length, timeout)``.
        mfc_id: Integer id of the controller; formatted as two hex digits.
        to: Timeout forwarded to both the write and the read.
        val: Rate to set; formatted with three decimals.

    Raises:
        Exception: if the text read back is not the expected acknowledgement.
    """
    command = '!{:02X},S,{:.3f}\r\n'.format(mfc_id, val)
    mfc.write(command, to)

    # The acknowledgement has no comma between "S" and the number.
    expected = '!{:02X},S{:.3f}\r\n'.format(mfc_id, val)
    _, reply = mfc.read(len(expected), to)
    if reply == expected:
        return
    raise Exception(
        'Failed setting MFC rate. Expected "{}", got "{}"'.format(
            expected, reply))
def get_mfc_rate(mfc, mfc_id, to, rate_pat):
    """Query an MFC for its current rate.

    Args:
        mfc: Channel object providing ``write(data, timeout)`` and
            ``read(length, stop_char=..., timeout=...)``.
        mfc_id: Integer id of the controller; formatted as two hex digits.
        to: Timeout forwarded to both the write and the read.
        rate_pat: Regular expression (string or compiled) whose first group
            captures the numeric rate in the reply.

    Returns:
        Tuple ``(t, rate)``: the first item returned by ``mfc.read`` and the
        captured rate converted to ``float``.

    Raises:
        Exception: if the reply does not match ``rate_pat``.
    """
    query = '!{:02X},F\r\n'.format(mfc_id)
    mfc.write(query, to)

    # Read at most 24 characters, stopping at the newline ending the reply.
    stamp, reply = mfc.read(24, stop_char='\n', timeout=to)
    parsed = match(rate_pat, reply)
    if parsed is not None:
        return stamp, float(parsed.group(1))
    raise Exception('Failed getting MFC rate. Got "{}"'.format(reply))
def _mfc_handshake(mfc, to, command, expected, failure):
    """Write ``command`` to ``mfc`` and require ``expected`` as the reply.

    Args:
        mfc: Channel object providing ``write(data, timeout)`` and
            ``read(length, timeout)``.
        to: Timeout forwarded to both the write and the read.
        command: Text sent to the controller.
        expected: Exact text the controller must send back.
        failure: Sentence that starts the error message on a mismatch.

    Raises:
        Exception: if the text read back differs from ``expected``.
    """
    mfc.write(command, to)
    _, reply = mfc.read(len(expected), to)
    if reply != expected:
        raise Exception(
            '{} Expected "{}", got "{}"'.format(failure, expected, reply))


def start_barst(mfc_config):
    """Start the Barst server and open the FTDI and MFC channels.

    Args:
        mfc_config: Sequence of exactly three ``(port_name, mfc_id)`` pairs,
            in the order air, a, b. An entry whose port name is ``None`` is
            skipped and yields ``(None, None, None)`` in the returned list.

    Returns:
        Tuple ``(server, ftdi, adc, odor, mfcs)`` where ``mfcs`` holds one
        ``(channel, setter, getter)`` triple per config entry. ``setter``
        takes the rate to set and ``getter`` takes no arguments; they wrap
        ``set_mfc_rate`` and ``get_mfc_rate`` for that controller.

    Raises:
        ValueError: if ``mfc_config`` does not have exactly three entries.
        Exception: if an MFC does not acknowledge a configuration command.
    """
    # Explicit check instead of ``assert`` so it survives ``python -O``.
    if len(mfc_config) != 3:
        raise ValueError(
            'mfc_config must have exactly 3 entries (air, a, b), '
            'got {}'.format(len(mfc_config)))

    server = BarstServer(
        barst_path=r'C:\Program Files\Barst\Barst64.exe',
        pipe_name=r'\\.\pipe\TestPipe')
    # Close first, wait, then open -- presumably to get rid of a server
    # instance left over from an earlier run (TODO confirm).
    server.close_server()
    time.sleep(1.)
    server.open_server()
    server.get_manager('ftdi')

    adc = ADCSettings(
        clock_bit=3, lowest_bit=4, num_bits=3, sampling_rate=1000, chan1=True,
        chan2=False, transfer_size=100, data_width=16, hw_buff_size=0)
    odor = SerializerSettings(
        clock_bit=0, data_bit=1, latch_bit=2, num_boards=2, output=True)
    ftdi = FTDIChannel(
        channels=[adc, odor], server=server, desc='Birch Board rev1 B')

    mfcs = []
    to = 4000  # timeout passed to every MFC read and write
    for name, mfc_id in mfc_config:
        if name is None:
            mfcs.append((None, None, None))
            continue

        mfc = SerialChannel(
            server=server, port_name=name, max_write=96, max_read=96,
            baud_rate=9600, stop_bits=1, parity='none', byte_size=8)
        mfc.open_channel()

        # set digital mode
        _mfc_handshake(
            mfc, to, '!{:02X},M,D\r\n'.format(mfc_id),
            '!{:02X},MD\r\n'.format(mfc_id),
            'Failed setting MFC to digital mode.')
        # set to standard LPM
        _mfc_handshake(
            mfc, to, '!{:02X},U,SLPM\r\n'.format(mfc_id),
            '!{:02X},USLPM\r\n'.format(mfc_id),
            'Failed setting MFC to use SLPM units.')

        setter = partial(set_mfc_rate, mfc, mfc_id, to)
        getter = partial(
            get_mfc_rate, mfc, mfc_id, to,
            compile(r'\!{:02X},([0-9\.]+)\r\n'.format(mfc_id)))
        # Start every controller at a rate of zero.
        setter(0)
        mfcs.append((mfc, setter, getter))

    # From here on ``adc`` and ``odor`` are the opened channel objects
    # returned by the FTDI channel, not the settings created above.
    adc, odor = ftdi.open_channel(alloc=True)
    adc.open_channel()
    adc.set_state(True)
    odor.open_channel()
    odor.set_state(True)
    return server, ftdi, adc, odor, mfcs
def create_file(hdf_filename, odor_names, odor_mfc_map):
    """Create the HDF5 log file and its empty, extendable arrays.

    Layout created under ``/data``:

    * ``odors/p<i>`` -- one group per valve, titled with the odor name,
      holding ``ts`` and ``values`` arrays.
    * ``pid`` -- ``ts``, ``values`` and ``idx`` arrays.
    * ``mfc/air``, ``mfc/a``, ``mfc/b`` -- ``ts`` and ``values`` arrays;
      ``odor_mfc_map`` is stored as an attribute of the ``mfc`` group.

    Args:
        hdf_filename: Path of the file to create; it must not exist yet.
        odor_names: Names of the odors by valve index. The list is padded to
            16 entries and empty names are replaced by ``p<index>``.
        odor_mfc_map: Value stored on the ``mfc`` group as ``odor_mfc_map``.

    Returns:
        The open PyTables file handle; the caller is responsible for
        closing it.

    Raises:
        Exception: if ``hdf_filename`` already exists.
    """
    if exists(hdf_filename):
        raise Exception('{} already exists'.format(hdf_filename))

    padded = list(odor_names) + ['', ] * (16 - len(odor_names))
    titles = [
        given or 'p{}'.format(idx) for idx, given in enumerate(padded)]

    fh = tb.open_file(
        filename=hdf_filename, mode='w', title='PID Olfactometer diagnostics')

    def add_series(group, name, atom_cls):
        # Every series starts empty and is grown with append().
        fh.create_earray(
            group, name=name, atom=atom_cls(shape=1), shape=(0, ))

    data = fh.create_group(fh.root, name='data', title='Holds the data')
    odors = fh.create_group(data, name='odors', title='Holds valve status')
    pid = fh.create_group(data, name='pid', title='Holds the PID output')
    mfc = fh.create_group(data, name='mfc', title='Holds the MFC state')
    mfc._v_attrs.odor_mfc_map = odor_mfc_map

    for idx, title in enumerate(titles):
        valve = fh.create_group(odors, name='p{}'.format(idx), title=title)
        add_series(valve, 'ts', tb.Float64Atom)
        add_series(valve, 'values', tb.BoolAtom)

    add_series(pid, 'ts', tb.Float64Atom)
    add_series(pid, 'values', tb.Float32Atom)
    add_series(pid, 'idx', tb.UInt64Atom)

    for mfc_type in ('air', 'a', 'b'):
        controller = fh.create_group(mfc, name=mfc_type)
        add_series(controller, 'ts', tb.Float64Atom)
        add_series(controller, 'values', tb.Float64Atom)
    return fh
def close_barst(server, ftdi, odor, mfcs):
    """Reset the hardware outputs and shut down the channels and server.

    Args:
        server: Server object; ``close_server()`` is called on it last.
        ftdi: FTDI channel; closed with ``close_channel_server()``.
        odor: Valve channel; outputs 0-15 are set low before closing.
        mfcs: List of ``(channel, setter, getter)`` triples as returned by
            ``start_barst``. Entries whose channel is ``None`` (MFCs that
            were not configured) are skipped.
    """
    for mfc, setter, _ in mfcs:
        # Unconfigured MFCs are stored as (None, None, None); calling their
        # setter would raise TypeError and abort the rest of the shutdown.
        if mfc is None:
            continue
        setter(0)
        mfc.close_channel_server()
    odor.write(set_low=list(range(16)))
    ftdi.close_channel_server()
    server.close_server()
def read_adc(adc, hdf_data, hdf_idx, hdf_ts):
    """Log ADC reads until the module-level ``_finish_program`` flag is set.

    Intended to run as a thread target. After each ``adc.read()`` the
    result's ``chan1_data``, ``chan1_ts_idx`` and ``ts`` attributes are
    appended to the matching array.

    Args:
        adc: Opened ADC channel providing ``read()``.
        hdf_data: Array receiving ``chan1_data``.
        hdf_idx: Array receiving ``chan1_ts_idx``.
        hdf_ts: Array receiving ``ts``.
    """
    global _finish_program
    while True:
        # The flag is only checked between reads.
        if _finish_program:
            break
        sample = adc.read()
        hdf_data.append([sample.chan1_data])
        hdf_idx.append([[sample.chan1_ts_idx]])
        hdf_ts.append([[sample.ts]])
def read_mfc(getter, hdf_data, hdf_ts):
    """Log MFC rates until the module-level ``_finish_program`` flag is set.

    Intended to run as a thread target.

    Args:
        getter: Zero-argument callable returning ``(ts, value)``.
        hdf_data: Array receiving each value.
        hdf_ts: Array receiving each timestamp.
    """
    global _finish_program
    while True:
        # The flag is only checked between queries.
        if _finish_program:
            return
        stamp, rate = getter()
        hdf_data.append([[rate]])
        hdf_ts.append([[stamp]])
def write_mfc(setter, mfc_queue):
    """Apply queued MFC rates until the ``'eof'`` sentinel arrives.

    Intended to run as a thread target. Every queue item must be a
    ``(val, callback)`` pair: ``val`` is passed to ``setter`` and
    ``callback``, when not ``None``, is called afterwards with no arguments.
    An item whose ``val`` equals ``'eof'`` ends the loop.

    Args:
        setter: Callable taking the rate to set.
        mfc_queue: Queue supplying the ``(val, callback)`` items.
    """
    rate, done = mfc_queue.get(block=True)
    while rate != 'eof':
        setter(rate)
        if done is not None:
            done()
        rate, done = mfc_queue.get(block=True)
def write_odor(odor, odor_queue, hdf_odors):
    """Apply queued valve changes and log every actual state change.

    Intended to run as a thread target. Every queue item must be a
    ``(high_vals, low_vals, callback)`` triple. The two index collections
    are passed to ``odor.write`` as ``set_high`` / ``set_low``; for each
    listed valve whose state differs from the last one recorded here, the
    value returned by ``odor.write`` and the new state are appended to that
    valve's ``ts`` and ``values`` arrays. ``callback``, when not ``None``,
    is then called with no arguments. An item whose ``high_vals`` equals
    ``'eof'`` ends the loop.

    Args:
        odor: Valve channel providing ``write(set_high=..., set_low=...)``.
        odor_queue: Queue supplying the items described above.
        hdf_odors: Object with attributes ``p0`` .. ``p15``, each exposing
            ``ts`` and ``values`` arrays.
    """
    # None means "not written yet", so the first write is always logged.
    valve_state = [None, ] * 16
    valve_groups = [
        getattr(hdf_odors, 'p{}'.format(idx)) for idx in range(16)]
    logs = [(grp.ts, grp.values) for grp in valve_groups]

    while True:
        high_vals, low_vals, callback = odor_queue.get(block=True)
        if high_vals == 'eof':
            return

        ts = odor.write(set_high=high_vals, set_low=low_vals)
        for indices, new_state in ((high_vals, True), (low_vals, False)):
            for idx in indices:
                if valve_state[idx] is not new_state:
                    valve_state[idx] = new_state
                    ts_log, value_log = logs[idx]
                    ts_log.append([[ts]])
                    value_log.append([[new_state]])

        if callback is not None:
            callback()
def _run_with_workers(program, adc, odor, mfcs, data):
    """Start the worker threads, play ``program``, then stop the workers.

    The workers are always stopped and joined, even when starting them or
    playing the program raises.
    """
    global _finish_program
    flag_stopped = []   # threads that exit once _finish_program is True
    eof_stopped = []    # (thread, queue, sentinel) stopped through the queue
    mfc_queues = []
    try:
        adc_thread = Thread(
            target=read_adc, name='adc',
            args=(adc, data.pid.values, data.pid.idx, data.pid.ts))
        adc_thread.start()
        flag_stopped.append(adc_thread)

        odor_queue = Queue()
        odor_thread = Thread(
            target=write_odor, name='odor',
            args=(odor, odor_queue, data.odors))
        odor_thread.start()
        eof_stopped.append((odor_thread, odor_queue, ('eof', None, None)))

        for (mfc, setter, getter), name in zip(mfcs, ('air', 'a', 'b')):
            if mfc is None:
                # Keep the position so the unpacking below still lines up.
                mfc_queues.append(None)
                continue
            group = getattr(data.mfc, name)
            reader = Thread(
                target=read_mfc, name='read_mfc',
                args=(getter, group.values, group.ts))
            reader.start()
            flag_stopped.append(reader)

            q = Queue()
            writer = Thread(
                target=write_mfc, name='write_mfc', args=(setter, q))
            writer.start()
            eof_stopped.append((writer, q, ('eof', None)))
            mfc_queues.append(q)

        q_air, q_a, q_b = mfc_queues
        for delay, high_odors, low_odors, mfc_air, mfc_a, mfc_b in program:
            if delay:
                sleep(delay)
            odor_queue.put((high_odors, low_odors, None), block=False)
            for q, item in ((q_air, mfc_air), (q_a, mfc_a), (q_b, mfc_b)):
                if q is not None:
                    q.put(item, block=False)
    finally:
        _finish_program = True
        for thread in flag_stopped:
            thread.join()
        # Join the *writer* threads so none of them is still using an MFC
        # channel when the caller shuts the hardware down.
        for thread, q, sentinel in eof_stopped:
            q.put(sentinel, block=False)
            thread.join()


def run_program(
        program, hdf_filename, odor_names, odor_mfc_map=None,
        mfc_config=mfc_config):
    """Run an olfactometer program while logging to a new HDF5 file.

    Starts the hardware with ``start_barst``, creates the log file with
    ``create_file``, runs reader/writer threads for the ADC, the valves and
    each configured MFC, plays ``program`` step by step, then stops the
    threads, closes the file and shuts the hardware down with
    ``close_barst``. The file and the hardware are released even if an
    error occurs part-way through.

    NOTE(review): the worker threads append to arrays of the same HDF5 file
    concurrently without a lock -- confirm PyTables tolerates this.

    Args:
        program: Iterable of 6-tuples ``(delay, high_odors, low_odors,
            mfc_air, mfc_a, mfc_b)``. A truthy ``delay`` is slept (via
            ``time.sleep``) before the step is queued. ``high_odors`` and
            ``low_odors`` are the valve indices to set high and low. Each
            ``mfc_*`` item is queued unchanged for ``write_mfc``, which
            unpacks it as ``(val, callback)``; items for MFCs that are not
            configured are ignored.
        hdf_filename: Path of the HDF5 file to create; must not exist.
        odor_names: Odor names by valve index, forwarded to ``create_file``.
        odor_mfc_map: Value stored in the file's ``mfc`` group; defaults to
            an empty list.
        mfc_config: Three ``(port_name, mfc_id)`` pairs in the order air,
            a, b, forwarded to ``start_barst``.
    """
    global _finish_program
    if odor_mfc_map is None:
        odor_mfc_map = []
    _finish_program = False

    server, ftdi, adc, odor, mfcs = start_barst(mfc_config)
    try:
        fh = create_file(hdf_filename, odor_names, odor_mfc_map)
        try:
            _run_with_workers(program, adc, odor, mfcs, fh.root.data)
        finally:
            fh.close()
    finally:
        close_barst(server, ftdi, odor, mfcs)
if __name__ == '__main__':
    # Example run: three named odors, no MFCs attached and an empty program.
    odor_names = ['NO', 'Limonene', 'Mix']
    # One (port, mfc_id) pair per MFC (air, a, b); a None port means the MFC
    # is not used. Each entry must be a pair because start_barst unpacks it
    # as ``name, mfc_id``.
    mfc_config = [(None, None), ] * 3
    filename = 'data.h5'
    program = ()
    run_program(
        program, hdf_filename=filename, odor_names=odor_names,
        odor_mfc_map=[], mfc_config=mfc_config)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment