@brachi-wernick
Created February 3, 2019 12:41
Side Input with time windows
PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-second windows
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
String bucketName = options.getBigQuerySchemaConfigBucketName().get();
PCollectionView<FieldsConfiguration> sideView = ticks
.apply(MapElements.into(TypeDescriptor.of(FieldsConfiguration.class)).via((Long ignored) -> getFieldsConfiguration(bucketName)))
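.apply(View.asSingleton());
// `events` is a placeholder for the main input PCollection, which this snippet does not define
events
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
.apply("extract-events", ParDo.of(new EventsRowFn(sideView)).withSideInputs(sideView));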
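The snippet pairs a main input in 5-second fixed windows with a side input in 1-second fixed windows, so it may help to see which side-input window each main window reads. The sketch below uses plain Java with no Beam dependency and assumes Beam's default mapping for fixed windows, which looks up the side-input window containing the main window's maximum timestamp (its end minus 1 ms). The class and method names (`WindowMappingSketch`, `windowStart`, `sideWindowStartFor`) are made up for illustration and are not Beam APIs.

```java
public class WindowMappingSketch {

    /** Start (inclusive, in ms) of the fixed window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /**
     * Start of the side-input window that the main window starting at mainWindowStart maps to.
     * Assumption: the mapping uses the main window's maximum timestamp (end - 1 ms).
     */
    static long sideWindowStartFor(long mainWindowStart, long mainSizeMillis, long sideSizeMillis) {
        long maxTimestamp = mainWindowStart + mainSizeMillis - 1; // last millisecond inside the main window
        return windowStart(maxTimestamp, sideSizeMillis);
    }

    public static void main(String[] args) {
        long mainSize = 5_000L; // 5-second main-input windows, as in the snippet
        long sideSize = 1_000L; // 1-second tick windows, as in the snippet
        long eventTimestamp = 12_345L; // hypothetical event time in ms

        long mainStart = windowStart(eventTimestamp, mainSize);
        long sideStart = sideWindowStartFor(mainStart, mainSize, sideSize);

        System.out.println("main window starts at " + mainStart + " ms");
        System.out.println("side window starts at " + sideStart + " ms");
    }
}
```

Under that assumption, every element of a 5-second main window sees the configuration loaded by the tick window covering that main window's final second, so the configuration is refreshed once per main window, not once per tick.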