@kijanowski
Created July 8, 2019 08:39
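import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
// Assumes `log` is an SLF4J-style logger and `config` is a Properties object defined elsewhere.

StreamsBuilder builder = new StreamsBuilder();
builder
    .stream("inJlinkTopic", Consumed.with(Serdes.String(), Serdes.String()))
    .peek((key, value) -> log.info("Received message: {}", value))
    // Forward only records whose value is exactly "PASS"; null values are dropped.
    .filter((key, value) -> "PASS".equals(value))
    .to("outJlinkTopic", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();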
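
The snippet above passes a `config` object to `KafkaStreams` without showing how it is built. A minimal sketch of one way to construct it follows, using only `java.util.Properties` and the literal keys `application.id` and `bootstrap.servers`, which Kafka Streams requires. The class name `StreamsConfigSketch`, the helper `buildConfig`, and the values `jlink-filter-app` and `localhost:9092` are hypothetical and not part of the original gist.

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Hypothetical helper: collects the two settings Kafka Streams needs
    // before a topology can start. Key names are the standard config keys;
    // the values are supplied by the caller.
    static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties config = new Properties();
        // Identifies this application; also used as the consumer group id.
        config.put("application.id", applicationId);
        // Comma-separated host:port list of brokers to bootstrap from.
        config.put("bootstrap.servers", bootstrapServers);
        return config;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with those of your own cluster.
        Properties config = buildConfig("jlink-filter-app", "localhost:9092");
        config.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` instance could then be handed to `new KafkaStreams(builder.build(), config)`. Serdes need no default entries here because the topology sets them explicitly through `Consumed.with` and `Produced.with`.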