Skip to content

Instantly share code, notes, and snippets.

@thanoojgithub
Last active November 30, 2020 16:23
Show Gist options
  • Save thanoojgithub/3b486a07cac4f597f738e83fbc8fe0cc to your computer and use it in GitHub Desktop.
Save thanoojgithub/3b486a07cac4f597f738e83fbc8fe0cc to your computer and use it in GitHub Desktop.
Kafka SampleProducer in Java
$ zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Created topic quickstart-events.
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
#$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
0
1
2
3
4
5
6
7
8
9
package com.kafkaconnectone;
import java.util.Map.Entry;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
/**
 * Sample transactional Kafka producer.
 *
 * <p>Sends the integers 0 through 9 as string key/value records to the topic named by the
 * first command-line argument, committing each record in its own transaction. The broker
 * address is fixed to {@code localhost:9092}.
 *
 * <p>Example argument: {@code quickstart-events}
 */
public class SampleProducer {

    /**
     * Program entry point.
     *
     * @param args command-line arguments; {@code args[0]} is the name of the topic to write
     *     to. When no argument is supplied the program prints a prompt and exits with
     *     status 1.
     */
    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("Enter topic name");
            System.exit(1);
        }
        String topicName = args[0];

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Set acknowledgments for producer requests.
        props.put("acks", "all");
        props.put("enable.idempotence", true);
        // If the request fails, the producer can automatically retry.
        props.put("retries", 1);
        // Upper bound, in bytes, on the size of a batch of records.
        props.put("batch.size", 16384);
        // Wait up to 1 ms before sending so that records can be grouped into fewer requests.
        props.put("linger.ms", 1);
        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "my-transactional-id");

        // try-with-resources guarantees the producer is closed on every path, including a
        // failure inside initTransactions(). The inner try keeps the producer open while the
        // catch clauses run, because abortTransaction() needs a live producer.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            System.out.println("SampleProducer topic|" + topicName + "|");
            for (Entry<Object, Object> entry : props.entrySet()) {
                System.out.println(entry.getKey() + " : " + entry.getValue());
            }

            producer.initTransactions();
            try {
                for (int i = 0; i < 10; i++) {
                    String payload = Integer.toString(i);
                    producer.beginTransaction();
                    producer.send(new ProducerRecord<>(topicName, payload, payload));
                    producer.commitTransaction();
                    System.out.println(i + " Message sent successfully");
                }
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // We can't recover from these exceptions, so our only option is to close the
                // producer and exit.
                e.printStackTrace();
            } catch (KafkaException e) {
                // For all other exceptions, abort the open transaction. This sample does not
                // retry; it simply stops sending.
                e.printStackTrace();
                producer.abortTransaction();
            }
        }
    }
}
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build for the SampleProducer example. Packaging produces an additional
     "jar-with-dependencies" artifact whose manifest main class is
     com.kafkaconnectone.SampleProducer. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.kafkaconnect</groupId>
<artifactId>kafkaconnectone</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka connect one</name>
<description>kafka connect one</description>
<dependencies>
<!-- Kafka client library; supplies the org.apache.kafka.clients.producer classes that
     SampleProducer imports. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<!-- NOTE(review): SampleProducer does not import Jackson directly; confirm whether this
     explicit dependency is still required. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.3</version>
</dependency>
<!-- SLF4J simple logger implementation. NOTE(review): presumably added to address the
     "Failed to load class org.slf4j.impl.StaticLoggerBinder" warning; verify it ends up
     in the assembled jar. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile for Java 8 source and bytecode levels. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Runs the assembly "single" goal during the package phase, using the predefined
     jar-with-dependencies descriptor so the result can be started with "java -jar". -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- Entry point written into the jar manifest. -->
<mainClass>com.kafkaconnectone.SampleProducer</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
$ java -jar ./target/kafkaconnectone-0.0.1-SNAPSHOT-jar-with-dependencies.jar quickstart-events
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SampleProducer topic|quickstart-events|
transactional.id : my-transactional-id
bootstrap.servers : localhost:9092
value.serializer : org.apache.kafka.common.serialization.StringSerializer
buffer.memory : 33554432
retries : 1
key.serializer : org.apache.kafka.common.serialization.StringSerializer
linger.ms : 1
batch.size : 16384
enable.idempotence : true
acks : all
0 Message sent successfully
1 Message sent successfully
2 Message sent successfully
3 Message sent successfully
4 Message sent successfully
5 Message sent successfully
6 Message sent successfully
7 Message sent successfully
8 Message sent successfully
9 Message sent successfully
hduser@thanoojubuntu-Inspiron-3521:~/eclipse-workspace/WSOne/kafkaconnectone$
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment