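import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Maximum number of elements emitted per batch.
final int batchSize = 10;
pipeline
    .apply("ReadItems", BigQueryIO.readTableRows().fromQuery(query))
    .apply("ParseTableRow", ParDo.of(new ParseTableRow()))
    .apply("ConcurrentQueueAggregate", ParDo.of(new DoFn<String, Iterable<String>>() {
      // Per-instance buffer. It is not shared across workers, and elements
      // still buffered when the input ends are never emitted by this DoFn.
      private final Queue<String> queue = new ConcurrentLinkedQueue<>();

      @ProcessElement
      public void processElement(ProcessContext c) {
        queue.add(c.element());
        if (queue.size() >= batchSize) {
          // Emit a copy: an output value must not be mutated after c.output().
          c.output(new ArrayList<>(queue));
          queue.clear();
        }
      }
    }))
    .apply("BatchPredict", ParDo.of(new BatchPredict()))
    // TextIO.write() still needs an output location via .to(...) before it can run.
    .apply("StoreResults", TextIO.write());
pipeline.run();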
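
The DoFn above only emits full batches, so a trailing partial batch stays in the buffer. The following is a minimal plain-Java sketch of the same buffering logic with a final flush added. `BatchBuffer`, `add`, `flush`, and `batchAll` are hypothetical names used for illustration and are not part of the gist or of Beam.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not from the gist): fixed-size batching with a final flush. */
public class BatchBuffer {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Buffers one element; returns a full batch as a fresh copy, or null if not yet full. */
    public List<String> add(String element) {
        buffer.add(element);
        return buffer.size() >= batchSize ? flush() : null;
    }

    /** Returns whatever is buffered as a new list and empties the buffer. */
    public List<String> flush() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    /** Splits the input into batches, including a trailing partial batch. */
    public static List<List<String>> batchAll(List<String> input, int batchSize) {
        BatchBuffer batcher = new BatchBuffer(batchSize);
        List<List<String>> out = new ArrayList<>();
        for (String s : input) {
            List<String> full = batcher.add(s);
            if (full != null) {
                out.add(full);
            }
        }
        // Without this final flush the leftover elements would be lost.
        List<String> rest = batcher.flush();
        if (!rest.isEmpty()) {
            out.add(rest);
        }
        return out;
    }
}
```

In a Beam DoFn the equivalent of the final flush would normally live in a `@FinishBundle` method, and Beam's built-in `GroupIntoBatches` transform is an alternative to hand-rolled buffering when the input can be keyed.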