Skip to content

Instantly share code, notes, and snippets.

@saggie
Last active June 11, 2017 10:54
Show Gist options
  • Save saggie/77d9781e06f84e99915998fbdd9ec7ce to your computer and use it in GitHub Desktop.
Save saggie/77d9781e06f84e99915998fbdd9ec7ce to your computer and use it in GitHub Desktop.
A simple Kafka (sender and receiver) sample code in Java
package sample;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class Main {
public static void main(String[] args) {
new Thread(new InfiniteKafkaSender()).start();
new Thread(new InfiniteKafkaReceiver()).start();
}
private static class InfiniteKafkaSender implements Runnable {
private KafkaProducer<String, String> producer;
public InfiniteKafkaSender() {
Properties properties = new Properties() {{
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
}};
this.producer = new KafkaProducer<>(properties, new StringSerializer(),
new StringSerializer());
}
@Override
public void run() {
while (true) {
String message = new Date().toString();
producer.send(new ProducerRecord<>("my-topic", message));
System.out.println("Sent: " + message);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class InfiniteKafkaReceiver implements Runnable {
private KafkaConsumer<String, String> consumer;
public InfiniteKafkaReceiver() {
Properties properties = new Properties() {{
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
}};
consumer = new KafkaConsumer<>(properties,
new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Arrays.asList("my-topic"));
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000L);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received: " + record.value());
}
}
}
}
}
...
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment