Skip to content

Instantly share code, notes, and snippets.

@imrenagi
Created May 29, 2018 00:53
Show Gist options
  • Save imrenagi/c287b3809cd65c9083d61f297c1d6221 to your computer and use it in GitHub Desktop.
Save imrenagi/c287b3809cd65c9083d61f297c1d6221 to your computer and use it in GitHub Desktop.
RSVPStreamProcessor
// Assemble the RSVP pipeline: one Pub/Sub source feeding two independent
// branches (a windowed text archive and a keyed, grouped Bigtable write).
Pipeline pipeline = Pipeline.create(dataflowOptions);

// Shared source: messages read as strings from the configured subscription.
PCollection<String> rsvpInput =
    pipeline.apply(
        "pubsubReader",
        PubsubIO.readStrings().fromSubscription(options.getSubscriptionId()));

String gcsBucketPath = String.format("gs://%s", options.getOutputBucket());
DateTimeFormatter dayPathFormat = DateTimeFormat.forPattern("yyyy/MM/dd");

// ---- Branch 1: archive the raw messages as sharded text files ----

// Re-emit every message unchanged, stamped with the current wall-clock time.
PCollection<String> restamped =
    rsvpInput.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext ctx) throws Exception {
            ctx.outputWithTimestamp(ctx.element(), Instant.now());
        }
    }));

// Bucket the re-stamped messages into fixed 10-minute windows.
PCollection<String> archiveWindows =
    restamped.apply(
        "gcsWriterWindow",
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))));

// The date segment of the prefix is formatted once, when this statement runs.
// NOTE(review): for a long-running job this presumably keeps every file under
// the launch date's directory -- confirm that is intended.
String archivePrefix =
    String.format(
        "%s/rsvp-stream/%s/rsvp-", gcsBucketPath, dayPathFormat.print(DateTime.now()));

// Windowed write: ".json" suffix, 10 shards.
archiveWindows.apply(
    "gcsWriterExec",
    TextIO.write()
        .to(archivePrefix)
        .withSuffix(".json")
        .withWindowedWrites()
        .withNumShards(10));

// ---- Branch 2: parse, window, group by key and write to Bigtable ----

// Parse each message and place the results into fixed 15-minute windows.
PCollection<RSVP> windowedRsvps =
    rsvpInput
        .apply("rsvpParser", ParDo.of(new RsvpParserDoFn()))
        .apply(
            "rsvpGroupWindow",
            Window.<RSVP>into(FixedWindows.of(Duration.standardMinutes(15))));

// Key the records, group them per key, reduce each group, convert the result
// for Bigtable, and write it to the configured table.
// NOTE(review): the reducer and the put-builder are defined elsewhere; their
// exact output is not visible here (names suggest a per-window count).
windowedRsvps
    .apply("rsvpKeyify", ParDo.of(new KeyedRSVPDoFn()))
    .apply("rsvpGroupByKey", GroupByKey.<String, RSVP>create())
    .apply("rsvpReducer", ParDo.of(new KeyedReducerDoFn<RSVP>()))
    .apply("rsvpToBigtablePut", ParDo.of(new TimeWindowedCountPut()))
    .apply("bigtableWrite", CloudBigtableIO.writeToTable(bigtableConfig));

// Launch the pipeline; the returned result is not captured or waited on here.
pipeline.run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment