Skip to content

Instantly share code, notes, and snippets.

@syahiaoui
Created March 13, 2024 09:56
Show Gist options
  • Save syahiaoui/03640afc4137cef87f131b2d505f876b to your computer and use it in GitHub Desktop.
Save syahiaoui/03640afc4137cef87f131b2d505f876b to your computer and use it in GitHub Desktop.
Kafka reset offsets (latest, earliest, forTimestamp, specific offsets)
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
public class AdminClientOffsetReset {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Specify the consumer group ID and the topic name
String consumerGroupId = "test-group";
String topicName = "test-topic";
try (AdminClient adminClient = AdminClient.create(properties)) {
// Get the partitions for the topic
List<TopicPartitionInfo> partitions = adminClient.describeTopics(Collections.singletonList(topicName)).all().get().get(topicName).partitions();
// Reset consumer group offsets to specific offset for partition 0 and 3
Map<TopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new TopicPartition(topicName, 0), 245396786L);
specificOffsets.put(new TopicPartition(topicName, 3), 564452394L);
resetOffsetsToSpecificOffset(adminClient, consumerGroupId, specificOffsets);
// Reset consumer group offsets to earliest for each partition
resetOffsetsToEarliest(adminClient, consumerGroupId, topicName, partitions);
// Reset consumer group offsets to latest for each partition
resetOffsetsToLatest(adminClient, consumerGroupId, topicName, partitions);
// Reset consumer group offsets for a specific timestamp for each partition
long timestampInMillis = System.currentTimeMillis() - Duration.ofDays(1).toMillis(); // One day ago
resetOffsetsForTimestamp(adminClient, consumerGroupId, topicName, partitions, timestampInMillis);
} catch (Exception e) {
System.out.println("Failed to update the offsets committed by group " + consumerGroupId +
" with error " + e.getMessage());
if (e.getCause() instanceof UnknownMemberIdException)
System.out.println("Check if consumer group is still active.");
}
}
private static void resetOffsetsToSpecificOffset(AdminClient adminClient, String consumerGroupId, Map<TopicPartition, Long> specificOffsets) throws InterruptedException, ExecutionException {
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, Long> e:
specificOffsets.entrySet()) {
System.out.println("Will reset topic-partition " + e.getKey() + " to offset " + e.getValue());
resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue()));
}
adminClient.alterConsumerGroupOffsets(consumerGroupId, resetOffsets).all().get();
}
private static void resetOffsetsToEarliest(AdminClient adminClient, String consumerGroupId, String topicName, List<TopicPartitionInfo> partitions) throws InterruptedException, ExecutionException {
Map<TopicPartition, OffsetSpec> requestEarliestOffsets = new HashMap<>();
for (TopicPartitionInfo partitionInfo : partitions) {
requestEarliestOffsets.put(new TopicPartition(topicName, partitionInfo.partition()), OffsetSpec.earliest());
}
resetOffsets(adminClient, consumerGroupId, requestEarliestOffsets);
}
private static void resetOffsetsToLatest(AdminClient adminClient, String consumerGroupId, String topicName, List<TopicPartitionInfo> partitions) throws InterruptedException, ExecutionException {
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for (TopicPartitionInfo partitionInfo : partitions) {
requestLatestOffsets.put(new TopicPartition(topicName, partitionInfo.partition()), OffsetSpec.latest());
}
resetOffsets(adminClient, consumerGroupId, requestLatestOffsets);
}
private static void resetOffsetsForTimestamp(AdminClient adminClient, String consumerGroupId, String topicName, List<TopicPartitionInfo> partitions, long timestampInMillis) throws InterruptedException, ExecutionException {
Map<TopicPartition, OffsetSpec> requestOffsetsForTimestamp = new HashMap<>();
for (TopicPartitionInfo partitionInfo : partitions) {
requestOffsetsForTimestamp.put(new TopicPartition(topicName, partitionInfo.partition()), OffsetSpec.forTimestamp(timestampInMillis));
}
resetOffsets(adminClient, consumerGroupId, requestOffsetsForTimestamp);
}
private static void resetOffsets(AdminClient adminClient, String consumerGroupId, Map<TopicPartition, OffsetSpec> requestLatestOffsets) throws InterruptedException, ExecutionException {
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
adminClient.listOffsets(requestLatestOffsets).all().get();
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e:
latestOffsets.entrySet()) {
System.out.println("Will reset topic-partition " + e.getKey() + " to offset " + e.getValue().offset());
resetOffsets.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
}
adminClient.alterConsumerGroupOffsets(consumerGroupId, resetOffsets).all().get();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment