Skip to content

Instantly share code, notes, and snippets.

@ronaldpetty
Created October 4, 2023 14:26
Show Gist options
  • Save ronaldpetty/c235583e5807d0d385f547503f8b7fc9 to your computer and use it in GitHub Desktop.
Save ronaldpetty/c235583e5807d0d385f547503f8b7fc9 to your computer and use it in GitHub Desktop.
Kafka client sample
~$ sudo apt-get update && sudo apt install openjdk-17-jdk-headless maven -y

~$ mkdir -p ~/producer-placement/src/main/java/com/rx-m/kafka

~$ cd ~/producer-placement

Per https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

ProducerRecord(String topic, V value)
ProducerRecord(String topic, K key, V value)
ProducerRecord(String topic, Integer partition, K key, V value)
ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)

Code

~/producer-placement$ cat ./src/main/java/com/rx-m/kafka/BasicKafkaProducer.java

import java.util.Properties;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;


public final class BasicKafkaProducer {

    /** Utility entry-point class; never instantiated. */
    private BasicKafkaProducer() { }

    /**
     * Demonstrates every {@code ProducerRecord} constructor overload -- value-only,
     * keyed, explicit partition, headers, and explicit timestamp -- by sending six
     * records per loop iteration, then uses the AdminClient to print the topic's
     * partition count.
     *
     * @param args unused
     * @throws Exception if producing or the admin topic lookup fails
     */
    public static void main(String[] args) throws Exception {

        String topicName = "BasicKafkaProducerTopic";
        String key = "Key1";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // linger.ms=0 disables the batching delay, which makes partition
        // selection for unkeyed records behave more like round-robin
        props.put("linger.ms", 0);

        // try-with-resources flushes and closes the producer even on failure
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {

            for (int i = 0; i < 2; i++) {
                String round = ":" + i;

                // 1 - no key: the partitioner picks the partition (round robin-ish)
                ProducerRecord<String, String> withoutKeyRecord =
                    new ProducerRecord<>(topicName, "#1" + round);

                // 2 - keyed: the key's hash picks the partition, so the same key
                //     always lands on the same partition -- this also matters for
                //     compaction and stateful streams (e.g. KTables)
                ProducerRecord<String, String> withKeyRecord =
                    new ProducerRecord<>(topicName, key, "#2" + round);

                // 3 - explicit partition: when both partition and key are set,
                //     the explicit partition wins
                ProducerRecord<String, String> withPartitionRecord =
                    new ProducerRecord<>(topicName, 1, key, "#3" + round);

                // 4 - same as 3 plus headers (there is no shorter headers overload,
                //     so the partition must be supplied here)
                List<Header> headers =
                    Arrays.asList(new RecordHeader("four", "4".getBytes()));
                ProducerRecord<String, String> withPartitionRecordAndHeaders =
                    new ProducerRecord<>(topicName, 0, key, "#4" + round, headers);

                // 5 - explicit timestamp (affects stream windowing, maybe others
                //     as well?); System.currentTimeMillis() is the default value
                ProducerRecord<String, String> withPartitionAndTimestampRecord =
                    new ProducerRecord<>(topicName, 0, 0L, key, "#5" + round);

                // 6 - same as 5 with headers; headers can also be appended after
                //     construction, and duplicate header keys are allowed
                List<Header> headers2 = Arrays.asList(
                    new RecordHeader("sixth", "6".getBytes()),
                    new RecordHeader("sixth-2", "6-2".getBytes()));
                ProducerRecord<String, String> withPartitionAndTimestampRecordAndHeaders =
                    new ProducerRecord<>(topicName, 0, 0L, key, "#6" + round, headers2);
                withPartitionAndTimestampRecordAndHeaders.headers().add("sixth-3", "6-3".getBytes());
                withPartitionAndTimestampRecordAndHeaders.headers().add("sixth-3", "6-3 dupe".getBytes());

                producer.send(withoutKeyRecord);
                producer.send(withKeyRecord);
                producer.send(withPartitionRecord);
                producer.send(withPartitionRecordAndHeaders);
                producer.send(withPartitionAndTimestampRecord);
                producer.send(withPartitionAndTimestampRecordAndHeaders);
            }
        }

        System.out.println("BasicKafkaProducer sent the payload to Kafka!");

        // The AdminClient ignores the serializer settings in props. Close it
        // when done -- the original leaked its network threads and sockets.
        try (AdminClient admin = AdminClient.create(props)) {
            DescribeTopicsResult result = admin.describeTopics(Arrays.asList(topicName));
            KafkaFuture<TopicDescription> topicDescription = result.values().get(topicName);
            int partitions = topicDescription.get().partitions().size();
            System.out.println(partitions);
        }
    }
}

~/producer-placement$

pom.xml

~/producer-placement$ cat pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rx-m.kafka</groupId>
    <artifactId>kafka-basic-producer</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- The source files declare no package (and "rx-m" is not a
                                         legal Java package segment), so the manifest Main-Class
                                         is the bare class name, not org.rx-m.kafka.BasicKafkaProducer -->
                                    <mainClass>BasicKafkaProducer</mainClass>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.MF</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

~/producer-placement$

Compile and run

~/producer-placement$ mvn clean compile package

~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic BasicKafkaProducerTopic

~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic BasicKafkaProducerTopic --partitions 2

~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic BasicKafkaProducerTopic

~/producer-placement$ java -cp ./target/kafka-basic-producer-1.0.jar BasicKafkaProducer

~/producer-placement$ ~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic BasicKafkaProducerTopic

~/producer-placement$ # ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BasicKafkaProducerTopic --from-beginning

~/producer-placement$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BasicKafkaProducerTopic --from-beginning --property print.headers=true --property print.timestamp=true --property print.key=true --property print.offset=true --property print.partition=true --property print.value=true --partition 0

~/producer-placement$ ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BasicKafkaProducerTopic --from-beginning --property print.headers=true --property print.timestamp=true --property print.key=true --property print.offset=true --property print.partition=true --property print.value=true --partition 1

~/kafka/bin/kafka-dump-log.sh --print-data-log --files /tmp/kraft-combined-logs/BasicKafkaProducerTopic-0/00000000000000000000.log 

~/kafka/bin/kafka-dump-log.sh --print-data-log --files /tmp/kraft-combined-logs/BasicKafkaProducerTopic-1/00000000000000000000.log

Notes

  • linger is used to force more "round-robin" like behavior
  • You can add a header without partition by using the headers interface via the ProducerRecord without the partition
withPartitionAndTimestampRecordAndHeaders.headers().add("sixth-3", "6-3".getBytes());

or you can pass null for the partition value (the parameter is an Integer object, so null is accepted and lets the partitioner choose).

~/producer-placement$ cat ./src/main/java/com/rx-m/kafka/BasicKafkaConsumer.java 
import java.util.Arrays;
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.Header;

public final class BasicKafkaConsumer {

    /** Utility entry-point class; never instantiated. */
    private BasicKafkaConsumer() {
    }

    /** Poll duration used by the consumer when retrieving records. */
    private static final int POLL_DURATION_MS = 1000;

    /**
     * Consumes from the beginning of the topic forever, printing each record's
     * value, timestamp, and headers (including duplicate header keys).
     *
     * @param args unused
     * @throws Exception if consuming fails
     */
    public static void main(String[] args) throws Exception {

        String topicName = "BasicKafkaProducerTopic";
        String groupName = "BasicKafkaTopicGroup";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", groupName);

        // Parameterize with <String, String> to match the deserializers above;
        // the original used a raw KafkaConsumer with byte[] record types, which
        // compiled only because raw types suppress the mismatch.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(topicName));

            // The deprecated poll(0) blocked until partitions were assigned;
            // poll(Duration) does not, so poll until assignment completes before
            // rewinding -- otherwise seekToBeginning would see an empty set.
            while (consumer.assignment().isEmpty()) {
                consumer.poll(Duration.ofMillis(100));
            }
            consumer.seekToBeginning(consumer.assignment()); // move to front, else at end

            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(POLL_DURATION_MS));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf(
                        "Received Message value = %s, with ts: %d\n",
                        record.value(), record.timestamp());
                    // Headers is Iterable<Header>, so iterate it directly
                    for (Header header : record.headers()) {
                        System.out.println(header.key() + ":" + new String(header.value()));
                    }
                }
                // consumer.commitSync(); // enable for manual offset commits
            }
        }
    }
}

~/producer-placement$

Same POM.

~/producer-placement$ mvn clean compile package

Run consumer (prints headers, timestamps, etc.).

~/producer-placement$ java -cp ./target/kafka-basic-producer-1.0.jar BasicKafkaConsumer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment