Skip to content

Instantly share code, notes, and snippets.

@o-x-y-g-e-n
Last active July 30, 2024 08:07
Show Gist options
  • Save o-x-y-g-e-n/dc2b9850dde21ce7b1ef914ec1f22b5f to your computer and use it in GitHub Desktop.
Save o-x-y-g-e-n/dc2b9850dde21ce7b1ef914ec1f22b5f to your computer and use it in GitHub Desktop.
KafkaConsumer.java
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
/**
 * Reads bounded slices of a Kafka topic: the records published during the last N seconds, or
 * the last N records.
 *
 * <p>Every call opens its own consumer from the supplied configuration and closes it before
 * returning, so one service instance can be used for any number of calls. Partitions are
 * assigned explicitly with {@code assign}; the topic is never subscribed to.
 *
 * <p>Reads are snapshots: the end offset of each partition is captured when the call starts and
 * records at or beyond it are not returned.
 */
public class KafkaConsumerService {

    /** Longest time a single poll waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    /** Consecutive empty polls tolerated before a read gives up and returns what it has. */
    private static final int MAX_EMPTY_POLLS = 3;

    private final KafkaConsumerConfig config;

    /**
     * Creates the service. No consumer is opened here; one is created per consume call.
     *
     * @param config source of the consumer properties; must not be {@code null}
     * @throws NullPointerException if {@code config} is {@code null}
     */
    public KafkaConsumerService(KafkaConsumerConfig config) {
        this.config = Objects.requireNonNull(config, "config");
    }

    /**
     * Returns the records of every partition of {@code topic} whose timestamp falls within the
     * last {@code seconds} seconds.
     *
     * @param topic   topic to read
     * @param seconds size of the look-back window, in seconds
     * @return the matching records grouped by partition; empty when there are none
     */
    public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds) {
        return consumeMessagesFromLastXSeconds(topic, seconds, null);
    }

    /**
     * Returns the records whose timestamp falls within the last {@code seconds} seconds,
     * optionally restricted to a single partition.
     *
     * <p>Errors while looking up partitions or offsets propagate to the caller. Errors while
     * polling are logged and the records collected so far are returned.
     *
     * @param topic           topic to read
     * @param seconds         size of the look-back window, in seconds
     * @param partitionNumber partition to read, or {@code null} for all partitions
     * @return the matching records grouped by partition; empty when the topic or partition is
     *         unknown or holds nothing in the window
     */
    public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) {
        long fetchTime = System.currentTimeMillis() - (seconds * 1000L);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(config.getConsumerProperties())) {
            List<TopicPartition> topicPartitions = getTopicPartitions(consumer, topic, partitionNumber);
            if (topicPartitions.isEmpty()) {
                return ConsumerRecords.empty();
            }
            Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> fetchTime));

            // A null lookup result means the partition has no record at or after fetchTime,
            // so that partition is left out of the assignment altogether.
            Map<TopicPartition, Long> startOffsets = new HashMap<>();
            consumer.offsetsForTimes(timestampsToSearch).forEach((tp, offsetAndTimestamp) -> {
                if (offsetAndTimestamp != null) {
                    startOffsets.put(tp, offsetAndTimestamp.offset());
                }
            });
            if (startOffsets.isEmpty()) {
                return ConsumerRecords.empty();
            }
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(startOffsets.keySet());
            return toConsumerRecords(readRange(consumer, startOffsets, endOffsets));
        }
    }

    /**
     * Returns the {@code messageCount} most recent records of {@code topic} across all of its
     * partitions.
     *
     * @param topic        topic to read
     * @param messageCount maximum number of records to return
     * @return at most {@code messageCount} records grouped by partition
     */
    public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount) {
        return consumeLastXMessages(topic, messageCount, null);
    }

    /**
     * Returns the {@code messageCount} most recent records, optionally restricted to a single
     * partition.
     *
     * <p>Each selected partition contributes its last {@code messageCount} offsets as
     * candidates. When more than one partition is read, the candidates are ranked by record
     * timestamp and only the newest {@code messageCount} are kept.
     *
     * <p>Errors while looking up partitions or offsets propagate to the caller. Errors while
     * polling are logged and the records collected so far are returned.
     *
     * @param topic           topic to read
     * @param messageCount    maximum number of records to return; values below 1 yield an
     *                        empty result
     * @param partitionNumber partition to read, or {@code null} for all partitions
     * @return at most {@code messageCount} records grouped by partition
     */
    public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount, Integer partitionNumber) {
        if (messageCount <= 0) {
            return ConsumerRecords.empty();
        }
        try (Consumer<String, String> consumer = new KafkaConsumer<>(config.getConsumerProperties())) {
            List<TopicPartition> topicPartitions = getTopicPartitions(consumer, topic, partitionNumber);
            if (topicPartitions.isEmpty()) {
                return ConsumerRecords.empty();
            }
            // One round trip per offset kind for all partitions instead of one per partition.
            Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

            Map<TopicPartition, Long> startOffsets = new HashMap<>();
            for (TopicPartition tp : topicPartitions) {
                Long end = endOffsets.get(tp);
                if (end == null) {
                    continue;
                }
                // Clamp to the first offset still present in the log: after retention that is
                // usually not 0, and seeking below it would trigger the offset-reset policy.
                long start = Math.max(beginningOffsets.getOrDefault(tp, 0L), end - messageCount);
                if (start < end) {
                    startOffsets.put(tp, start);
                }
            }
            if (startOffsets.isEmpty()) {
                return ConsumerRecords.empty();
            }
            List<ConsumerRecord<String, String>> candidates = readRange(consumer, startOffsets, endOffsets);
            return toConsumerRecords(keepMostRecent(candidates, messageCount));
        }
    }

    /**
     * Lists the partitions of {@code topic}, or only {@code partitionNumber} when it is given.
     * An unknown topic or partition number yields an empty list.
     */
    private static List<TopicPartition> getTopicPartitions(Consumer<String, String> consumer, String topic, Integer partitionNumber) {
        List<TopicPartition> partitions = new ArrayList<>();
        // NOTE(review): some client versions are reported to return null for an unknown topic;
        // treated as "no partitions" here rather than failing with a NullPointerException.
        Optional.ofNullable(consumer.partitionsFor(topic))
                .orElse(Collections.emptyList())
                .forEach(partitionInfo -> {
                    if (partitionNumber == null || partitionInfo.partition() == partitionNumber) {
                        partitions.add(new TopicPartition(topic, partitionInfo.partition()));
                    }
                });
        return partitions;
    }

    /**
     * Assigns every partition in {@code startOffsets}, seeks each to its start offset and polls
     * until all of them have reached the matching entry in {@code endOffsets}.
     *
     * <p>All partitions are assigned in a single call because {@code assign} replaces the
     * previous assignment rather than adding to it.
     *
     * @return the records read, limited to offsets below each partition's end offset
     */
    private static List<ConsumerRecord<String, String>> readRange(
            Consumer<String, String> consumer,
            Map<TopicPartition, Long> startOffsets,
            Map<TopicPartition, Long> endOffsets) {
        List<ConsumerRecord<String, String>> collected = new ArrayList<>();
        consumer.assign(startOffsets.keySet());
        startOffsets.forEach((partition, offset) -> consumer.seek(partition, offset));

        int emptyPolls = 0;
        try {
            while (emptyPolls < MAX_EMPTY_POLLS && hasUnreadRecords(consumer, endOffsets)) {
                ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT);
                emptyPolls = batch.isEmpty() ? emptyPolls + 1 : 0;
                for (TopicPartition tp : batch.partitions()) {
                    Long end = endOffsets.get(tp);
                    if (end == null) {
                        continue;
                    }
                    for (ConsumerRecord<String, String> record : batch.records(tp)) {
                        // Records appended after the snapshot was taken are not part of it.
                        if (record.offset() < end) {
                            collected.add(record);
                        }
                    }
                }
            }
        } catch (RuntimeException e) {
            // Best effort: report the failure and hand back whatever was read before it.
            AppLogger.logError("Error during message consumption: " + e);
        }
        return collected;
    }

    /** Tells whether any assigned partition is still positioned before its end offset. */
    private static boolean hasUnreadRecords(Consumer<String, String> consumer, Map<TopicPartition, Long> endOffsets) {
        for (TopicPartition tp : consumer.assignment()) {
            Long end = endOffsets.get(tp);
            if (end != null && consumer.position(tp) < end) {
                return true;
            }
        }
        return false;
    }

    /**
     * Keeps the {@code limit} newest records, ranked by timestamp and then by offset. The input
     * is returned unchanged when it already fits within the limit.
     */
    private static List<ConsumerRecord<String, String>> keepMostRecent(List<ConsumerRecord<String, String>> records, int limit) {
        if (records.size() <= limit) {
            return records;
        }
        Comparator<ConsumerRecord<String, String>> oldestFirst =
                (a, b) -> Long.compare(a.timestamp(), b.timestamp());
        oldestFirst = oldestFirst.thenComparing((a, b) -> Long.compare(a.offset(), b.offset()));

        List<ConsumerRecord<String, String>> newestFirst = new ArrayList<>(records);
        newestFirst.sort(oldestFirst.reversed());
        return new ArrayList<>(newestFirst.subList(0, limit));
    }

    /** Groups records by partition, in ascending offset order within each partition. */
    private static ConsumerRecords<String, String> toConsumerRecords(List<ConsumerRecord<String, String>> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            byPartition
                    .computeIfAbsent(new TopicPartition(record.topic(), record.partition()), tp -> new ArrayList<>())
                    .add(record);
        }
        for (List<ConsumerRecord<String, String>> partitionRecords : byPartition.values()) {
            partitionRecords.sort((a, b) -> Long.compare(a.offset(), b.offset()));
        }
        return new ConsumerRecords<>(byPartition);
    }
}
@o-x-y-g-e-n
Copy link
Author

/**
 * Returns the records whose timestamp falls within the last {@code seconds} seconds,
 * optionally restricted to a single partition.
 *
 * <p>The read is a snapshot: each partition's end offset is captured before polling, and
 * polling stops once every partition has reached it, or as soon as a poll comes back empty.
 * Records at or beyond the captured end offset are not returned.
 *
 * <p>Errors while looking up partitions or offsets propagate to the caller. Errors while
 * polling are logged and the records collected so far are returned. In both cases the
 * consumer is closed before this method returns, so the {@code consumer} it uses cannot be
 * polled again afterwards.
 *
 * @param topic           topic to read
 * @param seconds         size of the look-back window, in seconds
 * @param partitionNumber partition to read, or {@code null} for all partitions
 * @return the matching records grouped by partition; empty when nothing falls in the window
 */
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) {
    long fetchTime = System.currentTimeMillis() - (seconds * 1000L);
    List<ConsumerRecord<String, String>> collectedRecords = new ArrayList<>();

    try {
        List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber);
        Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> fetchTime));

        // A null lookup result means the partition has no record at or after fetchTime,
        // so that partition is left out of the assignment.
        Map<TopicPartition, Long> startOffsets = new HashMap<>();
        consumer.offsetsForTimes(timestampsToSearch).forEach((tp, offsetAndTimestamp) -> {
            if (offsetAndTimestamp != null) {
                startOffsets.put(tp, offsetAndTimestamp.offset());
            }
        });

        if (!startOffsets.isEmpty()) {
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(startOffsets.keySet());

            // assign() replaces the previous assignment, so all partitions go in one call.
            consumer.assign(startOffsets.keySet());
            startOffsets.forEach((partition, offset) -> consumer.seek(partition, offset));

            try {
                boolean keepPolling = true;
                while (keepPolling) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); // Polling with sufficient duration
                    for (ConsumerRecord<String, String> record : records) {
                        Long end = endOffsets.get(new TopicPartition(record.topic(), record.partition()));
                        if (end != null && record.offset() < end) {
                            collectedRecords.add(record);
                        }
                    }

                    // Exit conditions: every partition caught up, or the last poll was empty.
                    boolean caughtUp = true;
                    for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                        if (consumer.position(entry.getKey()) < entry.getValue()) {
                            caughtUp = false;
                            break;
                        }
                    }
                    keepPolling = !caughtUp && !records.isEmpty();
                }
            } catch (Exception e) {
                AppLogger.logError("Error during message consumption: " + e.getMessage());
            }
        }
    } finally {
        consumer.close();
    }

    // Convert collected records into a single ConsumerRecords object
    return mergeCollectedRecords(collectedRecords);
}

// Buckets the given records by the topic-partition each one came from, keeping the order in
// which they appear in the list, and wraps the buckets in one ConsumerRecords object.
private ConsumerRecords<String, String> mergeCollectedRecords(List<ConsumerRecord<String, String>> records) {
    Map<TopicPartition, List<ConsumerRecord<String, String>>> grouped = new HashMap<>();
    for (ConsumerRecord<String, String> current : records) {
        TopicPartition key = new TopicPartition(current.topic(), current.partition());
        List<ConsumerRecord<String, String>> bucket = grouped.get(key);
        if (bucket == null) {
            // First record seen for this partition: start its bucket.
            bucket = new ArrayList<>();
            grouped.put(key, bucket);
        }
        bucket.add(current);
    }
    return new ConsumerRecords<>(grouped);
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment