Created
March 6, 2025 13:30
How to use Rdkafka (with Racecar) to reset a consumer offset
# Input
consumer_klass_name = "YourAwesomeConsumer"
offset = 0

# Run (from a Rails console, so that Rails and constantize are available)
consumer_klass = consumer_klass_name.constantize
config_file = "config/racecar.rb"
require Rails.root.join(config_file)

# Evaluated for inspection only: a console prints the prefix, nothing is changed.
Racecar.config.group_id_prefix

# Loading the consumer class lets Racecar resolve the consumer group id.
Racecar.config.load_consumer_class(consumer_klass)
group_id = Racecar.config.group_id

# This script handles exactly one subscribed topic.
unless consumer_klass.subscriptions.size == 1
  raise StandardError.new("Expected exactly 1 subscription, found #{consumer_klass.subscriptions.size}; please specify the topic")
end
topic = consumer_klass.subscriptions.first.topic

config = {
  :"bootstrap.servers" => ENV["KAFKA_URL"],
  :"group.id" => group_id,
  :"enable.auto.commit" => false
}
rdkafka_config = Rdkafka::Config.new(config)

# native_config and native_kafka are private rdkafka-ruby methods (hence send),
# so they may change between gem versions.
native_config = rdkafka_config.send(:native_config)
native_kafka = rdkafka_config.send(:native_kafka, native_config, :rd_kafka_consumer)
metadata = Rdkafka::Metadata.new(native_kafka, topic)
partition_count = metadata.topics.first[:partition_count]
if partition_count > 1
  raise StandardError.new("Topic #{topic} has #{partition_count} partitions; this script only handles 1")
end

# The topic has a single partition, so its index is 0.
partition = 0

# Assign the partition at the desired offset, then commit it synchronously
# (second argument false = not async) for the consumer group.
consumer = rdkafka_config.consumer
consumer.assign(Rdkafka::Consumer::TopicPartitionList.new({topic => [Rdkafka::Consumer::Partition.new(partition, offset)]}))
consumer.commit(consumer.assignment, false)
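
The script above stops when the topic has more than one partition. As a rough sketch of how the same offset could be prepared for every partition, the helper below builds a partition-to-offset map in plain Ruby. The name offsets_by_partition is hypothetical (it is not part of rdkafka or Racecar), and the multi-partition case has not been run against a real cluster here.

```ruby
# Hypothetical helper, not part of rdkafka or Racecar: map every partition
# index of a topic (0 up to partition_count - 1) to the same target offset.
def offsets_by_partition(partition_count, offset)
  raise ArgumentError, "partition_count must be positive" unless partition_count.positive?

  (0...partition_count).to_h { |partition| [partition, offset] }
end

# Example: list the partition/offset pairs for a three-partition topic.
offsets_by_partition(3, 0).each do |partition, target_offset|
  puts "partition #{partition} -> offset #{target_offset}"
end
```

Each pair could then be turned into a Rdkafka::Consumer::Partition.new(partition, offset), the same way the script does for partition 0, before calling assign and commit.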