Skip to content

Instantly share code, notes, and snippets.

@seandavi
Created January 29, 2019 22:21
Show Gist options
  • Save seandavi/7aea0ef12d77c498dd393fb0edff86e6 to your computer and use it in GitHub Desktop.
Save seandavi/7aea0ef12d77c498dd393fb0edff86e6 to your computer and use it in GitHub Desktop.
Read and process whole files matched by a wildcard path using Apache Beam on Google Cloud Platform (Dataflow)
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.filesystems import FileSystems
import urllib
import json
import argparse
import logging
# Emit INFO-level records so the logging.info calls in this script are shown.
logging.basicConfig(level=logging.INFO)

# Settings for the Beam job. PROJECT_NAME and BUCKET_PATH are placeholders
# that must be replaced with real values before the script is run.
job_settings = {
    'runner': "DataflowRunner",
    'project': "PROJECT_NAME",
    'temp_location': "gs://BUCKET_PATH/TMP",
    'save_main_session': True,
}

# Convert the plain dict into the PipelineOptions object that
# beam.Pipeline(options=...) receives further down.
options = PipelineOptions().from_dictionary(job_settings)
def check(s):
    """Log the path of one matched file, then read and parse it as JSON.

    Args:
        s: An element of a ``FileSystems.match`` result's ``metadata_list``;
            only its ``path`` attribute is used.

    Returns:
        The object decoded by ``json.load`` from the file's contents.
    """
    logging.info(s.path)
    # Close the handle as soon as parsing finishes; the earlier version
    # left one open handle behind for every file processed.
    with FileSystems.open(s.path) as handle:
        return json.load(handle)


with beam.Pipeline(options=options) as p:
    logging.info('reading study')
    # The wildcard is expanded here, while the pipeline is being built;
    # the resulting metadata entries become the input elements.
    matched = FileSystems.match(["gs://bigrna-cancerdatasci-org/results/9606/gencode/29/SRX**/lib_format_counts.json"])[0].metadata_list
    study = (
        p
        | "create" >> beam.Create(matched)
        | 'convert study from json' >> beam.Map(check)
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment