Last active
July 30, 2024 08:07
-
-
Save o-x-y-g-e-n/dc2b9850dde21ce7b1ef914ec1f22b5f to your computer and use it in GitHub Desktop.
KafkaConsumer.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.clients.consumer.Consumer; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.consumer.OffsetAndTimestamp; | |
import org.apache.kafka.common.TopicPartition; | |
import java.time.Duration; | |
import java.util.*; | |
import java.util.stream.Collectors; | |
public class KafkaConsumerService { | |
private final KafkaConsumerConfig config; | |
private final Consumer<String, String> consumer; | |
public KafkaConsumerService(KafkaConsumerConfig config) { | |
this.config = config; | |
this.consumer = new KafkaConsumer<>(config.getConsumerProperties()); | |
} | |
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds) { | |
return consumeMessagesFromLastXSeconds(topic, seconds, null); | |
} | |
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) { | |
long fetchTime = System.currentTimeMillis() - (seconds * 1000L); | |
List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber); | |
Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream() | |
.collect(Collectors.toMap(tp -> tp, tp -> fetchTime)); | |
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch); | |
offsetsForTimes.forEach((tp, offsetAndTimestamp) -> { | |
if (offsetAndTimestamp != null) { | |
consumer.assign(Collections.singletonList(tp)); | |
consumer.seek(tp, offsetAndTimestamp.offset()); | |
} | |
}); | |
List<ConsumerRecords<String, String>> allRecords = new ArrayList<>(); | |
try { | |
while (true) { | |
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); | |
allRecords.add(records); | |
if (System.currentTimeMillis() - fetchTime > seconds * 1000L) { | |
break; | |
} | |
} | |
} catch (Exception e) { | |
AppLogger.logError("Error during message consumption: " + e.getMessage()); | |
} finally { | |
consumer.close(); | |
} | |
// Combine all polled records into one | |
return mergeConsumerRecords(allRecords); | |
} | |
public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount) { | |
return consumeLastXMessages(topic, messageCount, null); | |
} | |
public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount, Integer partitionNumber) { | |
List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber); | |
topicPartitions.forEach(tp -> { | |
long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp); | |
long startOffset = Math.max(0, endOffset - messageCount); | |
consumer.assign(Collections.singletonList(tp)); | |
consumer.seek(tp, startOffset); | |
}); | |
int remainingMessages = messageCount; | |
List<ConsumerRecords<String, String>> allRecords = new ArrayList<>(); | |
try { | |
while (remainingMessages > 0) { | |
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); | |
int recordCount = records.count(); | |
if (recordCount > 0) { | |
allRecords.add(records); | |
remainingMessages -= recordCount; | |
} else { | |
break; | |
} | |
} | |
} catch (Exception e) { | |
AppLogger.logError("Error during message consumption: " + e.getMessage()); | |
} finally { | |
consumer.close(); | |
} | |
// Combine all polled records into one | |
return mergeConsumerRecords(allRecords); | |
} | |
private List<TopicPartition> getTopicPartitions(String topic, Integer partitionNumber) { | |
List<TopicPartition> partitions = new ArrayList<>(); | |
consumer.partitionsFor(topic).forEach(partitionInfo -> { | |
if (partitionNumber == null || partitionInfo.partition() == partitionNumber) { | |
partitions.add(new TopicPartition(topic, partitionInfo.partition())); | |
} | |
}); | |
return partitions; | |
} | |
private ConsumerRecords<String, String> mergeConsumerRecords(List<ConsumerRecords<String, String>> recordsList) { | |
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>(); | |
for (ConsumerRecords<String, String> records : recordsList) { | |
for (TopicPartition partition : records.partitions()) { | |
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); | |
if (!recordsMap.containsKey(partition)) { | |
recordsMap.put(partition, new ArrayList<>()); | |
} | |
recordsMap.get(partition).addAll(partitionRecords); | |
} | |
} | |
return new ConsumerRecords<>(recordsMap); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) {
long fetchTime = System.currentTimeMillis() - (seconds * 1000L);
List topicPartitions = getTopicPartitions(topic, partitionNumber);
Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> fetchTime));
}
// Utility method to merge collected records into a single ConsumerRecords object
private ConsumerRecords<String, String> mergeCollectedRecords(List<ConsumerRecord<String, String>> records) {
Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
recordMap.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), k -> new ArrayList<>()).add(record);
}
return new ConsumerRecords<>(recordMap);
}