import argparse
import json
import os
import uuid

import boto3
import pandas
import synapseclient

AWS_BATCH_ARRAY_SIZE_LIMIT = 10000
USED = set([
    "https://github.com/Sage-Bionetworks/mPowerAnalysis/blob/master/featureExtraction/tremorModule.R",
    "https://hub.docker.com/r/philsnyder/mpower-feature-extraction/"])
EXECUTED = set([
    "https://github.com/Sage-Bionetworks/mPowerAnalysis/blob/master/featureExtraction/batch/submit_jobs.py"])
DUMMY_ARG = "dummy"  # A 'None'-like string we can pass to boto

def read_args():
    parser = argparse.ArgumentParser(
        description="Submit jobs to AWS Batch to extract features "
                    "from raw data for a single assay. All flagged "
                    "arguments are necessary unless marked otherwise.")
    parser.add_argument("--input-table",
                        required=True,
                        help="Synapse ID of a table containing a recordId "
                             "column and raw features. All record IDs in "
                             "the table will be submitted for processing if "
                             "the update flag is not specified.")
    parser.add_argument("--output-path",
                        required=True,
                        help="Local or shared filepath to write the "
                             "extracted features to.")
    parser.add_argument("--job-info-parent",
                        required=True,
                        help="Project on Synapse to store the created job "
                             "table and reduce job file within.")
    parser.add_argument("--assay-name",
                        required=True,
                        help="Name of assay to be processed.")
    parser.add_argument("--assay-column",
                        required=True,
                        help="Name of column in input-table containing "
                             "the raw data for assay-name.")
    parser.add_argument("--activity-name",
                        default=DUMMY_ARG,
                        help="Name of the activity (tremor, tapping, ...). "
                             "Ignored when --update is set. Required otherwise.")
    parser.add_argument("--chunk-size",
                        default=30,
                        type=int,
                        help="Number of record IDs to process per job.")
    parser.add_argument("--output-synapse",
                        default=DUMMY_ARG,
                        help="Synapse ID of the parent entity where "
                             "concatenated features will be stored. Ignored "
                             "when --update is set.")
    parser.add_argument("--update",
                        default=DUMMY_ARG,
                        help="The Synapse ID of a feature file "
                             "containing a recordId column. Any preexisting "
                             "record ID values will not be submitted for "
                             "feature extraction. Currently this requires "
                             "downloading the entire feature file.")
    parser.add_argument("--file-format",
                        default="tsv",
                        help="The format of the feature file being updated "
                             "(if any) and of the expected output. Recognized "
                             "arguments are 'tsv' and 'csv'. "
                             "If the argument isn't recognized, it will be "
                             "interpreted as the literal delimiter to use. "
                             "Unfortunately, this is very sensitive to "
                             "formatting when it comes to special characters "
                             "(like \\t) because of the need to pass this "
                             "special character through multiple interpreters "
                             "(e.g., bash, python). So we highly recommend "
                             "using a more common format, such as 'csv' "
                             "or 'tsv'.")
    parser.add_argument("--used", nargs='+', help="For provenance.")
    parser.add_argument("--executed", nargs='+', help="For provenance.")
    parser.add_argument("--verify-integrity", action="store_const", const=True,
                        help="Drop rows that are already included in the "
                             "Synapse feature file when --update is specified. "
                             "By default, it checks for exact duplicates, "
                             "which may be problematic when dealing with "
                             "floating point numbers and/or large files. Set "
                             "the --columns-to-verify flag to specify a set "
                             "of columns that index the feature file "
                             "for faster duplicate checking. By default, only "
                             "the last duplicate is kept.")
    parser.add_argument("--columns-to-verify", nargs='+',
                        help="A list of column names in the feature file "
                             "which index the features. These are used to "
                             "efficiently drop duplicate rows when updating "
                             "a feature file.")
    parser.add_argument("--profile",
                        help="The name of the AWS CLI profile to "
                             "use when submitting Batch jobs.")
    parser.add_argument("--map-job-queue",
                        default="mpower-feature-extraction",
                        help="The Batch job queue to submit jobs to.")
    parser.add_argument("--map-job-definition",
                        default="mpower-feature-extraction",
                        help="The Batch job definition to use for "
                             "each job.")
    parser.add_argument("--reduce-job-queue",
                        default="mpower-feature-extraction",
                        help="The Batch job queue to submit the "
                             "reduce job to.")
    parser.add_argument("--reduce-job-definition",
                        default="mpower-feature-reduction",
                        help="The Batch job definition to use for "
                             "the reduce features job.")
    args = parser.parse_args()
    return args

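
# Example invocation (a sketch; the Synapse IDs, paths, and assay/column names
# below are hypothetical placeholders, not values taken from this project):
#
#   python submit_jobs.py \
#       --input-table syn1111111 \
#       --output-path /shared/mpower/features \
#       --job-info-parent syn2222222 \
#       --assay-name rest \
#       --assay-column deviceMotion_rest \
#       --activity-name tremor \
#       --output-synapse syn3333333 \
#       --profile default
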
def verify_args(args):
    if args.update == DUMMY_ARG and args.activity_name == DUMMY_ARG:
        raise TypeError("The --activity-name flag must be set if "
                        "the --update flag is not set.")
    if args.update != DUMMY_ARG and args.output_synapse != DUMMY_ARG:
        raise TypeError("The --update and --output-synapse flags "
                        "cannot both be set.")
    if args.verify_integrity is None and args.columns_to_verify:
        raise RuntimeError("--columns-to-verify is specified but "
                           "--verify-integrity is not. Did you mean to "
                           "include --verify-integrity in the command "
                           "line arguments?")


def get_batch_client(profile):
    session = boto3.session.Session(profile_name=profile)
    batch_client = session.client("batch")
    return batch_client

def get_filtered_table(syn, input_table, assay_column):
    # Fetch record IDs only for rows that actually contain data for this assay.
    q = syn.tableQuery("SELECT recordId FROM {} WHERE \"{}\" IS NOT NULL".format(
        input_table, assay_column))
    table_df = q.asDataFrame()
    return table_df


def get_all_record_ids(syn, input_table, assay_column):
    table_df = get_filtered_table(syn, input_table, assay_column)
    record_ids = table_df.recordId.values
    return record_ids


def get_sep(file_format):
    # Map the recognized format names to their delimiter; otherwise treat the
    # argument itself as the delimiter.
    if file_format == "csv":
        return ","
    elif file_format == "tsv":
        return "\t"
    else:
        return file_format

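
# A quick sanity check of get_sep, doctest-style (hypothetical inputs):
#
#   >>> get_sep("csv")
#   ','
#   >>> get_sep("tsv")
#   '\t'
#   >>> get_sep("|")
#   '|'
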
def get_diff(syn, input_table, update, file_format, assay_column):
    all_record_ids = get_all_record_ids(syn, input_table, assay_column)
    features = syn.get(update)
    features_df = pandas.read_csv(features.path,
                                  sep=get_sep(file_format),
                                  index_col=False,
                                  header=0,
                                  usecols=['recordId'])
    already_processed = features_df.recordId.values
    diff_record_ids = list(set(all_record_ids).difference(already_processed))
    return diff_record_ids

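
# For example (hypothetical record IDs): if the input table contains records
# {"a", "b", "c"} and the feature file being updated already has a row for
# "a", only ["b", "c"] (in arbitrary order, since this is a set difference)
# are submitted for extraction.
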
def chunk_up_record_ids(record_ids, chunk_size):
    # Split record IDs into chunks of at most `chunk_size`; each chunk becomes
    # one child job in the Batch array job, which is capped at
    # AWS_BATCH_ARRAY_SIZE_LIMIT children.
    chunks = []
    for i in range(0, len(record_ids), chunk_size):
        record_ids_subset = record_ids[i:(i + chunk_size)]
        chunks.append(record_ids_subset)
    if len(chunks) > AWS_BATCH_ARRAY_SIZE_LIMIT:
        min_chunk_size = len(record_ids) // AWS_BATCH_ARRAY_SIZE_LIMIT + 1
        raise RuntimeError("Minimum chunk size is {}".format(min_chunk_size))
    return chunks, len(chunks)

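
# For example (hypothetical numbers): 70 record IDs with chunk_size=30 yield
# chunks of length 30, 30, and 10, and an array job size of 3:
#
#   >>> chunks, n_chunks = chunk_up_record_ids(list(range(70)), 30)
#   >>> [len(c) for c in chunks], n_chunks
#   ([30, 30, 10], 3)
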
def build_job_table(syn, record_id_chunks, job_table_parent):
    # Each row pairs an array index ("uid") with a JSON blob listing the
    # record IDs that child job should process.
    job_table = []
    for i in range(0, len(record_id_chunks)):
        json_dict = {'recordIds': list(record_id_chunks[i])}
        json_dict_str = json.dumps(json_dict)
        job_table.append([i, json_dict_str])
    job_table = pandas.DataFrame(job_table, columns=['uid', 'jobInfo'])
    cols = synapseclient.as_table_columns(job_table)
    for col in cols:
        if col['name'] == 'uid':
            col['maximumSize'] = 36
        if col['name'] == 'jobInfo':
            col['columnType'] = 'LARGETEXT'
    schema = synapseclient.Schema(
        name="batch-job-submission-{}".format(uuid.uuid4()),
        columns=cols,
        parent=job_table_parent)
    table = syn.store(synapseclient.Table(schema, job_table))
    return table.schema.id

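
# The resulting Synapse table looks roughly like this (hypothetical values):
#
#   uid  jobInfo
#   0    {"recordIds": ["record-001", "record-002", ...]}
#   1    {"recordIds": ["record-031", "record-032", ...]}
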
def submit_map_job(batch_client, job_queue, job_definition,
                   array_job_size, job_table, input_table,
                   assay_name, assay_column, output_path):
    job = batch_client.submit_job(
        jobName=job_table,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        arrayProperties={'size': array_job_size},
        parameters={
            "jobTable": job_table,
            "inputTable": input_table,
            "assayName": assay_name,
            "assayColumn": assay_column,
            "outputPath": output_path})
    return job['jobId']

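
# Note (an assumption about the downstream container, which is defined in the
# Batch job definition rather than in this script): AWS Batch runs
# `array_job_size` child jobs and sets AWS_BATCH_JOB_ARRAY_INDEX in each one,
# so the feature extraction image can use that index to look up its "uid" row
# in the job table and process only those record IDs. The "parameters" above
# are substituted into Ref::<name> placeholders in the job definition's
# command.
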
def build_reduce_file(syn, reduce_file_parent, used, executed,
                      verify_integrity, verify_columns, dependency):
    # Record provenance and duplicate-checking settings for the reduce job.
    used = list(USED.union(used)) if used else list(USED)
    executed = list(EXECUTED.union(executed)) if executed else list(EXECUTED)
    json_dict = {
        'used': used,
        'executed': executed,
        'verify_integrity': verify_integrity,
        'verify_columns': verify_columns}
    json_dict_str = json.dumps(json_dict)
    temp_local_reduce_fname = "{}.json".format(dependency)
    with open(temp_local_reduce_fname, "w") as f:
        print(json_dict_str, file=f)
    f = synapseclient.File(temp_local_reduce_fname, parent=reduce_file_parent)
    reduce_job_file = syn.store(f)
    os.remove(temp_local_reduce_fname)
    return reduce_job_file.id

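
# The stored reduce job file is a small JSON document along these lines
# (the verify_columns values here are hypothetical):
#
#   {"used": ["https://github.com/.../tremorModule.R", ...],
#    "executed": ["https://github.com/.../submit_jobs.py", ...],
#    "verify_integrity": true,
#    "verify_columns": ["recordId", "window"]}
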
def submit_reduce_job(batch_client, dependency, reduce_job_queue,
                      reduce_job_definition, input_path, file_format,
                      activity, assay, output, features_to_update,
                      reduce_file, job_table):
    """Submit a job to AWS Batch which depends on the array job
    submitted to extract the features.

    Parameters
    ----------
    batch_client : botocore.client.Batch
    dependency : str
        Job ID of the array job submitted to extract features.
    reduce_job_queue : str
        AWS Batch queue to submit the reduce job to.
    reduce_job_definition : str
        AWS Batch job definition to use for the reduce job.
    input_path : str
        Path where the extracted features are written, and where their
        concatenation will be written when `features_to_update` is None.
        Otherwise the features will be appended to the path where
        `features_to_update` is saved.
    file_format : str
        Format of, or delimiter of, `features_to_update` (if specified)
        and the extracted features.
    activity : str
        Name of the activity (tremor, tapping, ...). Not used when
        `features_to_update` is specified.
    assay : str
        Name of the assay. Also the subdirectory within `input_path`
        containing the extracted features.
    output : str
        Synapse ID to store the feature file at.
    features_to_update : str
        Synapse ID of a preexisting feature file to append extracted
        features to.
    reduce_file : str
        Synapse ID of the JSON file containing reduce job info.
    job_table : str
        Synapse ID of the job table. Will be deleted.
    """
    batch_client.submit_job(
        jobName="reduce_{}".format(dependency),
        jobQueue=reduce_job_queue,
        jobDefinition=reduce_job_definition,
        dependsOn=[{'jobId': dependency, 'type': 'SEQUENTIAL'}],
        parameters={
            "output": output,
            "inputPath": input_path,
            "reduceJobFile": reduce_file,
            "activity": activity,
            "assay": assay,
            "update": features_to_update,
            "jobTable": job_table,
            "fileFormat": file_format})

def main():
    args = read_args()
    verify_args(args)
    syn = synapseclient.login()
    batch_client = get_batch_client(args.profile)
    if args.update == DUMMY_ARG:
        record_ids = get_all_record_ids(syn, args.input_table, args.assay_column)
    else:
        record_ids = get_diff(syn, args.input_table, args.update,
                              args.file_format, args.assay_column)
    if len(record_ids):
        record_id_chunks, array_job_size = chunk_up_record_ids(
            record_ids, args.chunk_size)
        job_table = build_job_table(
            syn, record_id_chunks, args.job_info_parent)
        map_job_id = submit_map_job(batch_client, args.map_job_queue,
                                    args.map_job_definition, array_job_size,
                                    job_table, args.input_table, args.assay_name,
                                    args.assay_column, args.output_path)
        reduce_file = build_reduce_file(syn, args.job_info_parent, args.used,
                                        args.executed, args.verify_integrity,
                                        args.columns_to_verify, map_job_id)
        submit_reduce_job(batch_client, map_job_id, args.reduce_job_queue,
                          args.reduce_job_definition, args.output_path,
                          args.file_format, args.activity_name, args.assay_name,
                          args.output_synapse, args.update, reduce_file,
                          job_table)
    else:
        print("No new records in column {} "
              "to process.".format(args.assay_column))


if __name__ == "__main__":
    main()
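
# End-to-end flow, for orientation (an interpretation of the code above plus
# assumptions about the containers defined in the Batch job definitions):
# main() collects the record IDs to process, chunks them, writes a job table
# and a reduce-job JSON file to Synapse, submits an array "map" job whose
# children presumably extract features for their assigned chunk, and submits
# a dependent "reduce" job that presumably concatenates the per-chunk outputs,
# optionally appends them to an existing feature file, stores the result in
# Synapse with the recorded provenance, and deletes the job table.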