Disk throughput is important; SSDs can be a better choice than spinning disks
Disk capacity is determined by how many messages need to be retained at any time, plus roughly 10% overhead
Total traffic for the cluster can be balanced by having multiple partitions per topic
Memory and CPU: no special requirements
Network - a single producer's output may be read by multiple consumers, which can cause an imbalance between inbound and outbound traffic
How many brokers?
determined by disk capacity, network bandwidth and replication factor
For example, if each broker can store 2 TB and the cluster needs to retain 10 TB of data, we need 5 brokers; with a replication factor of 2 that doubles to 10 brokers; and if one broker's network bandwidth is already at 80%, we may need to add another broker, and so on.
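A minimal sketch of the storage part of that sizing (the numbers mirror the example above; network headroom has to be checked separately):
// Assumption: storage is the binding constraint for this cluster.
int retainedTB = 10;         // data the cluster must retain
int replicationFactor = 2;   // every replica consumes broker storage
int brokerStorageTB = 2;     // usable storage per broker
int brokersForStorage = (int) Math.ceil(
        (double) retainedTB * replicationFactor / brokerStorageTB); // = 10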
Broker Configuration
all brokers in the cluster must use the same zookeeper.connect parameter
each broker must have a unique broker.id
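A minimal sketch of those per-broker settings in config/server.properties (host names below are placeholders):
# must be unique for every broker in the cluster
broker.id=0
# must be identical on every broker in the cluster
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181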
OS Tuning
The following parameters are set in /etc/sysctl.conf
Memory
vm.swappiness=1 # Kafka wants swapping to happen as little as possible because it relies heavily on the system page cache
vm.dirty_background_ratio set below 10 (e.g. 5)
vm.dirty_ratio between 60 and 80
the current number of dirty pages can be checked with cat /proc/vmstat
File System
File system should be XFS; the mount should use the noatime option
Networking
net.core.wmem_default and net.core.rmem_default 131072 (128 KB) send and receive buffer default size per socket
net.core.wmem_max and net.core.rmem_max 2097152 (2 MB) send and receive buffer maximum sizes
net.ipv4.tcp_wmem and net.ipv4.tcp_rmem - send and receive buffer sizes for TCP sockets; each takes three values (minimum, default, maximum)
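A sketch of how these settings might look in /etc/sysctl.conf (the dirty-ratio and TCP values are illustrative picks within the ranges above):
vm.swappiness=1
vm.dirty_background_ratio=5
vm.dirty_ratio=60
net.core.wmem_default=131072
net.core.rmem_default=131072
net.core.wmem_max=2097152
net.core.rmem_max=2097152
# min, default, max per TCP socket
net.ipv4.tcp_wmem=4096 65536 2097152
net.ipv4.tcp_rmem=4096 65536 2097152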
Chapter 3: Kafka producers: Writing messages to Kafka
Producer - writes data to Kafka
Consumer - reads data from Kafka
Some applications can serve both roles
Producer Overview
We start producing messages by creating a ProducerRecord, which must include the topic we want to send the record to and a value; optionally we can also specify a key and/or a partition
The producer serializes the key and value objects to byte arrays
The data is sent to a partitioner; if no partition was specified, the partitioner assigns one
The record is added to a batch of records headed to the same topic and partition; a separate thread sends those batches to the brokers
When the broker receives the messages, it returns a RecordMetadata object with the topic, partition, and offset on success, or an error otherwise
Constructing a producer
3 Mandatory properties
bootstrap.servers - List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster. List at least two brokers so the producer can still connect if one of them goes down.
key.serializer - Name of a class that will be used to serialize the keys of the records we will produce to Kafka. key.serializer should be set to a name of a class that implements the org.apache.kafka.common.serialization.Serializer interface. The producer will use this class to serialize the key object to a byte array.
value.serializer - Name of a class that will be used to serialize the values of the records we will produce to Kafka.
Sample code
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);
3 main modes of operation
Fire and forget
Synchronous send - send() method returns a Future object and we use get() to wait on future object
Asynchronous send - call the send() method with a callback function, which gets triggered when it receives a response from the Kafka broker
Sending a message to Kafka
Simple implementation
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products",
"France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
ProducerRecord constructors - we use one that needs (topic, key, value)
The send() method returns a Java Future object with RecordMetadata
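A minimal synchronous-send sketch, reusing the producer and record from above: calling get() on the returned Future blocks until the broker responds.
try {
    // Blocks until the broker acknowledges the write (or an error occurs)
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Written to partition " + metadata.partition()
            + " at offset " + metadata.offset());
} catch (Exception e) {
    // Non-retriable errors, or retriable errors that exhausted their retries, end up here
    e.printStackTrace();
}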
Retriable errors - ones that can be resolved by sending the message again, e.g. a connection error or a "no leader for partition" error; you can configure the number of automatic retries
Non-retriable errors - e.g. message size too large; these fail immediately without retrying
Sending asynchronously
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
To use callbacks, you need a class that implements the org.apache.kafka.clients.producer.Callback interface, which has a single function, onCompletion().
Configuring Producers
acks
acks = 0 - fire and forget; the producer does not wait for any acknowledgment from the broker
acks = 1 - the producer waits for an acknowledgment from the leader replica
acks = all - the producer waits until all in-sync replicas have received the message
buffer.memory - amount of memory the producer uses to buffer messages waiting to be sent to brokers
compression.type - compression applied to batches before they are sent (e.g. snappy, gzip, or lz4)
retries - how many times the producer will retry sending a message after a transient error
retry.backoff.ms - how long the producer waits between retries (100 ms by default)
batch.size - amount of memory in bytes used for each batch of messages sent to the same partition
linger.ms - how long the producer waits for additional messages before sending the current batch
client.id - identifier the brokers use to tag logs and metrics with the client that sent the messages
max.in.flight.requests.per.connection - setting this to 1 ensures that messages will be written to broker in the order they were sent, even with retries
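A sketch of setting a few of these on the kafkaProps from the earlier example (the values are illustrative, not recommendations):
kafkaProps.put("acks", "all");
kafkaProps.put("retries", "3");
kafkaProps.put("retry.backoff.ms", "100");
kafkaProps.put("compression.type", "snappy");
kafkaProps.put("batch.size", "16384");   // bytes per per-partition batch
kafkaProps.put("linger.ms", "5");        // wait up to 5 ms to fill a batch
kafkaProps.put("max.in.flight.requests.per.connection", "1"); // preserve ordering even with retries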
Kafka needs a serializer because it transmits data over the network as byte arrays.
Types of Serializers -
Custom Serializer
Serializing using Apache Avro
Custom Serializer example
Consider a class Customer
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
Serializer for this
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/**
We are serializing Customer as:
4 byte int representing customerId
4 byte int representing length of customerName in UTF-8 bytes (0 if name is null)
N bytes representing customerName in UTF-8
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}
Configuring a producer with this CustomerSerializer lets you define ProducerRecord<String, Customer> and pass Customer objects directly to the producer.
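A minimal wiring sketch, assuming CustomerSerializer is on the producer's classpath (topic name and customer values are made up):
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", CustomerSerializer.class.getName());
Producer<String, Customer> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("customers", "customer-7", new Customer(7, "Alice")));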
But if any of the fields in Customer change we have to edit the serializer too.
For these reasons, we recommend using existing serializers and deserializers such as JSON, Apache Avro, Thrift, or Protobuf.
Serializing Using Apache Avro
Avro data is described in a language-independent schema. The schema is usually described in JSON and the serialization is usually to binary files, although serializing to JSON is also supported. Avro assumes that the schema is present when reading and writing files, usually by embedding the schema in the files themselves.
For example, suppose the Customer schema changes: the faxNumber field is removed and an email field is added.
If a consumer that still calls getFaxNumber() gets a message written with the new schema, getFaxNumber() will return null and things will still work as expected.
If an upgraded consumer gets a message written with the old schema, getEmail() will return null.
Even though the schema changed, there were no exceptions or breaking changes while slowly upgrading the producers and consumers.
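A sketch of the two schema versions implied above (field names other than faxNumber and email are assumptions, not the book's exact schema):
Old schema:
{"type": "record", "name": "Customer", "fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "faxNumber", "type": ["null", "string"], "default": null}]}
New schema (faxNumber replaced by email):
{"type": "record", "name": "Customer", "fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}]}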
Two caveats
schemas used for writing and reading must be compatible; Avro documents its compatibility rules
the deserializer needs access to the schema that was used when writing the data
when using Avro data files, the schema is included in the file itself; for Kafka messages this is handled differently (see Schema Registry below)
Using Avro Records with Kafka
Instead of storing the full schema in every message, we store it in a Schema Registry (a common architecture pattern) and put only a schema identifier in each message.
Example of using schema registry with generated Avro objects
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// We keep producing new events until someone presses Ctrl-C
while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " + customer.toString());
ProducerRecord<String, Customer> record =
new ProducerRecord<>(topic, customer.getId(), customer);
producer.send(record);
}
Partitions
Kafka messages are key-value pairs. Generally a ProducerRecord is written with a topic, key, and value, but it is okay to omit the key. Keys are used to decide which partition a message goes to: all records with the same key are written to the same partition and will be read by the same consumer process.
Creating a producer record with a key (here the customer name):
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
Creating a producer record with a null key - just leave the key out:
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "USA");
When no key is specified (the key is null) and the default partitioner is used, the record is sent to one of the available partitions of the topic at random; a round-robin algorithm balances the messages among those partitions.
Mapping of keys to partitions is consistent as long as the number of partitions in a topic doesn't change.
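A minimal custom Partitioner sketch to illustrate that mapping; the real default partitioner uses a murmur2 hash rather than Arrays.hashCode, so treat this as illustrative only:
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class SimpleKeyPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) { /* nothing to configure */ }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // no key: spread records across the available partitions
            return (int) (Math.random() * numPartitions);
        }
        // same key always lands on the same partition, as long as numPartitions doesn't change
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() { /* nothing to close */ }
}
To plug it in, set partitioner.class to the class name in the producer configuration.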
Chapter 4: Kafka Consumers: Reading Data from Kafka
Consumer and Consumer Groups
The application creates a consumer object, subscribes to a topic, and starts receiving messages, validating them, and writing out the results.
Multiple consumers can read from a topic
Kafka consumers are typically part of a consumer group
When multiple consumers are subscribed to a topic and belong to the same consumer group, each consumer in the group will receive messages from a different subset of the partitions in the topic
If there are more consumers than partitions, some consumers will be idle
The main way we scale data consumption from a Kafka topic is by adding more consumers to a consumer group. This is a good reason to create topics with a large number of partitions - it allows adding more consumers when the load increases.
Each application that needs to read all messages from the topic needs its own consumer group.
Consumer Groups and Partition Rebalance
When we add new consumers to the group, or consumers shut down, crash, or are removed, the partitions assigned to each consumer change; this reassignment is called a partition rebalance.
During a rebalance consumers can't consume messages, so rebalances are undesirable; in addition, when a partition moves from one consumer to another, the consumer loses its current state for that partition.
A consumer maintains membership in a consumer group and ownership of its partitions by sending heartbeats to the Kafka broker acting as the group coordinator. Heartbeats are sent when the consumer polls and when it commits offsets for records it has consumed.
If heartbeats stop for longer than session.timeout.ms, the group coordinator considers the consumer dead and triggers a rebalance.
Creating a Kafka consumer
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
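A minimal subscribe-and-poll sketch continuing from the consumer above; the topic name and the Duration-based poll (kafka-clients 2.0+) are assumptions:
// also needs: import java.time.Duration; import java.util.Collections;
// and org.apache.kafka.clients.consumer.ConsumerRecord / ConsumerRecords
consumer.subscribe(Collections.singletonList("customerCountries"));
try {
    while (true) {
        // poll() fetches the next batch of records and keeps the group membership alive
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
    }
} finally {
    consumer.close(); // leave the group cleanly so partitions are rebalanced promptly
}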