Skip to content

Instantly share code, notes, and snippets.

@gfodor
Created March 20, 2017 07:44
Show Gist options
  • Save gfodor/a4f5e4721e959766e75e4c901bf42890 to your computer and use it in GitHub Desktop.
Save gfodor/a4f5e4721e959766e75e4c901bf42890 to your computer and use it in GitHub Desktop.
Kafka Topic Resetter Class
package com.altvr.streams.jobs.utils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class TopicOffsetResetter {
private static final Logger Log = LoggerFactory.getLogger(TopicOffsetResetter.class);
public static void resetTopicOffsetToBeginning(Properties jobProperties, String topic) {
Serde<byte[]> byteArraySerde = Serdes.ByteArray();
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, jobProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, jobProperties.get(StreamsConfig.APPLICATION_ID_CONFIG));
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<byte[], byte[]> resetConsumer
= new KafkaConsumer<>(consumerProps, byteArraySerde.deserializer(), byteArraySerde.deserializer());
resetConsumer.subscribe(Collections.singletonList(topic));
while (true) {
resetConsumer.poll(100);
if (resetConsumer.assignment().size() > 0) break;
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
if (resetConsumer.assignment().size() != resetConsumer.partitionsFor(topic).size()) {
Log.error("Did not get assigned all topics -- do you still have a consumer running for this group?");
resetConsumer.close();
return;
}
for (TopicPartition partition : resetConsumer.assignment()) {
Log.error("Reset " + partition);
offsets.put(partition, new OffsetAndMetadata(0));
}
Log.error("Committing.");
resetConsumer.commitSync(offsets);
Log.error("Done.");
resetConsumer.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment