We use the reference topic named test and the consumer group piero-group. Zookeeper runs on localhost:2181; the Kafka bootstrap server is localhost:9092.
-
Create a topic named test with 4 partitions, without replication:
~/tools/kafka_2.11-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 4 --replication-factor 1 --topic test
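The same topic can also be created from code. A minimal Scala sketch (not from the original post) using the Java AdminClient API that ships with Kafka 1.1.0, assuming the broker is reachable on localhost:9092; the object name is illustrative:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

object CreateTopicSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")

  val admin = AdminClient.create(props)
  // 4 partitions, replication factor 1 (no replication), as in the CLI command above
  val result = admin.createTopics(Collections.singleton(new NewTopic("test", 4, 1.toShort)))
  result.all().get() // block until the topic is actually created
  admin.close()
}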
-
Produce String messages from the console and post them to the topic:
~/tools/kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Then publish one message per line (all these messages will have key = null):
>Buongiorno
>vuole un caffè?
>o un cappuccino?
-
Produce String messages with a given key/value:
~/tools/kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property "parse.key=true" --property "key.separator=:"
Then publish, separating key and value with a colon:
>key123:test123
-
Consume from the topic from the beginning:
~/tools/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
-
Create consumer group piero-group for topic test and consume:
~/tools/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group piero-group
This way the consumer group holds the current offset and the end offset.
-
Inspect a consumer group:
~/tools/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group piero-group
Note: this will not show information about old Zookeeper-based consumers.
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
test   0          17              22              5    consumer-1-6249efa3-4553-409b-a292-ab20ac19c363  /172.21.0.1  consumer-1
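Here LAG = LOG-END-OFFSET - CURRENT-OFFSET: 22 - 17 = 5 messages published to partition 0 but not yet consumed by the group.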
-
Delete a topic:
~/tools/kafka_2.11-1.1.0/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
A Kafka message is a string of bytes (the message value) plus some metadata (such as the message key). Topics are split into partitions. When we commit something, we should think of it as committing to a partition, not to a topic. The commit log is the partition, which gives ordering guarantees in time: if a producer produces messages msg1, msg2 to topic A, partition 1, then we are sure that the partition will contain msg1, msg2 in this order.
Given the key and a partitioner, messages are published (by the producer) to the partition of the topic determined by the partitioner.
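To make this concrete, a simplified Scala sketch of key-based partitioning (Kafka's actual DefaultPartitioner applies murmur2 hashing to the serialized key bytes, but the principle is the same):

// Same key always maps to the same partition, which is what gives
// per-key ordering guarantees.
def choosePartition(key: String, numPartitions: Int): Int =
  java.lang.Math.floorMod(key.hashCode, numPartitions)

choosePartition("key123", 4) // stable result for "key123" across calls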
To create a producer we need at least the bootstrap servers and the K, V serializers. When the producer has accumulated enough messages meant for a partition, it sends them in a batch to the broker that acts as partition leader.
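A minimal Scala producer sketch (not from the original post) that sends one keyed message to the topic test, assuming a broker on localhost:9092; the object name is illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // With a non-null key, the partitioner chooses the target partition from the key
  producer.send(new ProducerRecord[String, String]("test", "key123", "test123"))
  producer.close() // flushes any batched messages before exiting
}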
The consumer consumes from a partition of a topic, so the consumer keeps track of the offset for that particular partition. This means that a consumer is responsible for tracking the offset of a partition, so within a consumer group we can't have 2 consumers for one partition. The rule is: a consumer can consume from multiple partitions, but one partition cannot be consumed by multiple consumers of the same group. If a consumer stops working, the load of consumption can be redistributed across the remaining consumers of the same consumer group.
Consumer group concepts (the sketch after this list shows how a client observes the resulting assignments):
- Every partition has a partition leader, to which all consumers and producers must connect
- For every consumer group, one of the brokers becomes the group coordinator and is responsible for monitoring that the consumers are still able to consume from the group. This works by having each consumer send heartbeat messages to the coordinator
- The first consumer that joins a consumer group becomes the group leader
- Every new consumer that joins the group registers itself with the group coordinator. The group coordinator asks the group leader to assign partitions to the new joiner, and then the new joiner starts consuming from the partitions it got assigned
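A consumer can observe the outcome of this coordination through the ConsumerRebalanceListener callbacks. A minimal Scala sketch (not from the original post), with the usual localhost:9092 / piero-group assumptions:

import java.util.{Collection => JCollection, Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object RebalanceSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "piero-group")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("test"), new ConsumerRebalanceListener {
    // Called when the coordinator revokes partitions at the start of a rebalance
    override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
      println(s"Revoked: ${partitions.asScala.mkString(", ")}")
    // Called with the partitions assigned to this consumer by the group leader
    override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
      println(s"Assigned: ${partitions.asScala.mkString(", ")}")
  })
  while (true) consumer.poll(1000) // poll() drives consumption and rebalances
}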
To create a consumer we need at least the bootstrap servers, the K, V deserializers and, for group management, a group.id.
The consumer exposes a poll() method that must be invoked regularly, since polling is what keeps the consumer alive in the group, and returns a list of ConsumerRecord[K, V], each record containing (see the sketch after this list):
- partition
- partition offset
- key
- value
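A minimal Scala consumer sketch (not from the original post) that subscribes as part of piero-group and prints those four fields, assuming a broker on localhost:9092:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object ConsumerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "piero-group")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("test"))

  while (true) {
    val records = consumer.poll(1000) // timeout in ms (Kafka 1.1.0 API)
    for (record <- records.asScala) {
      println(s"partition=${record.partition} offset=${record.offset} " +
        s"key=${record.key} value=${record.value}")
    }
  }
}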
When enable.auto.commit=true, at every poll the consumer commits the highest offset among the records it received in the previous call. Pro: simple. Con: if the consumer service crashes before committing, some messages will be processed twice.
When enable.auto.commit=false, we must explicitly call consumer.commitSync() when we are done processing the messages returned by the previous poll call. Con: there is still a chance of double processing if a rebalance of consumers happens before the offset gets committed.
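A sketch of the manual-commit variant (same assumptions as the consumer sketch above), with enable.auto.commit=false and an explicit commitSync() once the batch is processed:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object ManualCommitSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "piero-group")
  props.put("enable.auto.commit", "false") // we commit offsets ourselves
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("test"))

  while (true) {
    val records = consumer.poll(1000)
    records.asScala.foreach(r => println(s"${r.key} -> ${r.value}"))
    // Commit only after the batch is fully processed: a crash before this line
    // means the batch is re-delivered (processed twice), but never lost.
    consumer.commitSync()
  }
}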