Skip to content

Instantly share code, notes, and snippets.

@ftrossbach
Last active March 10, 2017 16:03
Show Gist options
  • Save ftrossbach/d3b235efd9d51373059d2b30c9eedc6d to your computer and use it in GitHub Desktop.
Save ftrossbach/d3b235efd9d51373059d2b30c9eedc6d to your computer and use it in GitHub Desktop.
A simplified Kafka Streams topology
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> visitsStream = builder.stream(Serdes.String(), Serdes.Long(), "visitsTopic");
KGroupedStream<String, Long> groupedStream = visitsStream.groupByKey();
KTable<String, Long> totalCount = groupedStream.count("totalVisitCount");
KTable<Windowed<String>, Long> windowedCount = groupedStream.count(TimeWindows.of(60 * 60 * 1000), "hourlyVisitCount");
groupedStream.count(SessionWindows.with(60 * 1000), "sessionVisitCount");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment