~$ sudo apt-get update && sudo apt install openjdk-17-jdk-headless maven -y
~$ mkdir -p ~/producer-placement/src/main/java/com/rx-m/kafka
~$ cd ~/producer-placement
Per https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
ProducerRecord(String topic, V value)
ProducerRecord(String topic, K key, V value)
ProducerRecord(String topic, Integer partition, K key, V value)
ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
~/producer-placement$ cat ./src/main/java/com/rx-m/kafka/BasicKafkaProducer.java
import java.util.Properties;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
public final class BasicKafkaProducer {

    private BasicKafkaProducer() { }

    /**
     * Demonstrates how each ProducerRecord constructor influences partition
     * placement: no key, key hash, explicit partition, headers, and explicit
     * timestamp. Afterwards the topic is described via AdminClient and its
     * partition count is printed.
     *
     * @param args unused
     * @throws Exception if sending fails or the admin lookup cannot complete
     */
    public static void main(String[] args) throws Exception {
        String topicName = "BasicKafkaProducerTopic";
        String key = "Key1";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // linger.ms=0 sends batches immediately, which makes keyless records
        // spread across partitions in a more "round-robin"-like fashion.
        props.put("linger.ms", 0);

        // try-with-resources flushes and closes the producer even if a send throws
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 2; i++) {
                String round = ":" + i;
                // 1 - no key: the partitioner chooses the partition (round robin-ish)
                ProducerRecord<String, String> withoutKeyRecord =
                    new ProducerRecord<>(topicName, "#1" + round);
                // 2 - keyed: hash of the key -> always the same partition; also what
                //     compaction and stateful streams (e.g. KTables) rely on
                ProducerRecord<String, String> withKeyRecord =
                    new ProducerRecord<>(topicName, key, "#2" + round);
                // 3 - explicit partition (when both partition and key are set,
                //     the explicit partition wins)
                ProducerRecord<String, String> withPartitionRecord =
                    new ProducerRecord<>(topicName, 1, key, "#3" + round);
                // 4 - same as 3 plus headers (no short form, so partition id,
                //     key and value must all be supplied)
                List<Header> headers =
                    Arrays.asList(new RecordHeader("four", "4".getBytes()));
                ProducerRecord<String, String> withPartitionRecordAndHeaders =
                    new ProducerRecord<>(topicName, 0, key, "#4" + round, headers);
                // 5 - explicit timestamp (affects stream windowing, among others);
                //     System.currentTimeMillis() is the default when omitted
                ProducerRecord<String, String> withPartitionAndTimestampRecord =
                    new ProducerRecord<>(topicName, 0, 0L, key, "#5" + round);
                // 6 - same as 5 with headers; more can be appended afterwards via
                //     the mutable Headers view (duplicate header keys are allowed)
                List<Header> headers2 = Arrays.asList(
                    new RecordHeader("sixth", "6".getBytes()),
                    new RecordHeader("sixth-2", "6-2".getBytes()));
                ProducerRecord<String, String> withPartitionAndTimestampRecordAndHeaders =
                    new ProducerRecord<>(topicName, 0, 0L, key, "#6" + round, headers2);
                withPartitionAndTimestampRecordAndHeaders.headers().add("sixth-3", "6-3".getBytes());
                withPartitionAndTimestampRecordAndHeaders.headers().add("sixth-3", "6-3 dupe".getBytes());

                producer.send(withoutKeyRecord);
                producer.send(withKeyRecord);
                producer.send(withPartitionRecord);
                producer.send(withPartitionRecordAndHeaders);
                producer.send(withPartitionAndTimestampRecord);
                producer.send(withPartitionAndTimestampRecordAndHeaders);
            }
        }
        System.out.println("BasicKafkaProducer sent the payload to Kafka!");

        // Describe the topic and print how many partitions it has.
        try (AdminClient admin = AdminClient.create(props)) {
            DescribeTopicsResult result = admin.describeTopics(Arrays.asList(topicName));
            Map<String, KafkaFuture<TopicDescription>> values = result.values();
            KafkaFuture<TopicDescription> topicDescription = values.get(topicName);
            int partitions = topicDescription.get().partitions().size();
            System.out.println(partitions);
        }
    }
}
~/producer-placement$
pom.xml
~/producer-placement$ cat pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.rx-m.kafka</groupId>
  <artifactId>kafka-basic-producer</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>2.0.9</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>2.0.9</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- The sources declare no package statement ("com.rx-m.kafka"
                       would be an illegal Java package name anyway), so the main
                       class lives in the default package. This matches the
                       documented run command: java -cp ... BasicKafkaProducer -->
                  <mainClass>BasicKafkaProducer</mainClass>
                </transformer>
              </transformers>
              <filters>
                <filter>
                  <artifact>*</artifact>
                  <excludes>
                    <exclude>META-INF/*.MF</exclude>
                    <!-- strip dependency signature files; stale signatures make a
                         shaded jar fail verification at runtime -->
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
~/producer-placement$
Compile and run
~/producer-placement$ mvn clean compile package
~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic BasicKafkaProducerTopic
~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic BasicKafkaProducerTopic --partitions 2
~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic BasicKafkaProducerTopic
~/producer-placement$ java -cp ./target/kafka-basic-producer-1.0.jar BasicKafkaProducer
~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic BasicKafkaProducerTopic
~/producer-placement$ # ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BasicKafkaProducerTopic --from-beginning
~/producer-placement$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BasicKafkaProducerTopic --from-beginning --property print.headers=true --property print.timestamp=true --property print.key=true --property print.offset=true --property print.partition=true --property print.value=true --partition 0
~/producer-placement$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BasicKafkaProducerTopic --from-beginning --property print.headers=true --property print.timestamp=true --property print.key=true --property print.offset=true --property print.partition=true --property print.value=true --partition 1
~/kafka/bin/kafka-dump-log.sh --print-data-log --files /tmp/kraft-combined-logs/BasicKafkaProducerTopic-0/00000000000000000000.log
~/kafka/bin/kafka-dump-log.sh --print-data-log --files /tmp/kraft-combined-logs/BasicKafkaProducerTopic-1/00000000000000000000.log
Notes
- linger.ms=0 is used to force more "round-robin"-like placement for keyless records
- You can add a header without setting a partition by using the Headers interface on any ProducerRecord created without one, e.g.:
withKeyRecord.headers().add("sixth-3", "6-3".getBytes());
or you can pass null as the partition value (the constructors take an Integer object).
~/producer-placement$ cat ./src/main/java/com/rx-m/kafka/BasicKafkaConsumer.java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
public final class BasicKafkaConsumer {

    private BasicKafkaConsumer() {
    }

    /** Poll duration used by the consumer when retrieving records. */
    private static final int POLL_DURATION_MS = 1000;

    /**
     * Consumes BasicKafkaProducerTopic from the beginning and prints each
     * record's value, timestamp and headers. Runs until interrupted.
     *
     * @param args unused
     * @throws Exception if the consumer cannot connect or poll
     */
    public static void main(String[] args) throws Exception {
        String topicName = "BasicKafkaProducerTopic";
        String groupName = "BasicKafkaTopicGroup";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", groupName);

        // Typed <String, String> to match the String deserializers (the original
        // raw KafkaConsumer with <byte[], byte[]> records was an unchecked mismatch).
        // try-with-resources closes the consumer if the loop ever exits.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(topicName));
            // An initial poll forces partition assignment so seekToBeginning has
            // assignments to act on; otherwise consumption starts at the group offset.
            consumer.poll(Duration.ZERO);
            consumer.seekToBeginning(consumer.assignment());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(
                    Duration.ofMillis(POLL_DURATION_MS)
                );
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf(
                        "Received Message value = %s, with ts: %d\n", record.value(), record.timestamp()
                    );
                    Headers headers = record.headers();
                    for (Header header : headers) {
                        // Header values are raw bytes; decode explicitly as UTF-8 so the
                        // output does not depend on the platform default charset.
                        System.out.println(header.key() + ":" + new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
                // consumer.commitSync(); // enable to commit offsets after each poll
            }
        }
    }
}
~/producer-placement$
Same POM.
~/producer-placement$ mvn clean compile package
Run consumer (prints headers, timestamps, etc.).
~/producer-placement$ java -cp ./target/kafka-basic-producer-1.0.jar BasicKafkaConsumer