@brachi-wernick
Created March 6, 2019 16:04
SideInput load config
// Emit one element per window; each element triggers a reload of the configuration.
PCollection<Long> ticks = p
    // Produce 1 "tick" per second
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
    // Window the ticks into 1-second windows
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
    // Use an arbitrary per-window combiner to reduce to 1 element per window
    .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());

String bucketName = options.getBigQuerySchemaConfigBucketName().get();

// Reload the configuration once per window and expose it as a singleton side input.
PCollectionView<FieldsConfiguration> sideView = ticks
    .apply(MapElements.into(TypeDescriptor.of(FieldsConfiguration.class))
        .via((Long ignored) -> getFieldsConfiguration(bucketName)))
    // The default value is computed once, at pipeline construction time,
    // and is used for any window in which the view is empty.
    .apply(View.<FieldsConfiguration>asSingleton()
        .withDefaultValue(getFieldsConfiguration(bucketName)));