Kafka 0.11.0.0 (Confluent 3.3.0) added support to manipulate offsets for a consumer group via cli kafka-consumer-groups
command.
- List the topics to which the group is subscribed
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe
Note the values under "CURRENT-OFFSET" and "LOG-END-OFFSET". "CURRENT-OFFSET" is the offset where this consumer group is currently at in each of the partitions.
- Reset the consumer offset for a topic (preview)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest
This will print the expected result of the reset, but not actually run it.
- Reset the consumer offset for a topic (execute)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
This will execute the reset and reset the consumer group offset for the specified topic back to 0.
- Repeat 1 to check if the reset is successful
- The consumer group must have no running instance when performing the reset. Otherwise the reset will be rejected.
- There are many other resetting options, run
kafka-consumer-groups
for details- --shift-by <positive_or_negative_integer>
- --to-current
- --to-latest
- --to-offset <offset_integer>
- --to-datetime <datetime_string>
- --by-duration <duration_string>
- The command also provides an option to reset offsets for all topics the consumer group subscribes to:
--all-topics
I have not managed to solve this problem, nor found the specific configuration for this. What I can do is delete and recreate the problematic topic.