Skip to content

Instantly share code, notes, and snippets.

Created January 5, 2017 20:33
Show Gist options
  • Save ankurcha/c0719d9caa7cda1dbfce0bb696187a66 to your computer and use it in GitHub Desktop.
Save ankurcha/c0719d9caa7cda1dbfce0bb696187a66 to your computer and use it in GitHub Desktop.
Refreshing PCollectionView by polling view registry
// view creation
PCollectionView<List<MaterializedViewGenerator>> viewGenerators = input
.apply(new ViewRegistryPollerTransform(opts, windowDuration))
// view usage as side input
input.apply(name, ParDo.of(new ViewMaterializerDoFn(viewGenerators)).withSideInputs(viewGenerators));
public class ViewRegistryPollerTransform extends PTransform<PCollection<Row>, PCollection<MaterializedViewGenerator>> {
// configuration
private AnalyticsPipelineOptions opts;
private Duration bucketDuration;
public ViewRegistryPollerTransform(AnalyticsPipelineOptions opts, Duration bucketDuration) {
this.opts = opts;
this.bucketDuration = bucketDuration;
public PCollection<MaterializedViewGenerator> apply(PCollection<Row> input) {
// prepare ticks to refresh the view periodically
PCollection<Long> ticks = input
// Produce 1 "tick" per second
.apply("UnboundedTicks", CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-hr windows
.apply("1HrWindow", Window.into(FixedWindows.of(Duration.standardHours(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
return ticks.apply("PollViewRegistryService",
ParDo.of(new ListMaterializedViewRegistryFn(this.opts, this.bucketDuration)));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment