Script for correcting Jamie's data files
#!/usr/local/bin/python3
import csv
import itertools
import os
import os.path
import statistics
import sys
import traceback

OUTPUT_PATH = 'Transformed Data Files'

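# read_rpt parses a whitespace-delimited .RPT file: the first line is a header
# of column names (Time, Ev, HR, RSA, ...) and each subsequent line is one
# sample. It yields one dict per data row keyed by those column names, e.g.
# {'Time': '300', 'Ev': '1', 'HR': '72.4', ...} (example values are
# hypothetical).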
def read_rpt(filename):
    with open(filename) as f:
        header = f.readline()
        keys = header.split()
        for line in f:
            vals = line.split()
            row = dict(zip(keys, vals))
            yield row

def find_event(records, event_name):
    times = [r['Time'] for r in records if r['Ev'] == event_name]
    times = [int(t) for t in times]
    return {
        'event_name': event_name,
        'start': min(times),
        'end': max(times)
    }

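# split_event relabels records of the named event around a split point three
# minutes (3 * 60, so Time appears to be in seconds) before the event's last
# timestamp: rows at or before the split point get an 'a' suffix (event '1'
# becomes '1a'), later rows get 'b'. Records for other events pass through
# unchanged.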
def split_event(record, event):
    split_point = event['end'] - 3 * 60
    if record['Ev'] == event['event_name']:
        before_split = int(record['Time']) <= split_point
        new_event_name = record['Ev'] + ('a' if before_split else 'b')
        new_record = {**record}
        new_record['Ev'] = new_event_name
        return new_record
    else:
        return record

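# aggregate_by_event emits one summary row per run of consecutive records that
# share an event label. itertools.groupby only groups adjacent rows, so this
# assumes each event's records are contiguous in the file. A '.' is treated as
# a missing value and excluded from the per-event means; if every value in a
# column is missing, the summary keeps '.' for that column.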
def aggregate_by_event(records):
    for event_name, group in itertools.groupby(records, key=lambda r: r['Ev']):
        group = list(group)

        def mean_of(column):
            not_missing = [float(r[column]) for r in group if r[column] != '.']
            if len(not_missing) > 0:
                return statistics.mean(not_missing)
            else:
                return '.'

        group_summary = {
            'Ev': event_name,
            # compare times numerically rather than as strings when picking
            # the group's starting time
            'Time': min(int(r['Time']) for r in group)
        }
        for col in ['HR', 'RSA', 'TidVol', 'ResPer', 'SCL', 'SCR', 'Syst', 'Dias', 'Rate']:
            group_summary[col] = mean_of(col)
        yield group_summary

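# process_input_file writes two outputs into the 'Transformed Data Files'
# subdirectory next to the input file: a corrected .RPT with event '1' split
# into '1a'/'1b' (space-separated, same columns as the input), and a .SUM file
# holding one comma-separated summary row per event.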
def process_input_file(input_file):
    records = list(read_rpt(input_file))
    event1 = find_event(records, '1')
    corrected = [split_event(r, event1) for r in records]

    basename = os.path.basename(input_file)
    output_path = os.path.join(os.path.dirname(input_file), OUTPUT_PATH)
    os.makedirs(output_path, exist_ok=True)

    output_rpt_file = os.path.join(output_path, basename)
    with open(output_rpt_file, 'w') as f:
        print("Transforming corrected RPT files for {} at {}".format(input_file, output_rpt_file))
        f.write('Time Ev HR RSA TidVol ResPer SCL SCR Syst Dias Rate\n')
        for row in corrected:
            line = '{Time} {Ev:>2} {HR:>6} {RSA:>4} {TidVol:>7} {ResPer:>7} {SCL:>6} {SCR:>6} {Syst:>5} {Dias:>5} {Rate:>4}\n'.format(**row)
            f.write(line)

    output_sum_file = os.path.join(output_path, os.path.splitext(basename)[0] + '.SUM')
    # newline='' is the documented way to open files handed to the csv module
    with open(output_sum_file, 'w', newline='') as f:
        print("Computing SUMMARY files for {} at {}".format(input_file, output_sum_file))
        writer = csv.DictWriter(f, fieldnames=['Ev', 'Time', 'HR', 'RSA', 'TidVol', 'ResPer', 'SCL', 'SCR', 'Syst', 'Dias', 'Rate'])
        writer.writeheader()
        for row in aggregate_by_event(corrected):
            writer.writerow(row)

def main(input_files):
    for input_file in input_files:
        try:
            process_input_file(input_file)
        except Exception:
            print('Error processing file: {}'.format(input_file))
            traceback.print_exc()

if __name__ == '__main__':
    main(sys.argv[1:])
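A minimal usage sketch, assuming the script is saved as correct_rpt.py (a hypothetical name; the gist's actual filename isn't shown above). Each .RPT file passed on the command line is processed independently, and a failure in one file is logged with a traceback while the remaining files are still processed:

    python3 correct_rpt.py path/to/data/*.RPT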