Skip to content

Instantly share code, notes, and snippets.

@andreinechaev
Created October 7, 2016 20:25
Show Gist options
  • Save andreinechaev/e6d62c31197001905a1c14c86bb966df to your computer and use it in GitHub Desktop.
Save andreinechaev/e6d62c31197001905a1c14c86bb966df to your computer and use it in GitHub Desktop.
Akka Streams Example
@Test
public void testToTest() throws Exception {
String allInOne = "AllInOne";
sBroker.createTopic(allInOne, 1, 1);
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(stringSerde, stringSerde, allInOne);
source.foreach((key, value) -> System.out.println(value));
KafkaStreams streams = new KafkaStreams(builder, props);
streams.setUncaughtExceptionHandler((t, e) -> {
System.out.println(e.getMessage());
});
streams.start();
Properties producerProps = ServiceHelper.loadProperties("producer.props");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new Producer<>(producerProps);
IntStream.range(0, 1000)
.forEach(i -> {
producer.send(new ProducerRecord<>(allInOne, "Msg", "Message-" + i));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment