Skip to content

Instantly share code, notes, and snippets.

@darionyaphet
Last active May 11, 2016 13:31
Show Gist options
  • Save darionyaphet/b41aa2182316884de4ac06756c9f68ad to your computer and use it in GitHub Desktop.
Save darionyaphet/b41aa2182316884de4ac06756c9f68ad to your computer and use it in GitHub Desktop.
Kafka Producer & Consumer
package org.darion.yaphet.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Minimal Kafka consumer example.
 *
 * <p>Subscribes to {@link #TOPIC} on the broker at {@code 127.0.0.1:9092} and prints the
 * offset, key and value of every record it receives to standard output.
 */
public class Consumer {

    /** Name of the topic this example subscribes to. */
    private static final String TOPIC = "topic.default";

    /**
     * Builds a {@code String}/{@code String} consumer from hard-coded settings and polls in an
     * endless loop, printing each record.
     *
     * <p>The loop has no exit condition; the method only returns abnormally, when
     * {@code subscribe} or {@code poll} throws. The consumer is closed on that path.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("group.id", "group.streaming");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Reached only when subscribe/poll throws: make sure the client is
            // closed instead of being abandoned with the exception.
            consumer.close();
        }
    }
}
package org.darion.yaphet.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
/**
 * Minimal Kafka producer example.
 *
 * <p>Reads every line of {@link #PATH} and sends each one as the value of a record to
 * {@link #TOPIC} on the broker at {@code 127.0.0.1:9092}.
 */
public class Producer {

    /** File whose lines are published, one record per line. */
    private static final String PATH = "/usr/share/dict/words";

    /** Name of the topic the records are sent to. */
    private static final String TOPIC = "topic.default";

    /**
     * Builds a {@code String}/{@code String} producer from hard-coded settings, sends one record
     * per line of {@link #PATH}, and closes the producer.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if {@link #PATH} cannot be read
     */
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "3");
        properties.setProperty("batch.size", "128");
        properties.setProperty("linger.ms", "1");
        properties.setProperty("buffer.memory", "33554432");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Kafka's Producer interface is spelled out in full because this class is
        // itself named Producer and would otherwise shadow it.
        org.apache.kafka.clients.producer.Producer<String, String> producer =
                new KafkaProducer<String, String>(properties);
        try {
            for (String word : Files.readAllLines(Paths.get(PATH))) {
                // NOTE(review): every record uses the same empty-string key; with a
                // key-hashing partitioner that presumably routes them all to one
                // partition -- confirm this is intended.
                ProducerRecord<String, String> record =
                        new ProducerRecord<String, String>(TOPIC, "", word);
                producer.send(record);
            }
        } finally {
            // Close even when reading the file or a send() call throws, so the
            // producer is not abandoned with records from earlier send() calls
            // still pending.
            producer.close();
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Kafka producer/consumer examples. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.darion.yaphet</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Pin the language level instead of relying on the compiler plugin's
             default; the producer example calls Files.readAllLines(Path),
             which was added in Java 8. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Make source decoding independent of the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Kafka core artifact built for Scala 2.11. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <!-- Java client library: provides KafkaProducer and KafkaConsumer. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
    </dependencies>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment