Skip to content

Instantly share code, notes, and snippets.

@fernandes
Created March 6, 2025 13:30
Show Gist options
  • Save fernandes/9ae2bd2ec4c9196270299551004df96a to your computer and use it in GitHub Desktop.
Save fernandes/9ae2bd2ec4c9196270299551004df96a to your computer and use it in GitHub Desktop.
How to use Rdkafka (with Racecar) to reset a consumer offset
# Input
consumer_klass_name = "YourAwesomeConsumer"
offset = 0
# Runnnnnnnn
consumer_klass = consumer_klass_name.constantize
config_file = "config/racecar.rb"
require Rails.root.join(config_file)
Racecar.config.group_id_prefix
Racecar.config.load_consumer_class(consumer_klass)
group_id = Racecar.config.group_id
unless consumer_klass.subscriptions.size == 1
raise StandardError.new("Consumer subscribed to more than 1 topic, please specify")
end
topic = consumer_klass.subscriptions.first.topic
config = {
:"bootstrap.servers" => ENV["KAFKA_URL"],
:"group.id" => group_id,
:"enable.auto.commit" => false
}
rdkafka_config = Rdkafka::Config.new(config)
native_config = rdkafka_config.send(:native_config)
native_kafka = rdkafka_config.send(:native_kafka, native_config, :rd_kafka_consumer)
metadata = Rdkafka::Metadata.new(native_kafka, topic)
partition_count = metadata.topics.first[:partition_count]
if partition_count > 1
raise StandardError.new("More than one partition")
end
# Assuming first partition
partition = 0
consumer = rdkafka_config.consumer
consumer.assign(Rdkafka::Consumer::TopicPartitionList.new({topic => [Rdkafka::Consumer::Partition.new(partition, offset)]}))
consumer.commit(consumer.assignment, false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment