Skip to content

Instantly share code, notes, and snippets.

@apurvam
Created February 3, 2025 20:20
Show Gist options
  • Save apurvam/50e7d774be1a26b887b1d9d4ef46d730 to your computer and use it in GitHub Desktop.
Save apurvam/50e7d774be1a26b887b1d9d4ef46d730 to your computer and use it in GitHub Desktop.
Kafka Streams Word Count Example
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
public class WordCount {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "responsive-word-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// these configs make for snappier demos
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10_000);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1_000);
props.put(ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 3);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> source = builder.stream("words");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.as("counts-store"))
.toStream()
.to("output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment