Created
July 30, 2019 01:16
-
-
Save cooperpellaton/00b2a4465e5c152f0852095f8807f448 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""Script for generating codings for participants. | |
This handles the coding of participants in FHAage | |
as variables from the master sheet (run_sheet) and | |
their individual trial runs.
""" | |
__author__ = "Cooper Pellaton" | |
__copyright__ = "Copyright 2019, Georgia Tech, CABI" | |
__version__ = "0.0.1" | |
__maintainer__ = "Cooper Pellaton" | |
__email__ = "[email protected]" | |
__status__ = "Alpha" | |
import argparse | |
import csv | |
import os | |
import pathlib | |
import re | |
import sys | |
from pathlib import Path | |
import pprint | |
# Globals

# Populated per-participant by fetch_participant_alignment():
# maps a response key ("1"/"6") to the stimulus it signals ("face"/"house").
encodings = {}

# Numeric codes for every trial label used in the "between" output.
# Label grammar: <validity><stimulus><speed><accuracy>, where
#   validity: V(alid) / I(nvalid) / N(eutral),
#   stimulus: F(ace) / H(ouse),
#   speed:    Fast / Slow, accuracy: Corr / Incorr.
# "NR" marks no-response trials; "C*" entries are catch trials.
encbetween = {
    "VFFastCorr": 1,
    "VFFastIncorr": 2,
    "VFSlowCorr": 3,
    "VFSlowIncorr": 4,
    "IFFastCorr": 5,
    "IFFastIncorr": 6,
    "IFSlowCorr": 7,
    "IFSlowIncorr": 8,
    "VHFastCorr": 9,
    "VHFastIncorr": 10,
    "VHSlowCorr": 11,
    "VHSlowIncorr": 12,
    "IHFastCorr": 13,
    "IHFastIncorr": 14,
    "IHSlowCorr": 15,
    "IHSlowIncorr": 16,
    "NFFastCorr": 17,
    "NFFastIncorr": 18,
    "NFSlowCorr": 19,
    "NFSlowIncorr": 20,
    "NHFastCorr": 21,
    "NHFastIncorr": 22,
    "NHSlowCorr": 23,
    "NHSlowIncorr": 24,
    "VFNR": 25,
    "IFNR": 26,
    "VHNR": 27,
    "IHNR": 28,
    "NFNR": 29,
    "NHNR": 30,
    "CF": 31,
    "CH": 32,
    "CN": 33,
}

# The "within" codes share entries 1-30 with encbetween verbatim; only the
# three catch-trial labels differ (bare F/H/N instead of CF/CH/CN), so we
# derive the table instead of duplicating 30 literal entries.  Insertion
# order (and therefore generate_top_line output) is preserved.
encwithin = {label: code for label, code in encbetween.items() if code <= 30}
encwithin.update({"F": 31, "H": 32, "N": 33})

# Lookup tables for assembling label fragments from the per-trial state.
types = {"face": "F", "house": "H", "neutral": "N", "catch": "C"}
validities = {True: "V", False: "I"}
speed = {True: "Fast", False: "Slow"}
corr = {True: "Corr", False: "Incorr"}
# Setup and define the arg parser.
# This is so that we can take command line input.
parser = argparse.ArgumentParser(
    description="Utility for generating number encodings from participant files."
)
parser.add_argument(
    "master_file",
    metavar="n",
    type=str,
    help="The path to the master file containing the handedness/encodings of each person.",
)
parser.add_argument(
    "participant_file",
    metavar="p",
    type=str,
    help="The path to the participant file to be parsed.",
)
parser.add_argument(
    "threshold",
    metavar="t",
    type=float,
    help="The time below which is considered fast, and above which is considered slow.",
)
# NOTE: parsing happens at import time, so importing this module requires all
# three positional arguments to be present on the command line.
args = parser.parse_args()

# Shared pretty-printer (indent=4) available for debug output.
pp = pprint.PrettyPrinter(indent=4)
def main():
    """Entry point: group participant CSVs by id, encode runs, write output.

    Scans the participant directory (``args.participant_file``) for CSV
    logs, groups them by the participant id embedded in the filename
    (e.g. ``S12_run1_...csv`` -> ``12``), then generates both "between"
    and "within" codings for every run and writes one output file per
    coding type per participant.  Exits the process with status 0.
    """
    part_dir = str(args.participant_file)
    files = read_files(part_dir)

    # Collect the distinct participant ids from the filename prefix,
    # e.g. "S12_run1.csv" -> "12" (leading letter dropped).
    participant_ids = {f.split("_")[0][1:] for f in files}

    # Pair each id with the full paths of all of its run files.
    # os.path.join (rather than string "+") keeps the paths correct whether
    # or not the supplied directory ends with a path separator -- the old
    # concatenation produced e.g. "dataS12_run1.csv" without one.
    participants = []
    for _id in participant_ids:
        run_paths = {
            os.path.join(part_dir, f)
            for f in files
            if f.split("_")[0][1:] == _id
        }
        participants.append([_id, run_paths])

    threshold = args.threshold
    for participant in participants:
        _paths = sorted(participant[1])
        _id = int(participant[0])
        # Establish this participant's key->stimulus mapping (sets the
        # module-level ``encodings`` used by generate_codings).
        fetch_participant_alignment(str(args.master_file), _id)
        output = {"between": [], "within": []}
        for _path in _paths:
            # Run number is the trailing digit of the second "_" field of
            # the filename, e.g. ".../S12_run3_log.csv" -> 3.
            run_number = int(os.path.basename(_path).split("_")[1][-1])
            # 439.5 s is the fixed per-run timing offset.
            # Do a between trial.
            output["between"] += generate_codings(_path, threshold, run_number, 439.5)
            # Do a within trial.
            output["within"] += generate_codings(
                _path, threshold, run_number, 439.5, False
            )
        write_to_disk(_id, output["between"])
        write_to_disk(_id, output["within"], False)
    # We've finished now. Exit cleanly.
    sys.exit(0)
def read_files(path="."):
    """Return the names of all ``.csv`` log files directly inside *path*.

    Only file names are returned (no directory component), in whatever
    order the OS lists them.
    """
    return [entry for entry in os.listdir(path) if entry.endswith(".csv")]
# We assume that the master sheet will be in CSV form.
def fetch_participant_alignment(master_sheet_path, p_id):
    """Determine what group the participant is in.

    IE. whether face is left hand, or face is right hand.  Reads the
    master run sheet and stores the mapping in the module-level
    ``encodings`` dict (response key "1"/"6" -> "face"/"house") for later
    use by generate_codings.

    Args:
        master_sheet_path: Path to the master sheet CSV.
        p_id: Participant id; assumed to equal the participant's row
            position in the sheet -- TODO confirm rows are ordered by id.
    """
    global encodings
    # "with" guarantees the handle is closed even if parsing raises; the
    # original opened the file and never closed it.  newline="" is the
    # csv-module-recommended mode for reading.
    with open(master_sheet_path, "r", newline="") as f:
        rows = list(csv.reader(f, dialect="excel"))
    p_align = rows[p_id + 1]  # offset by one for the un-used row at the top
    if p_align[3] == "LEFT":
        encodings = {"1": "face", "6": "house"}
    else:
        encodings = {"6": "face", "1": "house"}
def generate_codings(participant_path, threshold, run_number, offset, between=True):
    """Generate the [code, onset_time] coding rows for one run file.

    Reads a participant's trial CSV and, for every row, classifies the
    trial (cue validity, stimulus type, response speed, accuracy,
    catch / no-response states), assembles the corresponding label, and
    maps it to its numeric code.

    Args:
        participant_path: Path to one run's CSV log.
        threshold: Reaction times strictly below this count as "Fast".
        run_number: Run index; shifts all onsets by run_number * offset.
        offset: Per-run timing offset in seconds.
        between: If True, use the "between" label grammar and encbetween
            codes; otherwise the "within" grammar and encwithin codes,
            which also emits a separate cue event per trial.

    Returns:
        A list of [numeric_code, onset_seconds] pairs.
    """
    out_data = []
    with open(participant_path) as f:
        p_data = csv.DictReader(f)
        # Hardcode the initial start time.
        # This is 3 * TR + 3 sec. fixation
        base_time = (3 * 1.5) + 3 + (run_number * offset)
        for row in p_data:
            if row["JitterTime"] != "" and row["JitterTime"] is not None:
                base_time += float(row["JitterTime"])
            # Trial-state flags used to assemble the label below.
            is_correct = False
            valid_resp = False
            is_nr = False
            is_fast = False
            is_valid = False
            is_catch = False
            is_neutral = False
            valid = ""
            correct = ""
            catch_type = ""
            # Establish what the participant response *should* have been.
            if row["FaceHouseMov"] != "" and row["FaceHouseMov"] is not None:
                if "face" in row["FaceHouseMov"]:
                    correct = "face"
                else:
                    correct = "house"
            # A trial is "valid" when the cue matched the trial type.
            if row["CueType"] == row["TrialType"]:
                is_valid = True
                valid = row["TrialType"]
                if row["TrialType"] == "catch":
                    is_catch = True
                    catch_type = row["CueType"]
            else:
                valid = row["TrialType"]
            # Determine if the response was the right one, or valid.
            resp = row["key_resp_2.keys"]
            if resp != "" and resp is not None and resp != "None":
                valid_resp = True
                # Keep only the digits of the keypress; if several keys were
                # pressed, score the first one.
                resp = re.sub(r"[^0-9]", "", resp).strip()
                if len(resp) > 1:
                    resp = resp[0]
            if not valid_resp:
                is_nr = True
            if valid_resp and encodings[resp] == correct:
                is_correct = True
            # Determine if this response was Fast/Slow.
            # BUGFIX: keep the decimal point when sanitizing the reaction
            # time -- the old pattern "[^0-9]" stripped it, turning e.g.
            # "0.512" into 512.0 and misclassifying responses as Slow.
            rate = re.sub(r"[^0-9.]", "", row["key_resp_2.rt"]).strip()
            if row["key_resp_2.rt"] != "" and float(rate) < threshold:
                is_fast = True
            # Determine if this was a neutral trial.
            if row["CueText"] == "50N":
                is_neutral = True
            # Now piece together the label string using our states.
            name = ""
            if between:
                # Generate the label for the encoding.
                if is_catch and is_neutral:
                    name += "CN"
                else:
                    if is_neutral:
                        name += "N" + types[valid]
                    if not is_catch and not is_neutral:
                        name += validities[is_valid] + types[valid]
                    if not is_neutral and is_catch:
                        name += types[valid]
                    if is_nr and "C" not in name:
                        name += "NR"
                    if is_catch:
                        name += types[catch_type]
                    elif not is_nr and not is_catch:
                        name += speed[is_fast] + corr[is_correct]
                val = encbetween[name]
                # write data to array
                out_data.append([val, base_time])
                # Next onset: 3 s cue + movie duration.
                base_time += 3 + float(row["FHDuration"])
            else:
                if is_catch:
                    name += types[catch_type]
                else:
                    # (A dead "is_catch and is_neutral" branch was removed
                    # here: is_catch is always False inside this else.)
                    if is_neutral:
                        name += "N" + types[valid]
                    if not is_catch and not is_neutral:
                        name += validities[is_valid] + types[valid]
                    if not is_neutral and is_catch:
                        name += types[valid]
                    if is_nr and "C" not in name:
                        name += "NR"
                    if not is_nr:
                        name += speed[is_fast] + corr[is_correct]
                # NOTE(review): types["catch"] is "C", which has no
                # encwithin entry -- this assumes CueType here is always
                # face/house/neutral; confirm against the run logs.
                cue = encwithin[types[row["CueType"]]]
                # Calculate the cue time.
                val = encwithin[name]
                # Within coding emits the cue event, then (for non-catch
                # trials) the trial event 3 s later.
                out_data.append([cue, base_time])
                base_time += 3
                if not is_catch:
                    out_data.append([val, base_time])
                # Calculated next time.
                base_time += float(row["FHDuration"])
    return out_data
def write_to_disk(p_id, out_data, between=True):
    """Writes the output data to disk.

    Make sure the output path exists before writing.  Compensate for the
    possibility of between and within trials and handle the naming for
    these appropriately (``S<id>_between.txt`` vs ``S<id>_within.txt``).

    Args:
        p_id: Numeric participant id.
        out_data: List of [code, onset_time] pairs from generate_codings.
        between: Selects the output filename and the FIDL header line.
    """
    # Before writing output, make sure the directories exist.
    out_dir = pathlib.Path("zzz_output/S%s" % p_id)
    out_dir.mkdir(parents=True, exist_ok=True)
    top_line = generate_top_line(between)
    suffix = "between" if between else "within"
    out_path = out_dir / ("S%s_%s.txt" % (p_id, suffix))
    # "with" ensures the handle is closed even if a write fails; the
    # original used a bare open()/close() pair.
    with open(out_path, "w") as f:
        f.write("%s\n" % top_line)
        # Each data line is "<onset_time> <code>".
        for code, onset in out_data:
            f.write("%s %s\n" % (onset, code))
def generate_top_line(between):
    """Generate the top (header) line of the entry for FIDL.

    The line starts with the fixation field and then lists every
    "<code><label>" pair from the selected encoding table, space-separated
    and with a trailing space.
    """
    table = encbetween if between else encwithin
    fields = ["1.5 0fix"]
    fields.extend("%s%s" % (code, label) for label, code in table.items())
    return " ".join(fields) + " "
# Standard entry guard: run the pipeline only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment