Skip to content

Instantly share code, notes, and snippets.

@ntakouris
Created July 16, 2020 09:16
Show Gist options
  • Save ntakouris/7022787772cbdd3548c3851bd260d35e to your computer and use it in GitHub Desktop.
Save ntakouris/7022787772cbdd3548c3851bd260d35e to your computer and use it in GitHub Desktop.
# Root directory for every pipeline artifact. It stands in for a cloud
# storage bucket; replace the placeholder with a real path before running.
PIPELINE_ROOT = '<your project root>/bucket'
# SQLite database that backs the ML Metadata store, kept under the root.
METADATA_STORE = PIPELINE_ROOT + '/metadata_store.db'

# File pattern for the training data, matched by the example-gen input config.
DATASET_PATTERN = 'taxi_dataset.csv'

# Apache Beam options: run with Beam's local DirectRunner.
BEAM_ARGS = ['--runner=DirectRunner']

# Cloud job settings. STAGING, TEMP and PROJECT_ID are not referenced anywhere
# else in this snippet. NOTE(review): confirm whether other code uses them.
STAGING = 'staging'
TEMP = 'temp'
PROJECT_ID = ''
JOB_NAME = ''  # interpolated into the pipeline name
def create_pipeline():
    """Assemble the TFX pipeline: CSV ingestion, statistics, schema inference.

    Components, in dependency order:

    * ``CsvExampleGen`` ingests from ``PIPELINE_ROOT`` using a single
      ``'train'`` split whose file pattern is ``DATASET_PATTERN``. No eval
      split is configured.
    * ``StatisticsGen`` consumes the examples produced by ``CsvExampleGen``.
    * ``SchemaGen`` consumes the statistics produced by ``StatisticsGen``.

    Pipeline artifacts are rooted at ``PIPELINE_ROOT``. Beam is configured
    with ``BEAM_ARGS``, and ML Metadata is stored in the SQLite file at
    ``METADATA_STORE``.

    NOTE(review): ``example_gen_pb2``, ``CsvExampleGen``, ``external_input``,
    ``StatisticsGen``, ``SchemaGen``, ``pipeline`` and ``metadata`` are not
    imported anywhere in this snippet. They presumably come from ``tfx``;
    confirm the imports exist in the full file.

    Returns:
        The assembled ``pipeline.Pipeline``.
    """
    # Only a training split is declared, so no evaluation split is produced.
    no_eval_config = example_gen_pb2.Input(splits=[
        example_gen_pb2.Input.Split(name='train', pattern=DATASET_PATTERN),
    ])
    # PIPELINE_ROOT serves as both the input directory here and the
    # pipeline_root below, so generated artifacts share a tree with the CSV.
    # NOTE(review): newer TFX releases deprecate ``input=external_input(...)``
    # in favour of ``input_base=``. Confirm against the installed TFX version.
    example_gen = CsvExampleGen(
        input=external_input(PIPELINE_ROOT),
        input_config=no_eval_config,
    )
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

    return pipeline.Pipeline(
        # With the default empty JOB_NAME this evaluates to 'Pipeline '
        # (note the trailing space).
        pipeline_name=f'Pipeline {JOB_NAME}',
        pipeline_root=PIPELINE_ROOT,
        components=[example_gen, statistics_gen, schema_gen],
        beam_pipeline_args=BEAM_ARGS,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            METADATA_STORE),
    )
if __name__ == '__main__':
    # Build the pipeline definition and execute it with the Beam-based
    # DAG runner. This only runs when the file is executed as a script.
    BeamDagRunner().run(create_pipeline())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment