@pierangeloc
Last active March 11, 2019 16:42
Useful Kafka console commands

Kafka 101

We use the reference topic named test and the consumer group piero-group. Zookeeper runs on localhost:2181, and the Kafka bootstrap server is localhost:9092

  • Create a topic named test4 with 4 partitions, without replication

    ~/tools/kafka_2.11-1.1.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 4 --replication-factor 1 --topic test4
    
  • Produce String messages from the console and post them on the topic

    ~/tools/kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    Then publish one message at every new line (all these messages will have key = null)

    >Buongiorno
    >vuole un caffè?
    >o un cappuccino?
    
  • Produce String messages with given key/value

    ~/tools/kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property "parse.key=true" \
    --property "key.separator=:"
    

    Then publish, separating key and value with :

    >key123:test123
    
  • Consume from topic from beginning

    ~/tools/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
  • Create consumer group piero-group for topic test and consume

    ~/tools/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group piero-group
    

    This way the consumer group keeps track, per partition, of the current (committed) offset alongside the log-end offset.

  • Inspect a consumer group

    ~/tools/kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group piero-group
    
    Note: This will not show information about old Zookeeper-based consumers.
    
    TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
    test            0          17              22              5               consumer-1-6249efa3-4553-409b-a292-ab20ac19c363 /172.21.0.1     consumer-1
    
  • Delete a topic

    ~/tools/kafka_2.11-1.1.0/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test4
    

Kafka concepts

Topics and Partitions

Kafka messages are strings of bytes (the message value), plus some metadata (the message key). Topics are split into partitions. When we commit, we should think of it as committing to a partition, not to a topic. The commit log is the partition, which gives ordering guarantees in time: if a producer produces messages msg1, msg2 to topic A, partition 1, then we are sure that in the partition we will find msg1, msg2 in this order.
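The per-partition ordering guarantee can be sketched as an append-only log (a toy Python model, not the Kafka implementation):

```python
class Partition:
    """A partition is an append-only commit log: each record gets the
    next offset, so order within a partition is preserved."""

    def __init__(self):
        self.log = []

    def append(self, value):
        self.log.append(value)
        return len(self.log) - 1  # the record's offset

part = Partition()
assert part.append("msg1") == 0
assert part.append("msg2") == 1
assert part.log == ["msg1", "msg2"]  # msg1 before msg2, always
```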

Producers, Consumers and Consumer Groups

Given the key and a partitioner, messages are published (by the producer) to the partition of the topic determined by the partitioner.
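A minimal sketch of that key-to-partition mapping (in Python, with crc32 standing in for the murmur2 hash that Kafka's default partitioner actually uses for non-null keys):

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Hash the key bytes and take the result modulo the partition
    count, so the same key always lands on the same partition.
    (crc32 is a stand-in here, not Kafka's real murmur2 hash.)"""
    return zlib.crc32(key) % num_partitions

# The same key always maps to the same partition of a 4-partition topic
assert partition_for(b"key123", 4) == partition_for(b"key123", 4)
```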

To create a producer we need at least the bootstrap servers and the K, V serializers. When the producer accumulates enough messages meant for a partition, it sends them in batch to the broker that acts as partition leader.
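The batching behaviour can be sketched as a toy accumulator (a Python illustration, not the real producer internals; batch_size here counts records, whereas the real batch.size setting is in bytes):

```python
from collections import defaultdict

class BatchingProducer:
    """Toy sketch: accumulate records per partition and 'send' a batch
    once it is full, the way the real producer batches records destined
    for the same partition leader."""

    def __init__(self, batch_size: int):
        self.batch_size = batch_size
        self.batches = defaultdict(list)  # partition -> pending records
        self.sent = []                    # batches "sent to the broker"

    def send(self, partition: int, record: bytes):
        batch = self.batches[partition]
        batch.append(record)
        if len(batch) >= self.batch_size:
            self.sent.append((partition, list(batch)))
            batch.clear()

producer = BatchingProducer(batch_size=2)
producer.send(0, b"msg1")
producer.send(0, b"msg2")  # second record fills the batch, triggering the send
assert producer.sent == [(0, [b"msg1", b"msg2"])]
```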

A consumer consumes from a partition of a topic, and keeps track of its offset on that particular partition. Because a consumer is responsible for tracking the offset of a partition, we can't have 2 consumers of the same group on one partition. The rule is: a consumer can consume from multiple partitions, but one partition cannot be consumed by multiple consumers of the same group. If a consumer stops working, its share of the consumption is redistributed across the other consumers of the same consumer group.

Consumer group concepts:

  • Every partition has a partition leader to which all consumers and producers must be connected
  • For every consumer group, one of the brokers becomes the group coordinator and is responsible for monitoring that consumers are still able to consume from the group. This is done by the consumers sending heartbeat messages to the coordinator
  • The first consumer that joins a consumer group becomes the group leader
  • Every new consumer that joins the group announces itself to the group coordinator. The group coordinator asks the group leader to assign partitions to the new joiner, which then starts consuming from the partitions it has been assigned
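The assignment the group leader computes can be sketched roughly like this (a round-robin-style toy in Python; the real assignors, e.g. range or round-robin, live in the client library):

```python
def assign_partitions(partitions, consumers):
    """Spread a topic's partitions over the group's consumers so that
    every partition has exactly one consumer, as the group leader does."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

# 4 partitions shared by the 2 consumers in the group:
assign_partitions([0, 1, 2, 3], ["consumer-1", "consumer-2"])
# → {'consumer-1': [0, 2], 'consumer-2': [1, 3]}
```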

To create a consumer we need at least the bootstrap servers and the K, V deserializers.

The consumer exposes a poll() method that must be invoked continuously, as it sends the heartbeat signal; each call returns a list of ConsumerRecord[K, V], each record containing:

  • partition
  • partition offset
  • key
  • value

When enable.auto.commit=true, at every poll the consumer commits the highest offset among the records returned by the previous call. Pro: simple. Con: if the consumer service crashes before committing, some messages will be processed twice.

When enable.auto.commit=false, we must explicitly call consumer.commitSync when we are done processing the messages returned by the previous poll call. Con: there is still a chance of double processing if the consumers rebalance before the offset gets committed.
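The double-processing risk in both cases can be illustrated with a toy simulation (Python, hypothetical names, not the Kafka client API):

```python
def consume(records, committed_offset, crash_before_commit):
    """Sketch of at-least-once semantics: records at or above the
    committed offset are (re)processed; if we crash before committing,
    the offset never advances and a restart re-reads the same records."""
    processed = []
    for offset, value in records:
        if offset < committed_offset:
            continue  # already committed, skipped on redelivery
        processed.append(value)
    if not crash_before_commit:
        committed_offset = records[-1][0] + 1  # the explicit commit
    return processed, committed_offset

records = [(0, "a"), (1, "b")]
out1, committed = consume(records, 0, crash_before_commit=True)
# crash: the offset never advanced, so a restart re-reads both records
out2, committed = consume(records, committed, crash_before_commit=False)
assert out1 == out2 == ["a", "b"]  # "a" and "b" are processed twice
```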
