simple Apache Beam (Dataflow) pipeline that joins SRA study, experiment, and run metadata from newline-delimited JSON
# requires python 2.7
# pip install apache_beam
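# Usage sketch (the script name and input paths below are illustrative, not part of the gist):
#   python sra_beam_pipeline.py \
#       --study study.json --expt experiment.json --runf run.json --sample sample.json \
#       --output joined_study --output_study study_annotations --output2 expt_with_runs
# Each input is expected to be newline-delimited JSON (one record per line); the --output*
# arguments are prefixes for the text files the pipeline writes.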
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import argparse
import logging
import urllib2
import urllib

logging.basicConfig(level=logging.INFO)


def annotate(text, api_key):
    res = urllib2.urlopen('http://data.bioontology.org/annotator?text=%s&apikey=%s' %
                          (urllib.quote_plus(text), api_key))
    return json.load(res)
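# Illustrative call (the text is a placeholder): annotate('glioblastoma RNA-seq', api_key)
# sends the text to the NCBO Annotator at data.bioontology.org and returns the parsed JSON
# response of annotation records.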
class AnnotateText(beam.DoFn):
    """Annotates the 'abstract' field of a keyed record with NCBO Annotator results."""
    def process(self, element):
        print("element is: -----------------\n", element)
        api_key = "SUPPLY_YOUR_OWN"  # replace with your own BioPortal API key
        vals = []
        try:
            vals = annotate(element[1]['abstract'], api_key)
        except Exception:
            # fall back to an empty annotation list if the record has no abstract
            # or the annotator request fails
            pass
        el2 = element
        el2[1].update({"ontologies": vals})
        return [el2]
#pipeline_options={}
#pipeline_options['runner'] = 'DirectRunner'
options = PipelineOptions()  # .from_dictionary(pipeline_options)
class CSVtoDict(beam.DoFn):
    """Parses a JSON-encoded input line into a dictionary."""
    def process(self, element):
        ret = json.loads(element)
        return [ret]
class dict_to_keyed_tuple(beam.DoFn):
    def process(self, element, key_name):
        ret = (element[key_name], element)
        return [ret]
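# For example (illustrative record), beam.ParDo(dict_to_keyed_tuple(), 'accession') turns
#   {'accession': 'SRP000001', 'abstract': '...'}
# into
#   ('SRP000001', {'accession': 'SRP000001', 'abstract': '...'}),
# the keyed shape that CoGroupByKey expects on each side of a join.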
class d2c(beam.DoFn):
    """Serializes a dictionary back into a JSON string."""
    def process(self, element):
        return [json.dumps(element)]
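# The merger DoFn below consumes the output of the 'join runs to experiment' CoGroupByKey:
# elements of the form (experiment_accession, {'experiment': [expt_dict], 'run': [run_dict, ...]}).
# It copies the experiment record, attaches its runs under a "runs" key, and silently drops
# elements that have no matching experiment.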
class merger(beam.DoFn):
    """Flattens a co-grouped (experiment, runs) element into a single experiment record."""
    def process(self, element):
        try:
            runs = element[1]['run']
            experiment = element[1]['experiment'][0]
            expt_copy = experiment.copy()
            expt_copy.update({"runs": runs})
            return [expt_copy]
        except Exception:
            return []
def main(known_args):
    # Earlier experiments, kept for reference:
    # with beam.Pipeline(options=options) as p:
    #     (p | 'Reading input file' >> beam.io.ReadFromText(known_args.study)
    #        | 'convert from json' >> beam.ParDo(CSVtoDict())
    #        | 'convert to json' >> beam.ParDo(d2c())
    #        | 'convert to txt' >> beam.io.WriteToText('abc.json'))
    # with beam.Pipeline(options=options) as p:
    #     (p | 'Reading input file' >> beam.io.ReadFromText(known_args.study)
    #        | 'convert from json' >> beam.Map(json.loads)
    #        | 'convert to json' >> beam.Map(json.dumps)
    #        | 'convert to txt' >> beam.io.WriteToText('abc2.json'))
    with beam.Pipeline(options=options) as p:
        logging.info('reading study')
        study = (p | 'Reading study input file' >> beam.io.ReadFromText(known_args.study)
                 | 'convert study from json' >> beam.Map(json.loads)
                 | 'key study by accession' >> beam.ParDo(dict_to_keyed_tuple(), 'accession'))
        # (study | 'annotate study' >> beam.ParDo(AnnotateText())
        #        | 'mapping' >> beam.Map(lambda x: json.dumps(x[1]))
        #        | 'output annotations' >> beam.io.WriteToText(known_args.output_study))
        logging.info('reading experiment')
        experimentin = (p | 'Reading expt input file' >> beam.io.ReadFromText(known_args.expt)
                        | 'convert expt from json' >> beam.Map(json.loads))
        experiment = (experimentin
                      | 'key expt by study_accession' >> beam.ParDo(dict_to_keyed_tuple(), 'study_accession'))
        logging.info('reading run')
        run = (p | 'Reading run input file' >> beam.io.ReadFromText(known_args.runf)
               | 'convert run from json' >> beam.Map(json.loads)
               | 'key run by experiment_accession' >> beam.ParDo(dict_to_keyed_tuple(), 'experiment_accession'))
        logging.info('reading sample')
        sample = (p | 'Reading sample input file' >> beam.io.ReadFromText(known_args.sample)
                  | 'convert sample from json' >> beam.Map(json.loads)
                  | 'key sample by accession' >> beam.ParDo(dict_to_keyed_tuple(), 'accession'))
        # sample is keyed here but not yet joined into either output
        logging.info('results')
        results = ({'experiments': experiment, 'study': study}
                   | "join" >> beam.CoGroupByKey()
                   | "back to json" >> beam.Map(lambda x: json.dumps(x[1], indent=3))
                   | "print" >> beam.io.WriteToText(known_args.output))
        logging.info('results-2')
        experiment2 = (experimentin
                       | 'key expt2 by accession' >> beam.ParDo(dict_to_keyed_tuple(), 'accession'))
        results2 = ({'experiment': experiment2, 'run': run}
                    | "join runs to experiment" >> beam.CoGroupByKey()
                    | "merger" >> beam.ParDo(merger())
                    | "back to json1234" >> beam.Map(lambda x: json.dumps(x, indent=3))
                    | "write out experiment2" >> beam.io.WriteToText(known_args.output2))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--study',
                        dest='study',
                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Study input file (newline-delimited JSON).')
    parser.add_argument('--expt',
                        dest='expt',
                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Experiment input file (newline-delimited JSON).')
    parser.add_argument('--runf',
                        dest='runf',
                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Run input file (newline-delimited JSON).')
    parser.add_argument('--sample',
                        dest='sample',
                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Sample input file (newline-delimited JSON).')
    parser.add_argument('--output',
                        dest='output',
                        required=True,
                        help='Output prefix for the study/experiment join.')
    parser.add_argument('--output_study',
                        dest='output_study',
                        required=True,
                        help='Output prefix for study annotations (only used if the commented-out '
                             'annotation branch is re-enabled).')
    parser.add_argument('--output2',
                        dest='output2',
                        required=True,
                        help='Output prefix for experiments with their runs attached.')
    # pipeline_args is unused here; PipelineOptions() above picks runner options up from sys.argv.
    known_args, pipeline_args = parser.parse_known_args()
    main(known_args)