Last active
November 30, 2020 16:23
-
-
Save thanoojgithub/3b486a07cac4f597f738e83fbc8fe0cc to your computer and use it in GitHub Desktop.
Kafka SampleProducer in java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties | |
$ bin/kafka-server-start.sh config/server.properties | |
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 | |
Created topic quickstart-events. | |
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 | |
Topic: quickstart-events PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 | |
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0 | |
#$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 | |
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 | |
0 | |
1 | |
2 | |
3 | |
4 | |
5 | |
6 | |
7 | |
8 | |
9 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kafkaconnectone; | |
import java.util.Map.Entry; | |
import java.util.Properties; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.KafkaException; | |
import org.apache.kafka.common.errors.AuthorizationException; | |
import org.apache.kafka.common.errors.OutOfOrderSequenceException; | |
import org.apache.kafka.common.errors.ProducerFencedException; | |
/** | |
* --topic quickstart-events | |
* | |
*/ | |
public class SampleProducer { | |
public static void main(String[] args) { | |
if (args.length == 0) { | |
System.out.println("Enter topic name"); | |
System.exit(1); | |
} | |
String topicName = args[0].toString(); | |
Properties props = new Properties(); | |
props.put("bootstrap.servers", "localhost:9092"); | |
// Set acknowledgments for producer requests. | |
props.put("acks", "all"); | |
props.put("enable.idempotence", true); | |
// If the request fails, the producer can automatically retry, | |
props.put("retries", 1); | |
// Specify buffer size in configuration | |
props.put("batch.size", 16384); | |
// Reduce the no of requests less than 0 | |
props.put("linger.ms", 1); | |
// The buffer.memory controls the total amount of memory available to the | |
// producer for buffering. | |
props.put("buffer.memory", 33554432); | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); | |
props.put("transactional.id", "my-transactional-id"); | |
Producer<String, String> producer = new KafkaProducer<String, String>(props); | |
System.out.println("SampleProducer topic|" + topicName + "|"); | |
for (Entry<Object, Object> entrySet : props.entrySet()) { | |
System.out.println(entrySet.getKey() + " : " + entrySet.getValue()); | |
} | |
producer.initTransactions(); | |
try { | |
for (int i = 0; i < 10; i++) { | |
producer.beginTransaction(); | |
producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); | |
producer.commitTransaction(); | |
System.out.println(i + " Message sent successfully"); | |
} | |
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { | |
// We can't recover from these exceptions, so our only option is to close the | |
// producer and exit. | |
e.printStackTrace(); | |
} catch (KafkaException e) { | |
// For all other exceptions, just abort the transaction and try again. | |
e.printStackTrace(); | |
producer.abortTransaction(); | |
} finally { | |
producer.close(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<project xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>com.kafkaconnect</groupId> | |
<artifactId>kafkaconnectone</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<name>kafka connect one</name> | |
<description>kafka connect one</description> | |
<dependencies> | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka-clients</artifactId> | |
<version>2.6.0</version> | |
</dependency> | |
<dependency> | |
<groupId>com.fasterxml.jackson.core</groupId> | |
<artifactId>jackson-databind</artifactId> | |
<version>2.11.3</version> | |
</dependency> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-simple</artifactId> | |
<version>1.7.30</version> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<version>3.8.1</version> | |
<configuration> | |
<source>1.8</source> | |
<target>1.8</target> | |
</configuration> | |
</plugin> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-assembly-plugin</artifactId> | |
<version>3.3.0</version> | |
<executions> | |
<execution> | |
<phase>package</phase> | |
<goals> | |
<goal>single</goal> | |
</goals> | |
</execution> | |
</executions> | |
<configuration> | |
<descriptorRefs> | |
<descriptorRef>jar-with-dependencies</descriptorRef> | |
</descriptorRefs> | |
<archive> | |
<manifest> | |
<mainClass>com.kafkaconnectone.SampleProducer</mainClass> | |
</manifest> | |
</archive> | |
</configuration> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ java -jar ./target/kafkaconnectone-0.0.1-SNAPSHOT-jar-with-dependencies.jar quickstart-events | |
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". | |
SLF4J: Defaulting to no-operation (NOP) logger implementation | |
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. | |
SampleProducer topic|quickstart-events| | |
transactional.id : my-transactional-id | |
bootstrap.servers : localhost:9092 | |
value.serializer : org.apache.kafka.common.serialization.StringSerializer | |
buffer.memory : 33554432 | |
retries : 1 | |
key.serializer : org.apache.kafka.common.serialization.StringSerializer | |
linger.ms : 1 | |
batch.size : 16384 | |
enable.idempotence : true | |
acks : all | |
0 Message sent successfully | |
1 Message sent successfully | |
2 Message sent successfully | |
3 Message sent successfully | |
4 Message sent successfully | |
5 Message sent successfully | |
6 Message sent successfully | |
7 Message sent successfully | |
8 Message sent successfully | |
9 Message sent successfully | |
hduser@thanoojubuntu-Inspiron-3521:~/eclipse-workspace/WSOne/kafkaconnectone$ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment