Skip to content

Instantly share code, notes, and snippets.

@ntakouris
Created July 14, 2020 15:35
Show Gist options
  • Save ntakouris/86cb06713a44186147c8e7b5050c36a3 to your computer and use it in GitHub Desktop.
Save ntakouris/86cb06713a44186147c8e7b5050c36a3 to your computer and use it in GitHub Desktop.
def create_pipeline(*, mysql_port: int = 3306):
    """Assemble the CSV-ingestion pipeline and return it.

    The pipeline chains three components: a ``CsvExampleGen`` that reads
    ``taxi_dataset.csv`` from the configured bucket, a ``StatisticsGen`` fed
    by the generated examples, and a ``SchemaGen`` fed by those statistics.
    Beam options are taken from the module-level
    ``DATAFLOW_BEAM_PIPELINE_ARGS`` and pipeline metadata is stored through a
    MySQL metadata connection config.

    The ``<...>`` strings below are template placeholders that must be
    replaced with real values before the pipeline is run.

    Args:
        mysql_port: TCP port of the MySQL metadata database. Defaults to
            3306, the standard MySQL port; override it if your instance
            listens elsewhere.

    Returns:
        The ``pipeline.Pipeline`` object wiring the three components together.
    """
    # Treat the entire input as a train split (no eval split is produced).
    no_eval_config = example_gen_pb2.Input(splits=[
        example_gen_pb2.Input.Split(name='train', pattern='taxi_dataset.csv'),
    ])
    example_gen = CsvExampleGen(
        input=external_input('gs://<bucket name>/'),
        input_config=no_eval_config,
    )
    statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
    schema_gen = SchemaGen(statistics=statistics_gen.outputs['statistics'])

    return pipeline.Pipeline(
        pipeline_name='<pipeline name>',
        pipeline_root='gs://<bucket name>',
        components=[example_gen, statistics_gen, schema_gen],
        beam_pipeline_args=DATAFLOW_BEAM_PIPELINE_ARGS,
        metadata_connection_config=metadata.mysql_metadata_connection_config(
            host="<mysql ip>",
            database="<your database name>",
            port=mysql_port,
            username='<...>',
            password='****',
        ),
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment