Kafka

Chapter 2: Installing Kafka

Hardware Selection

  • Disk throughput is important; SSDs can be a better choice than spinning disks

  • Disk capacity is determined by how many messages need to be retained at any time, plus roughly 10% overhead

    • Total traffic for cluster can be balanced by having multiple partitions per topic
  • Memory and CPU: no special requirements

  • Network - one producer can have multiple consumers, which can cause an imbalance between inbound and outbound traffic

  • How many brokers?

    • determined by disk capacity, network bandwidth and replication factor
    • For example, if each broker has 2 TB of storage and the cluster needs to retain 10 TB of data, we need 5 brokers; with a replication factor of 2 we need 10 brokers; if one broker's network bandwidth is at 80% capacity we may need another broker, and so on
  • Broker Configuration

    • all brokers in a cluster must have the same zookeeper.connect parameter
    • each broker must have a unique broker.id
  • OS Tuning
    The following parameters are set in /etc/sysctl.conf; an example file is sketched after this list.

    Memory

    • vm.swappiness=1 # keep swapping as low as possible, since Kafka relies heavily on the system page cache
    • vm.dirty_background_ratio < 10
    • vm.dirty_ratio between 60 and 80
    • the current number of dirty pages can be checked in /proc/vmstat

    File System

    • the filesystem should be XFS, and the mount should use the noatime option

    Networking

    • net.core.wmem_default and net.core.rmem_default 131072 (128 KB) send and receive buffer default size per socket
    • net.core.wmem_max and net.core.rmem_max 2097152 (2 MB) send and receive buffer maximum sizes
    • net.ipv4.tcp_wmem and net.ipv4.tcp_rmem send and receive buffer sizes for TCP sockets
    • net.ipv4.tcp_window_scaling = 1
    • net.ipv4.tcp_max_syn_backlog > 1024
    • net.core.netdev_max_backlog > 1000
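
A sketch of what these settings might look like in /etc/sysctl.conf; the specific values are illustrative starting points, not recommendations, and should be tuned per workload (apply with sysctl -p):

# Memory: keep swapping minimal, Kafka relies heavily on the page cache
vm.swappiness=1
vm.dirty_background_ratio=5
vm.dirty_ratio=60

# Networking: larger socket buffers and backlogs for a high-throughput broker
net.core.wmem_default=131072
net.core.rmem_default=131072
net.core.wmem_max=2097152
net.core.rmem_max=2097152
net.ipv4.tcp_wmem=4096 65536 2048000
net.ipv4.tcp_rmem=4096 65536 2048000
net.ipv4.tcp_window_scaling=1
net.ipv4.tcp_max_syn_backlog=4096
net.core.netdev_max_backlog=5000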

Garbage Collection

  • MaxGCPauseMillis 200ms
  • InitiatingHeapOccupancyPercent 35
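
These G1 options are typically passed to the broker JVM through the KAFKA_JVM_PERFORMANCE_OPTS environment variable before starting the broker. A sketch using the values above, assuming Kafka is installed under /usr/local/kafka:

# Overrides the defaults set in kafka-run-class.sh
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35"
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties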

Chapter 3: Kafka producers: Writing messages to Kafka

Producer - writes data to Kafka. Consumer - reads data from Kafka. Some applications can serve both roles.

Producer Overview

  • We start producing messages by creating a ProducerRecord - this includes the topic we want to send the record to and a value; optionally we can also specify a key and/or a partition
  • The producer serializes the key and value objects to byte arrays
  • The data is sent to a partitioner; if no partition was specified, the partitioner assigns one
  • The record is added to a batch of records headed for the same topic and partition; a separate thread writes the batch to the broker
  • When the broker receives the messages, it returns a RecordMetadata object (topic, partition, and offset) on success, or an error otherwise

Constructing a producer

3 Mandatory properties

  • bootstrap.servers - List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster. Include at least two brokers so the producer can still connect if one goes down
  • key.serializer - Name of a class that will be used to serialize the keys of the records we will produce to Kafka. key.serializer should be set to a name of a class that implements the org.apache.kafka.common.serialization.Serializer interface. The producer will use this class to serialize the key object to a byte array.
  • value.serializer - Name of a class that will be used to serialize the values of the records we will produce to Kafka.

Sample code

private Properties kafkaProps = new Properties(); 
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
  "org.apache.kafka.common.serialization.StringSerializer"); 
kafkaProps.put("value.serializer",
  "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

3 main modes of operation

  • Fire and forget
  • Synchronous send - send() method returns a Future object and we use get() to wait on future object
  • Asynchronous send - call the send() method with a callback function, which gets triggered when it receives a response from the Kafka broker

Sending a message to Kafka

Simple implementation

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Precision Products",
	  "France"); 
try {
  producer.send(record); 
} catch (Exception e) {
	e.printStackTrace(); 
}

ProducerRecord has several constructors; here we use the one that takes (topic, key, value). The send() method returns a Java Future object holding RecordMetadata.

Sending synchronously

	new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
	producer.send(record).get(); 
} catch (Exception e) {
		e.printStackTrace(); 
}

Kafka has 2 types of errors

  • Retriable errors - ones that can be resolved by sending the message again, e.g. a connection error or "no leader for partition". You can configure the producer to retry these automatically
  • Non-retriable errors - e.g. message size too large; retrying will not help
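
A rough sketch of how retries and the two error classes interact when sending synchronously. The property names and exception classes are from the standard client; the fully qualified names avoid extra imports, and the values are illustrative:

// Set before constructing the producer: let it retry transient failures itself
kafkaProps.put("retries", "3");
kafkaProps.put("retry.backoff.ms", "100");

try {
    producer.send(record).get();
} catch (java.util.concurrent.ExecutionException e) {
    if (e.getCause() instanceof org.apache.kafka.common.errors.RetriableException) {
        // transient error (e.g. no leader) that still failed after the configured retries
    } else {
        // non-retriable error (e.g. RecordTooLargeException); retrying won't help
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}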

Sending asynchronously

private class DemoProducerCallback implements Callback { 
	@Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    	if (e != null) {
        	e.printStackTrace(); 
        }
    }
}

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); 
producer.send(record, new DemoProducerCallback()); 

To use callbacks, you need a class that implements the org.apache.kafka.clients.producer.Callback interface, which has a single method, onCompletion().

Configuring Producers

  • acks

    • acks = 0 - fire and forget
    • acks = 1 - will wait for ack from leader
    • acks = all - ack from broker once all in sync replicas have the message
  • buffer.memory

  • compression.type

  • retries

    • retry.backoff.ms
  • batch.size

  • linger.ms

  • client.id

  • max.in.flight.requests.per.connection - setting this to 1 ensures that messages will be written to broker in the order they were sent, even with retries

  • timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms

  • max.block.ms

  • max.request.size

  • receive.buffer.bytes and send.buffer.bytes
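
A sketch of how a few of these might be set on the kafkaProps from earlier, before constructing the producer; the values (and the client.id) are illustrative starting points, not recommendations:

kafkaProps.put("acks", "all");                // wait for all in-sync replicas
kafkaProps.put("compression.type", "snappy"); // compress batches on the producer side
kafkaProps.put("batch.size", "16384");        // max bytes per batch
kafkaProps.put("linger.ms", "5");             // wait up to 5 ms to fill a batch
kafkaProps.put("client.id", "inventory-service"); // shows up in broker logs and metrics
kafkaProps.put("max.in.flight.requests.per.connection", "1"); // preserve ordering even with retries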

Serializers

Kafka needs a serializer because it transmits data over the network as byte arrays.

Types of Serializers -

  • Custom Serializer
  • Serializing using Apache Avro

Custom Serializer example

Consider a class customer

public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}

Serializer for this

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    /**
     * We are serializing Customer as:
     * 4 byte int representing customerID
     * 4 byte int representing length of customerName in UTF-8 bytes (0 if name is null)
     * N bytes representing customerName in UTF-8
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null) {
                return null;
            } else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }

            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);

            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer to byte[] " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}

Configuring a producer with this CustomerSerializer allows you to define a ProducerRecord<String, Customer> and pass Customer objects directly to the producer.
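
A sketch of that wiring, assuming the Customer and CustomerSerializer classes above; the topic name and sample customer are made up:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer",
  "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", CustomerSerializer.class.getName());

Producer<String, Customer> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("Customers", "customer-4",
  new Customer(4, "Jane Doe")));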

But if any of the fields in Customer change we have to edit the serializer too.

For these reasons, we recommend using existing serializers and deserializers such as JSON, Apache Avro, Thrift, or Protobuf.

Serializing Using Apache Avro

Avro data is described in a language-independent schema. The schema is usually described in JSON and the serialization is usually to binary files, although serializing to JSON is also supported. Avro assumes that the schema is present when reading and writing files, usually by embedding the schema in the files themselves.

Suppose the original schema was

{"namespace": "customerManagement.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name",  "type": "string""},
     {"name": "faxNumber", "type": ["null", "string"], "default": "null"} 
 ]
}

If we replace faxNumber with email, the schema becomes

{"namespace": "customerManagement.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name",  "type": "string"},
     {"name": "email", "type": ["null", "string"], "default": "null"}
 ]
}

If a consumer using the old schema gets a message written with the new schema, getFaxNumber() will return null and things will still work as expected. If a consumer using the new schema gets a message written with the old schema, getEmail() will return null.

Even though the schema changed, there were no exceptions or breaking changes while slowly upgrading the consumers and producers.

Two caveats

  • the schemas used for writing and reading must be compatible; Avro documents its compatibility rules
  • deserializer will need access to schema that was used for writing the data

When using Avro files, the schema is included in the file itself.

Using Avro Records with Kafka

Instead of embedding the schema in every message, we can follow an architecture pattern called a Schema Registry: schemas are stored in the registry and each record carries only an identifier for the schema that was used to write it.

Example of using schema registry with generated Avro objects

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
   "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
   "io.confluent.kafka.serializers.KafkaAvroSerializer"); 
props.put("schema.registry.url", schemaUrl); 

String topic = "customerContacts";
int wait = 500;

Producer<String, Customer> producer = new KafkaProducer<String,
   Customer>(props); 

// We keep producing new events until someone ctrl-c
while (true) {
    Customer customer = CustomerGenerator.getNext();
    System.out.println("Generated customer " +
       customer.toString());
    ProducerRecord<String, Customer> record =
        new ProducerRecord<>(topic, String.valueOf(customer.getId()), customer);
    producer.send(record); 
}

Partitions

Kafka messages are key-value pairs. Generally a ProducerRecord is written with a topic, a key, and a value. It is okay to omit the key, but the key is useful for hashing: all records with the same key go to the same partition and will be read by the same process.

Creating a producer record with a key:

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

Creating a producer record with a null key - just leave the key out:

ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "USA");

When the key is null, the default partitioner distributes records across the available partitions in a round-robin fashion. When a key is present, the mapping of keys to partitions is consistent only as long as the number of partitions in the topic doesn't change.
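
A small sketch that makes the key-to-partition mapping visible, reusing the String/String producer from earlier (RecordMetadata comes back from the synchronous send):

try {
    // Same key, so both records land on the same partition
    // (as long as the topic's partition count doesn't change).
    RecordMetadata first = producer.send(
        new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA")).get();
    RecordMetadata second = producer.send(
        new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "France")).get();
    System.out.println(first.partition() == second.partition()); // true
} catch (Exception e) {
    e.printStackTrace();
}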

Chapter 4: Kafka Consumers: Reading Data from Kafka

Consumer and Consumer Groups

  • An application creates a consumer object, subscribes to a topic, and starts receiving messages, validating them, and writing results
  • Multiple consumers can read from a topic
  • Kafka consumers are typically part of a consumer group
  • When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic
  • If there are more consumers than partitions, some consumers will be idle

The main way we scale data consumption from a Kafka topic is by adding more consumers to a consumer group. This is a good reason to create topics with a large number of partitions: it allows adding more consumers when the load increases.

Each application that needs to read from the topic needs to have its own consumer group.

Consumer Groups and Partition Rebalance

When we add new consumers to the group, or consumers shut down, crash, or get removed, the assignment of partitions to consumers changes, so a partition rebalance has to take place.

During a rebalance consumers can't consume messages, so rebalances are undesirable; in addition, when a partition moves from one consumer to another, the consumer loses its current state.

A consumer maintains membership in a consumer group and ownership of its partitions by sending heartbeats to the Kafka broker acting as the group coordinator. Heartbeats are sent when the consumer polls and when it commits records it has consumed.

If heartbeats stop, the group coordinator will consider the consumer dead and trigger a rebalance.

Creating a Kafka consumer

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

Subscribing to a topic

consumer.subscribe(Collections.singletonList("customerCountries"));
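
After subscribing, the consumer receives messages in a poll loop. A minimal sketch: poll(Duration) is the newer overload (older clients use poll(long)), and it needs the java.time.Duration, ConsumerRecords, and ConsumerRecord imports; the printing is just illustrative processing:

try {
    while (true) {
        // poll() handles group coordination, rebalances, heartbeats, and fetching
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}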