Created
January 15, 2018 21:29
-
-
Save kleysonr/d76df87479cc884818ebe870d297d7e5 to your computer and use it in GitHub Desktop.
Java Kafka Producer/Consumer Sample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample1; | |
import java.io.UnsupportedEncodingException; | |
import java.util.Arrays; | |
import java.util.Base64; | |
import java.util.Properties; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import com.google.gson.Gson; | |
class MsgKafka { | |
private String id; | |
private String timestamp; | |
private String data; | |
public String getId() { | |
return id; | |
} | |
public void setId(String id) { | |
this.id = id; | |
} | |
public String getTimestamp() { | |
return timestamp; | |
} | |
public void setTimestamp(String timestamp) { | |
this.timestamp = timestamp; | |
} | |
public String getData() { | |
return data; | |
} | |
public void setData(String data) { | |
this.data = data; | |
} | |
} | |
public class KafkaConsumerSample | |
{ | |
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException { | |
Properties properties = new Properties(); | |
properties.put("bootstrap.servers", "ipaddress:6667"); | |
properties.put("kafka.topic" , "my-topic"); | |
properties.put("compression.type" , "gzip"); | |
properties.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); | |
properties.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer"); | |
properties.put("max.partition.fetch.bytes", "2097152"); | |
properties.put("max.poll.records" , "500"); | |
properties.put("group.id" , "my-group"); | |
runMainLoop(args, properties); | |
} | |
static void runMainLoop(String[] args, Properties properties) throws InterruptedException, UnsupportedEncodingException { | |
// Create Kafka producer | |
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); | |
try { | |
consumer.subscribe(Arrays.asList(properties.getProperty("kafka.topic"))); | |
System.out.println("Subscribed to topic " + properties.getProperty("kafka.topic")); | |
while (true) | |
{ | |
ConsumerRecords<String, String> records = consumer.poll(100); | |
for (ConsumerRecord<String, String> record : records) | |
{ | |
System.out.printf("partition = %s, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), decodeMsg(record.value()).getData() ); | |
} | |
} | |
} | |
finally { | |
consumer.close(); | |
} | |
} | |
public static MsgKafka decodeMsg(String json) throws UnsupportedEncodingException { | |
Gson gson = new Gson(); | |
MsgKafka msg = gson.fromJson(json, MsgKafka.class); | |
byte[] encodedData = Base64.getDecoder().decode(msg.getData()); | |
msg.setData(new String(encodedData, "utf-8")); | |
return msg; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample1; | |
import java.io.UnsupportedEncodingException; | |
import java.sql.Timestamp; | |
import java.util.Base64; | |
import java.util.Properties; | |
import java.util.Random; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import com.google.gson.Gson; | |
import com.google.gson.JsonObject; | |
public class KafkaProducerSample | |
{ | |
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException { | |
Properties properties = new Properties(); | |
properties.put("bootstrap.servers", "ipaddress:6667"); | |
properties.put("acks" , "0"); | |
properties.put("retries" , "1"); | |
properties.put("batch.size" , "20971520"); | |
properties.put("linger.ms" , "33"); | |
properties.put("max.request.size" , "2097152"); | |
properties.put("compression.type" , "gzip"); | |
properties.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer"); | |
properties.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer"); | |
properties.put("kafka.topic" , "my-topic"); | |
runMainLoop(args, properties); | |
} | |
static void runMainLoop(String[] args, Properties properties) throws InterruptedException, UnsupportedEncodingException { | |
// Create Kafka producer | |
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); | |
try { | |
while(true) { | |
Thread.sleep(1000); | |
String id = "device-" + getRandomNumberInRange(1,5); | |
producer.send(new ProducerRecord<String, String>(properties.getProperty("kafka.topic"), id, getMsg(id))); | |
} | |
} finally { | |
producer.close(); | |
} | |
} | |
public static String getMsg(String id) throws UnsupportedEncodingException { | |
Gson gson = new Gson(); | |
String timestamp = new Timestamp(System.currentTimeMillis()).toString(); | |
JsonObject obj = new JsonObject(); | |
obj.addProperty("id", id); | |
obj.addProperty("timestamp", timestamp); | |
obj.addProperty("data", Base64.getEncoder().encodeToString("this is my message data ...".getBytes("utf-8"))); | |
String json = gson.toJson(obj); | |
return json; | |
} | |
private static int getRandomNumberInRange(int min, int max) { | |
Random r = new Random(); | |
return r.ints(min, (max + 1)).findFirst().getAsInt(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>kafkasample</groupId> | |
<artifactId>producer-consumer</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<dependencies> | |
<!-- gson --> | |
<dependency> | |
<groupId>com.google.code.gson</groupId> | |
<artifactId>gson</artifactId> | |
<version>2.8.2</version> | |
</dependency> | |
<!-- kafka --> | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka_2.11</artifactId> | |
<version>0.10.1.0</version> | |
</dependency> | |
</dependencies> | |
<build> | |
<resources> | |
<resource> | |
<directory>${basedir}/src/main/resources</directory> | |
</resource> | |
</resources> | |
<plugins> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<version>3.6.1</version> | |
<configuration> | |
<source>1.8</source> | |
<target>1.8</target> | |
</configuration> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment