@therako
Last active May 4, 2018 09:11
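import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
// ParseTableRow, ToList, AggregateToBatchOfN and BatchPredict are user-defined classes not shown in this gist.
DataflowPipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args)
    .as(DataflowPipelineOptions.class);
final Pipeline pipeline = Pipeline.create(pipelineOptions);
// Legacy SQL table syntax: fromQuery() uses legacy SQL unless usingStandardSql() is set.
final String query = "SELECT id, score FROM [bigquery-public-data:hacker_news.stories]";
final long batchSize = 10L;
pipeline
    .apply("ReadItems", BigQueryIO.readTableRows().fromQuery(query))
    .apply("ParseTableRow", ParDo.of(new ParseTableRow()))
    .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(10))))
    // withoutDefaults() is required when Combine.globally runs on non-global windows.
    .apply("CombineItems", Combine.globally(new ToList<>()).withoutDefaults())
    .apply("AggregateToBatchOfN", ParDo.of(new AggregateToBatchOfN<>(batchSize)))
    .apply("BatchPredict", ParDo.of(new BatchPredict())) // must emit String for TextIO
    // TextIO.write() needs an output prefix; this bucket path is a placeholder.
    .apply("StoreResults", TextIO.write().to("gs://YOUR_BUCKET/predictions/part"));
pipeline.run().waitUntilFinish();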
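The gist does not include AggregateToBatchOfN. Assuming it receives the single List that CombineItems produces for each window and splits it into sublists of at most batchSize elements (so that BatchPredict can send fixed-size requests), the core chunking step might look like the plain-Java sketch below. BatchSplitter and split are hypothetical names, and the Beam DoFn wrapper is left out so the logic runs without the Beam SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: splits one window's combined list into batches of at most batchSize. */
public final class BatchSplitter {

    private BatchSplitter() {}

    /**
     * Splits items into consecutive sublists of at most batchSize elements.
     * The last batch holds the remainder and may be smaller.
     */
    public static <T> List<List<T>> split(List<T> items, long batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        List<List<T>> batches = new ArrayList<>();
        // Use long indices so a very large batchSize cannot overflow the loop counter.
        for (long start = 0; start < items.size(); start += batchSize) {
            long end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the input list.
            batches.add(new ArrayList<>(items.subList((int) start, (int) end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        // With batchSize = 10, as in the pipeline above.
        System.out.println(split(ids, 10L)); // [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12]]
    }
}
```

Inside a DoFn, the @ProcessElement method would call split on c.element() and pass each batch to c.output(...). Because Combine.globally gathers each whole window into one list, that list has to fit in a single worker's memory; for keyed input, Beam's GroupIntoBatches transform is an alternative way to form fixed-size batches.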