Skip to content

Instantly share code, notes, and snippets.

@seandavi
Last active February 1, 2019 18:49
Show Gist options
  • Save seandavi/ad335626eb77797462c34c4402b2f3a9 to your computer and use it in GitHub Desktop.
Simple Dataflow pipeline from SRA JSON.
# requires python 2.7
# pip install apache_beam
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
import argparse
import logging
import urllib2
import urllib
# Configure the root logger so the pipeline-construction INFO messages
# emitted in main() are visible on the console.
logging.basicConfig(level=logging.INFO)
def annotate(text, api_key):
    """Call the NCBO BioPortal annotator on *text* and return the parsed JSON reply.

    *api_key* is a BioPortal REST API key; *text* is URL-encoded before the call.
    """
    url = ('http://data.bioontology.org/annotator?text=%s&apikey=%s'
           % (urllib.quote_plus(text), api_key))
    response = urllib2.urlopen(url)
    return json.load(response)
class AnnotateText(beam.DoFn):
    """DoFn that annotates the 'abstract' field of a keyed record.

    Input elements are ``(key, dict)`` tuples. Each output element is the
    same record with an added ``'ontologies'`` entry holding the BioPortal
    annotator response (an empty list when annotation fails).
    """

    # NOTE(review): supply a real BioPortal API key, preferably via the
    # constructor rather than editing this constant.
    DEFAULT_API_KEY = "SUPPLY_YOUR_OWN"

    def __init__(self, api_key=None):
        super(AnnotateText, self).__init__()
        self._api_key = api_key or self.DEFAULT_API_KEY

    def process(self, element):
        # Debug output goes through logging, not stdout, so it can be silenced.
        logging.debug("element is: %s", element)
        vals = []
        try:
            vals = annotate(element[1]['abstract'], self._api_key)
        except Exception:
            # Best-effort: an annotation failure must not kill the pipeline,
            # but it is logged instead of being silently swallowed.
            logging.exception("annotation failed for element %r", element[0])
        # Shallow-copy the record so the input element is not mutated in place.
        enriched = (element[0], dict(element[1]))
        enriched[1].update({"ontologies": vals})
        return [enriched]
#pipeline_options={}
#pipeline_options['runner'] = 'DirectRunner'
# Module-level pipeline options shared by main(); with no arguments this
# picks up runner/config flags from the command line.
options = PipelineOptions() #.from_dictionary(pipeline_options)
class CSVtoDict(beam.DoFn):
    """Parse one JSON-encoded text line into a Python dictionary.

    (Despite the class name, the input is newline-delimited JSON, not CSV.)
    """

    def process(self, element):
        parsed = json.loads(element)
        return [parsed]
class dict_to_keyed_tuple(beam.DoFn):
    """Turn a dict into a ``(key, dict)`` tuple, keyed by the value at *key_name*."""

    def process(self, element, key_name):
        keyed = (element[key_name], element)
        return [keyed]
class d2c(beam.DoFn):
    """Serialize an element back to a JSON string (the inverse of CSVtoDict)."""

    def process(self, element):
        serialized = json.dumps(element)
        return [serialized]
class merger(beam.DoFn):
    """Merge co-grouped run records into their parent experiment record.

    Expects CoGroupByKey output shaped like
    ``(key, {'experiment': [expt_dict, ...], 'run': [run_dict, ...]})`` and
    emits a copy of the first experiment dict with a ``'runs'`` list added.
    Keys with no experiment record are dropped.
    """

    def process(self, element):
        try:
            runs = element[1]['run']
            experiment = element[1]['experiment'][0]
        except (KeyError, IndexError):
            # Unmatched join side (missing group or empty experiment list):
            # nothing to emit for this key.
            return []
        # Copy before updating so the grouped input element is not mutated.
        merged = experiment.copy()
        merged.update({"runs": runs})
        return [merged]
def main(known_args):
    """Build and run the Beam pipeline joining SRA study/experiment/run JSON.

    Reads newline-delimited JSON for studies, experiments, runs and samples,
    keys each record by its accession field, then writes two outputs:
    experiments joined to studies (``known_args.output``) and runs merged
    into experiments (``known_args.output2``).
    """
    with beam.Pipeline(options=options) as p:
        logging.info('reading study')
        study = (p | 'Reading study input file' >> beam.io.ReadFromText(known_args.study)
                 | 'convert study from json' >> beam.Map(json.loads)
                 | 'convert study to json' >> beam.ParDo(dict_to_keyed_tuple(), 'accession'))
        logging.info('reading experiment')
        # Keep the un-keyed experiment collection so it can be keyed two
        # different ways below (by study and by its own accession).
        experimentin = (p | 'Reading expt input file' >> beam.io.ReadFromText(known_args.expt)
                        | 'convert expt from json' >> beam.Map(json.loads))
        experiment = (experimentin
                      | 'convert expt to json' >> beam.ParDo(dict_to_keyed_tuple(), 'study_accession'))
        logging.info('reading run')
        run = (p | 'Reading run input file' >> beam.io.ReadFromText(known_args.runf)
               | 'convert run from json' >> beam.Map(json.loads)
               | 'convert run to json' >> beam.ParDo(dict_to_keyed_tuple(), 'experiment_accession'))
        logging.info('reading sample')
        # NOTE(review): 'sample' is read but never joined into either output.
        sample = (p | 'Reading sample input file' >> beam.io.ReadFromText(known_args.sample)
                  | 'convert sample from json' >> beam.Map(json.loads)
                  | 'convert sample to json' >> beam.ParDo(dict_to_keyed_tuple(), 'accession'))
        logging.info('results')
        # Join experiments to their study and write the grouped records.
        ({'experiments': experiment, 'study': study}
         | "join" >> beam.CoGroupByKey()
         | "back to json" >> beam.Map(lambda x: json.dumps(x[1], indent=3))
         | "print" >> beam.io.WriteToText(known_args.output))
        logging.info('results-2')
        # Re-key experiments by their own accession to attach runs to them.
        experiment2 = (experimentin
                       | 'convert expt2 to json' >> beam.ParDo(dict_to_keyed_tuple(), 'accession'))
        ({'experiment': experiment2, 'run': run}
         | "join runs to experiment" >> beam.CoGroupByKey()
         | "merger" >> beam.ParDo(merger())
         | "back to json1234" >> beam.Map(lambda x: json.dumps(x, indent=3))
         | "write out experiment2" >> beam.io.WriteToText(known_args.output2))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--study',
dest='study',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument('--expt',
dest='expt',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument('--runf',
dest='runf',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument('--sample',
dest='sample',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument('--output',
dest='output',
required=True,
help='Output file to write results to.')
parser.add_argument('--output_study',
dest='output_study',
required=True,
help='Output file to write results to.')
parser.add_argument('--output2',
dest='output2',
required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args()
main(known_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment