Created
April 22, 2020 20:52
-
-
Save philerooski/ce255e65b47cfd3df653c1559330141d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import json
import os
import shutil

import pandas

import synapseclient
DUMMY_ARG = "dummy"  # A 'None' like string we can pass to boto


def read_args():
    """Parse and return the command-line arguments.

    Returns:
        argparse.Namespace with attributes ``input_path``, ``assay``,
        ``file_format``, ``reduce_job_info``, ``output``, ``activity``,
        ``update``, and ``job_table``.
    """
    parser = argparse.ArgumentParser(
        description="Combine partitions of a dataset and store to Synapse. "
                    "This script is to be run on a machine where a "
                    "volume containing the partitioned datasets "
                    "is mounted."
                    "\n\n"
                    # fixed typo: "stucture" -> "structure"
                    "The directory structure is expected to consist of "
                    "one or more uniquely named directories within "
                    "the `inputPath` directory. Within each of these "
                    "directories are expected to be tabular files, each "
                    "with an identical schema."
                    "\n\n"
                    "For example, if we had this directory structure "
                    "within our root folder:"
                    "\n\n"
                    "mPower_efs\n"
                    "|-- handToNoseLeft\n"
                    "| |-- file1.csv\n"
                    "| |-- file2.csv\n"
                    "|-- handToNoseRight\n"
                    " |-- file1.csv\n"
                    " |-- file2.csv\n"
                    "\n"
                    "Then this call: `python reduce_features.py "
                    "--assay handToNoseLeft --activity tremor --output "
                    "syn1234567 --file-format csv --input-path /mPower_efs` "
                    "\n\n"
                    "would store the file "
                    "'tremorFeatures_handToNoseLeft.csv' "
                    "in syn1234567.",
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--input-path", required=True,
                        help="Required. Location of files to be concatenated.")
    parser.add_argument("--assay", required=True,
                        help="Required. Name of the assay. This should also \n"
                             "be the name of the subdirectory within \n"
                             "`input-path` to concatenate the feature files \n"
                             "within.")
    parser.add_argument("--file-format", required=True,
                        help="The format of both the feature file being "
                             "updated and/or the expected output. Accepted "
                             "arguments include 'tsv', 'csv', and others. "
                             "If the argument isn't recognized, it will be "
                             "interpreted as the literal delimiter to use. ")
    parser.add_argument("--reduce-job-info", required=True,
                        help="Synapse ID of reduce job info file.")
    parser.add_argument("--output",
                        default=DUMMY_ARG,
                        help="Synapse ID of the parent entity where \n"
                             "concatenated features will be stored. Ignored \n"
                             "when --update is set. Not setting this flag will \n"
                             "write the features to the current working \n"
                             "directory but not upload them to Synapse.")
    parser.add_argument("--activity",
                        help="Name of the activity (tremor, tapping, ...). \n"
                             "Ignored when --update is set. Required otherwise.")
    parser.add_argument("--update",
                        default=DUMMY_ARG,
                        help="The Synapse ID of a pre-existing feature file \n"
                             "on Synapse to update. Including this argument \n"
                             "will append the concatenated feature files to \n"
                             "the end of the file already on Synapse.")
    parser.add_argument("--job-table", default=None,
                        help="Synapse ID of the job table used during the \n"
                             "job submission process. This will be deleted.")
    args = parser.parse_args()
    return args
def get_job_info(syn, reduce_job_info):
    """Fetch the reduce-job info entity from Synapse and parse it as JSON.

    Args:
        syn: a logged-in Synapse client.
        reduce_job_info: Synapse ID of the job info file.

    Returns:
        The decoded JSON object (typically a dict).
    """
    job_entity = syn.get(reduce_job_info)
    with open(job_entity.path, "r") as job_file:
        return json.load(job_file)
def curate_files(loc, assay):
    """List full paths of every entry in the assay subdirectory of loc.

    Args:
        loc: root directory containing one subdirectory per assay.
        assay: name of the assay subdirectory to list.

    Returns:
        A list of absolute-or-relative paths (one per directory entry).
    """
    assay_dir = os.path.join(loc, assay)
    paths = []
    for entry in os.listdir(assay_dir):
        paths.append(os.path.join(assay_dir, entry))
    return paths
def get_sep(file_format):
    """Map a file-format name to its field delimiter.

    'csv' and 'tsv' map to ',' and '\\t'; any other value is treated
    as the literal delimiter itself.
    """
    known_delimiters = {"csv": ",", "tsv": "\t"}
    return known_delimiters.get(file_format, file_format)
def combine_files(paths, file_format):
    """Read every tabular file in paths and concatenate them row-wise.

    All files are assumed to share an identical schema (see module help).

    Args:
        paths: non-empty list of file paths.
        file_format: format name or literal delimiter (see get_sep).

    Returns:
        A (features, extension) tuple: the concatenated DataFrame and the
        file extension of the first input file.
    """
    sep = get_sep(file_format)
    extension = os.path.splitext(paths[0])[1]
    frames = []
    for path in paths:
        frames.append(pandas.read_csv(path, sep=sep, header=0,
                                      index_col=False))
    combined = pandas.concat(frames, ignore_index=True)
    return combined, extension
def write_features(features, activity, assay, file_format, extension):
    """Write the feature DataFrame to the current working directory.

    The file is named '<activity>Features_<assay><extension>'.

    Returns:
        The name of the file that was written.
    """
    out_name = f"{activity}Features_{assay}{extension}"
    features.to_csv(out_name, sep=get_sep(file_format),
                    header=True, index=False)
    return out_name
def upload(syn, fname, output, job_info):
    """Store a local file on Synapse under the given parent entity.

    Provenance ('used' and 'executed') is taken from job_info.
    """
    entity = synapseclient.File(
        path=fname,
        name=fname,
        parent=output)
    syn.store(entity, used=job_info['used'], executed=job_info['executed'])
def update(syn, features, synapse_features, file_format, assay, job_info):
    """Append new features to an existing Synapse feature file and re-store it.

    Downloads the current file, concatenates the new rows onto it,
    optionally de-duplicates, overwrites the local copy, and stores the
    entity back to Synapse with provenance from job_info.
    """
    sep = get_sep(file_format)
    entity = syn.get(synapse_features)
    existing = pandas.read_csv(entity.path, sep=sep,
                               index_col=False, header=0)
    combined = pandas.concat([existing, features], ignore_index=True)
    if job_info['verify_integrity']:
        # if verify_columns was not set, entire row must match exactly.
        combined = combined.drop_duplicates(
            subset=job_info['verify_columns'],
            keep='last')
    combined.to_csv(entity.path, sep=sep, index=False, header=True)
    syn.store(entity, used=job_info['used'], executed=job_info['executed'])
def cleanup(syn, job_table, reduce_job_info, input_path, assay):
    """Delete bookkeeping Synapse entities and the partitioned input files.

    Args:
        syn: a logged-in Synapse client.
        job_table: Synapse ID of the job table used during submission.
        reduce_job_info: Synapse ID of the reduce job info file.
        input_path: root directory containing the partitioned features.
        assay: name of the assay subdirectory to remove.
    """
    # Delete job table and reduce job info
    syn.delete(job_table)
    syn.delete(reduce_job_info)
    # Delete original, partitioned features files. shutil.rmtree replaces
    # the previous os.system("rm -rf ..."), which built a shell command
    # from unescaped paths (shell-injection-prone) and silently ignored
    # failures. ignore_errors=True preserves rm -rf's tolerance of a
    # missing directory.
    shutil.rmtree(os.path.join(input_path, assay), ignore_errors=True)
def main():
    """Concatenate partitioned feature files and store the result on Synapse.

    Logs in using the `synapseUsername`/`synapsePassword` environment
    variables, combines the partitioned files for the requested assay,
    then either updates an existing Synapse file (--update) or writes and
    uploads a new one (--output), and finally cleans up job artifacts.

    Raises:
        RuntimeError: if the assay directory contains no files.
    """
    args = read_args()
    syn = synapseclient.login(os.environ['synapseUsername'],
                              os.environ['synapsePassword'])
    job_info = get_job_info(syn, args.reduce_job_info)
    feature_files = curate_files(args.input_path, args.assay)
    if not feature_files:
        raise RuntimeError("The directory in which the features "
                           "were expected to be {} is empty.".format(
                               os.path.join(args.input_path, args.assay)))
    features, extension = combine_files(feature_files, args.file_format)
    # NOTE: a former `try/except Exception as e: raise e` wrapper around the
    # branches below was removed -- it re-raised unchanged and added nothing.
    if args.update != DUMMY_ARG:
        update(syn, features, args.update, args.file_format,
               args.assay, job_info)
    elif args.output != DUMMY_ARG:
        fname = write_features(features, args.activity, args.assay,
                               args.file_format, extension)
        upload(syn, fname, args.output, job_info)
    cleanup(syn, args.job_table, args.reduce_job_info,
            args.input_path, args.assay)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment