-
-
Save o-x-y-g-e-n/dc2b9850dde21ce7b1ef914ec1f22b5f to your computer and use it in GitHub Desktop.
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
public class KafkaConsumerService { | |
private final KafkaConsumerConfig config; | |
private final Consumer<String, String> consumer; | |
public KafkaConsumerService(KafkaConsumerConfig config) { | |
this.config = config; | |
this.consumer = new KafkaConsumer<>(config.getConsumerProperties()); | |
} | |
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds) { | |
return consumeMessagesFromLastXSeconds(topic, seconds, null); | |
} | |
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) { | |
long fetchTime = System.currentTimeMillis() - (seconds * 1000L); | |
List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber); | |
Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream() | |
.collect(Collectors.toMap(tp -> tp, tp -> fetchTime)); | |
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch); | |
offsetsForTimes.forEach((tp, offsetAndTimestamp) -> { | |
if (offsetAndTimestamp != null) { | |
consumer.assign(Collections.singletonList(tp)); | |
consumer.seek(tp, offsetAndTimestamp.offset()); | |
} | |
}); | |
List<ConsumerRecords<String, String>> allRecords = new ArrayList<>(); | |
try { | |
while (true) { | |
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); | |
allRecords.add(records); | |
if (System.currentTimeMillis() - fetchTime > seconds * 1000L) { | |
break; | |
} | |
} | |
} catch (Exception e) { | |
AppLogger.logError("Error during message consumption: " + e.getMessage()); | |
} finally { | |
consumer.close(); | |
} | |
// Combine all polled records into one | |
return mergeConsumerRecords(allRecords); | |
} | |
public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount) { | |
return consumeLastXMessages(topic, messageCount, null); | |
} | |
public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount, Integer partitionNumber) { | |
List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber); | |
topicPartitions.forEach(tp -> { | |
long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp); | |
long startOffset = Math.max(0, endOffset - messageCount); | |
consumer.assign(Collections.singletonList(tp)); | |
consumer.seek(tp, startOffset); | |
}); | |
int remainingMessages = messageCount; | |
List<ConsumerRecords<String, String>> allRecords = new ArrayList<>(); | |
try { | |
while (remainingMessages > 0) { | |
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); | |
int recordCount = records.count(); | |
if (recordCount > 0) { | |
allRecords.add(records); | |
remainingMessages -= recordCount; | |
} else { | |
break; | |
} | |
} | |
} catch (Exception e) { | |
AppLogger.logError("Error during message consumption: " + e.getMessage()); | |
} finally { | |
consumer.close(); | |
} | |
// Combine all polled records into one | |
return mergeConsumerRecords(allRecords); | |
} | |
private List<TopicPartition> getTopicPartitions(String topic, Integer partitionNumber) { | |
List<TopicPartition> partitions = new ArrayList<>(); | |
consumer.partitionsFor(topic).forEach(partitionInfo -> { | |
if (partitionNumber == null || partitionInfo.partition() == partitionNumber) { | |
partitions.add(new TopicPartition(topic, partitionInfo.partition())); | |
} | |
}); | |
return partitions; | |
} | |
private ConsumerRecords<String, String> mergeConsumerRecords(List<ConsumerRecords<String, String>> recordsList) { | |
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); | |
for (ConsumerRecords<String, String> records : recordsList) { | |
for (TopicPartition partition : records.partitions()) { | |
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); | |
if (!recordsMap.containsKey(partition)) { | |
recordsMap.put(partition, new ArrayList<>()); | |
} | |
recordsMap.get(partition).addAll(partitionRecords); | |
} | |
} | |
return new ConsumerRecords<>(recordsMap); | |
} | |
} |
o-x-y-g-e-n commented on Jul 17, 2024:
/**
 * Reads the records whose timestamp falls within the last {@code seconds} seconds.
 *
 * <p>Each selected partition is positioned at the first offset whose timestamp is at or
 * after {@code now - seconds} and read up to the end offset observed when the call
 * started, so the call returns once the existing data is drained instead of blocking
 * for the whole window. The consumer is closed before returning.
 *
 * @param topic           topic to read
 * @param seconds         size of the look-back window, in seconds
 * @param partitionNumber partition to read, or {@code null} for all partitions
 * @return the records found, grouped by partition (possibly empty)
 */
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) {
    final int maxEmptyPolls = 2; // an early poll can be empty while the first fetch is in flight
    long fetchTime = System.currentTimeMillis() - (seconds * 1000L);
    List<ConsumerRecord<String, String>> collectedRecords = new ArrayList<>();
    try {
        List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber);
        Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> fetchTime));

        // A null lookup result means the partition has no record at or after fetchTime.
        Map<TopicPartition, Long> startOffsets = new HashMap<>();
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);
        offsetsForTimes.forEach((tp, offsetAndTimestamp) -> {
            if (offsetAndTimestamp != null) {
                startOffsets.put(tp, offsetAndTimestamp.offset());
            }
        });

        if (!startOffsets.isEmpty()) {
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(startOffsets.keySet());
            // assign() replaces any previous assignment, so all partitions must be passed in one call.
            consumer.assign(startOffsets.keySet());
            startOffsets.forEach((tp, offset) -> consumer.seek(tp, offset));
            try {
                int emptyPolls = 0;
                while (emptyPolls < maxEmptyPolls && hasUnreadRecords(endOffsets)) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); // Polling with sufficient duration
                    if (records.isEmpty()) {
                        emptyPolls++;
                    } else {
                        emptyPolls = 0;
                        records.forEach(collectedRecords::add);
                    }
                }
            } catch (Exception e) {
                AppLogger.logError("Error during message consumption: " + e.getMessage());
            }
        }
    } finally {
        consumer.close();
    }
    // Convert collected records into a single ConsumerRecords object
    return mergeCollectedRecords(collectedRecords);
}

/** Returns true while at least one partition's position is still below its target end offset. */
private boolean hasUnreadRecords(Map<TopicPartition, Long> endOffsets) {
    for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
        if (consumer.position(entry.getKey()) < entry.getValue()) {
            return true;
        }
    }
    return false;
}
/**
 * Groups a flat list of records by the topic-partition each record came from and wraps
 * the grouping in a single {@code ConsumerRecords} instance. Records keep their list
 * order within each partition.
 */
private ConsumerRecords<String, String> mergeCollectedRecords(List<ConsumerRecord<String, String>> records) {
    Map<TopicPartition, List<ConsumerRecord<String, String>>> grouped = new HashMap<>();
    for (ConsumerRecord<String, String> current : records) {
        TopicPartition key = new TopicPartition(current.topic(), current.partition());
        List<ConsumerRecord<String, String>> bucket = grouped.get(key);
        if (bucket == null) {
            // First record seen for this partition: start its bucket.
            bucket = new ArrayList<>();
            grouped.put(key, bucket);
        }
        bucket.add(current);
    }
    return new ConsumerRecords<>(grouped);
}