Last active
May 12, 2017 15:13
-
-
Save patrick-samy/df33e296885364f602f0c27f1eb139a8 to your computer and use it in GitHub Desktop.
Read live data from a CMS50D+ pulse oximeter with firmware v4.6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
# Copyright (c) 2016 Tommi Airikka <[email protected]> | |
# License: GPLv2 | |
import sys, struct, serial, argparse | |
from dateutil.parser import parse | |
##################### | |
##### Variables ##### | |
##################### | |
parser = argparse.ArgumentParser(description='Download stored data from a CMS50D+ oximeter.') | |
parser.add_argument('device', type=str, help='path to device file') | |
parser.add_argument('outfile', type=str, help='output file path') | |
parser.add_argument('-s', '--start-time', dest='starttime', type=str, help='start time (\"YYYY-MM-DD HH:MM:SS\")') | |
args = parser.parse_args() | |
device = args.device | |
outfile = args.outfile | |
starttime = args.starttime | |
ser = serial.Serial() | |
################### | |
##### Helpers ##### | |
################### | |
# Pack little endian | |
def _ple(i): | |
return struct.pack("<I", i) | |
def _get_real_values((val1,val2)): | |
oval1 = ord(val1) | |
oval2 = ord(val2) | |
v1 = oval1 - 0x80 | |
v2 = oval2 - 0x80 | |
if v1 == 0 or oval1 == 0xFF: | |
v1 = 127 | |
if v2 == 0 or oval2 == 0xFF: | |
v2 = 255 | |
return (v1,v2) | |
def _parse_list(toparse, parsed): | |
print "start parsing" | |
while toparse: | |
if len(toparse) > 1: | |
#fs = (toparse.pop(0),toparse.pop(0)) | |
#(f,s) = _get_real_values(fs) | |
a = toparse.pop(0); | |
b = toparse.pop(0); | |
c = toparse.pop(0); | |
d = toparse.pop(0); | |
e = toparse.pop(0); | |
f = toparse.pop(0); | |
g = toparse.pop(0); | |
h = toparse.pop(0); | |
i = toparse.pop(0); | |
print ord(f) & 0x7f, ord(g) & 0x7f | |
else: | |
toparse.pop(0) | |
print "" | |
##################### | |
##### Functions ##### | |
##################### | |
def configure_serial(ser): | |
ser.baudrate = 115200 # 115200 | |
ser.bytesize = serial.EIGHTBITS # 8 | |
ser.parity = serial.PARITY_NONE # N | |
ser.stopbits = serial.STOPBITS_ONE # 1 | |
ser.xonxoff = 1 # XON/XOFF flow control | |
ser.timeout = 1 | |
ser.port = device | |
def get_raw_data(ser): | |
sys.stdout.write("Connecting to device...") | |
sys.stdout.flush() | |
ser.open() | |
sys.stdout.write("reading...") | |
sys.stdout.flush() | |
ser.write(b'\x7d\x81\xa1\x80\x80\x80\x80\x80\x80') | |
raw = list(ser.read(9)) | |
while len(raw) >= 9: | |
print ord(raw[5]) & 0x7f, ord(raw[6]) & 0x7f | |
raw = ser.read(9) | |
ser.close() | |
if len(raw) <= 1: | |
print("no data received. Is the device on?") | |
exit(43) | |
print("done!") | |
return raw | |
def parse_raw_data(data): | |
sys.stdout.write("Parsing data...", ) | |
sys.stdout.flush() | |
parsed = [] | |
_parse_list(data, parsed) | |
print("done!") | |
return parsed | |
def get_len_of_parsed_data(parsed): | |
return len(parsed) / 2 # 1Hz, two values (pulse and sats) | |
def write_to_file(parsed, total_len, f): | |
sys.stdout.write("Writing to file...", ) | |
sys.stdout.flush() | |
zeroval = _ple(0) | |
f.write(_ple(856)) | |
f.write(_ple(1)) | |
for _ in range(212): | |
f.write(zeroval) | |
f.write(_ple(1)) | |
for _ in range(55): | |
f.write(zeroval) | |
f.write(_ple(total_len)) | |
for e in parsed: | |
f.write(chr(e)) | |
def change_starttime(f): | |
if starttime != None: | |
sys.stdout.write("changing start time...", ) | |
sys.stdout.flush() | |
dt = parse(starttime, dayfirst=False, yearfirst=True) | |
year = _ple(dt.year) | |
month = _ple(dt.month) | |
day = _ple(dt.day) | |
hour = _ple(dt.hour) | |
minute = _ple(dt.minute) | |
second = _ple(dt.second) | |
f.seek(0x420) | |
s = year + month + day + hour + minute + second | |
f.write(s) | |
################ | |
##### Main ##### | |
################ | |
configure_serial(ser) | |
data = get_raw_data(ser) | |
parsed = parse_raw_data(data) | |
total_len = get_len_of_parsed_data(parsed) | |
f = open(outfile, 'wb') | |
write_to_file(parsed, total_len, f) | |
change_starttime(f) | |
f.close() | |
print("done!") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment