@o-x-y-g-e-n
Last active July 30, 2024 08:07
KafkaConsumer.java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
public class KafkaConsumerService {

    private static final Logger LOGGER = Logger.getLogger(KafkaConsumerService.class.getName());

    private final KafkaConsumerConfig config;
    private final Consumer<String, String> consumer;

    public KafkaConsumerService(KafkaConsumerConfig config) {
        this.config = config;
        this.consumer = new KafkaConsumer<>(config.getConsumerProperties());
    }
    public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds) {
        return consumeMessagesFromLastXSeconds(topic, seconds, null);
    }

    public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) {
        long fetchTime = System.currentTimeMillis() - (seconds * 1000L);
        List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber);
        Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> fetchTime));
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);

        // Assign all partitions once; re-assigning inside the loop would drop
        // the partitions assigned on earlier iterations.
        consumer.assign(topicPartitions);
        offsetsForTimes.forEach((tp, offsetAndTimestamp) -> {
            if (offsetAndTimestamp != null) {
                consumer.seek(tp, offsetAndTimestamp.offset());
            } else {
                // No messages at or after fetchTime on this partition: skip to the end
                // so auto.offset.reset does not replay the whole partition.
                consumer.seekToEnd(Collections.singletonList(tp));
            }
        });

        List<ConsumerRecords<String, String>> allRecords = new ArrayList<>();
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    break; // Caught up: nothing more to drain from the sought offsets.
                }
                allRecords.add(records);
            }
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Error during message consumption: " + e.getMessage(), e);
        } finally {
            consumer.close(); // The service is single-use once its consumer is closed.
        }
        // Combine all polled records into one
        return mergeConsumerRecords(allRecords);
    }
    public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount) {
        return consumeLastXMessages(topic, messageCount, null);
    }

    public ConsumerRecords<String, String> consumeLastXMessages(String topic, int messageCount, Integer partitionNumber) {
        List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber);
        // Assign all partitions once, then rewind each one by up to messageCount records.
        consumer.assign(topicPartitions);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        topicPartitions.forEach(tp -> {
            long startOffset = Math.max(0, endOffsets.get(tp) - messageCount);
            consumer.seek(tp, startOffset);
        });

        int remainingMessages = messageCount;
        List<ConsumerRecords<String, String>> allRecords = new ArrayList<>();
        try {
            while (remainingMessages > 0) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                int recordCount = records.count();
                if (recordCount > 0) {
                    allRecords.add(records);
                    remainingMessages -= recordCount;
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Error during message consumption: " + e.getMessage(), e);
        } finally {
            consumer.close();
        }
        // Combine all polled records into one
        return mergeConsumerRecords(allRecords);
    }
    private List<TopicPartition> getTopicPartitions(String topic, Integer partitionNumber) {
        List<TopicPartition> partitions = new ArrayList<>();
        consumer.partitionsFor(topic).forEach(partitionInfo -> {
            if (partitionNumber == null || partitionInfo.partition() == partitionNumber) {
                partitions.add(new TopicPartition(topic, partitionInfo.partition()));
            }
        });
        return partitions;
    }
    private ConsumerRecords<String, String> mergeConsumerRecords(List<ConsumerRecords<String, String>> recordsList) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
        for (ConsumerRecords<String, String> records : recordsList) {
            for (TopicPartition partition : records.partitions()) {
                recordsMap.computeIfAbsent(partition, k -> new ArrayList<>())
                        .addAll(records.records(partition));
            }
        }
        return new ConsumerRecords<>(recordsMap);
    }
}
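
For reference, a minimal usage sketch. The bootstrap servers, group id, and topic name below are illustrative placeholders, not part of the gist; note that each service instance closes its consumer after one call, so a fresh instance is needed per query.

public class KafkaConsumerServiceDemo {
    public static void main(String[] args) {
        // Placeholder broker address and group id.
        KafkaConsumerConfig config = new KafkaConsumerConfig("localhost:9092", "demo-group");

        // One service instance per call, since the consumer is closed afterwards.
        KafkaConsumerService byTime = new KafkaConsumerService(config);
        System.out.println("Last 60s: " + byTime.consumeMessagesFromLastXSeconds("orders", 60).count());

        KafkaConsumerService byCount = new KafkaConsumerService(config);
        System.out.println("Last 100 messages: " + byCount.consumeLastXMessages("orders", 100).count());
    }
}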
@o-x-y-g-e-n (Author)

KafkaConsumerConfig.java
package com.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

public class KafkaConsumerConfig {
    private static final Logger LOGGER = Logger.getLogger(KafkaConsumerConfig.class.getName());
    private final Properties consumerProperties = new Properties();

    public KafkaConsumerConfig(String bootstrapServers, String groupId) {
        this(bootstrapServers, groupId, null, null);
    }

    public KafkaConsumerConfig(String bootstrapServers, String groupId, String keyDeserializer, String valueDeserializer) {
        LOGGER.info("Initializing KafkaConsumerConfig");

        // Basic properties
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
            keyDeserializer != null ? keyDeserializer : StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
            valueDeserializer != null ? valueDeserializer : StringDeserializer.class.getName());

        // Default additional properties
        setDefaultProperties();

        logProperties();
    }

    private void setDefaultProperties() {
        consumerProperties.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consumerProperties.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        consumerProperties.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        consumerProperties.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        consumerProperties.putIfAbsent(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        consumerProperties.putIfAbsent(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
        consumerProperties.putIfAbsent(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
        consumerProperties.putIfAbsent(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        consumerProperties.putIfAbsent(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaConsumerClient");
    }

    public Properties getConsumerProperties() {
        return consumerProperties;
    }

    public void setProperties(Properties properties) {
        properties.forEach((key, value) -> {
            consumerProperties.put(key, value);
            LOGGER.info("Set property " + key + ": " + value);
        });
    }

    public Properties getProperties() {
        Properties propertiesCopy = new Properties();
        consumerProperties.forEach(propertiesCopy::put);
        return propertiesCopy;
    }

    private void logProperties() {
        LOGGER.info("Kafka Consumer Properties:");
        consumerProperties.forEach((key, value) -> LOGGER.info(key + ": " + value));
    }

    // Static method to create a KafkaConsumerConfig with custom properties
    public static KafkaConsumerConfig withCustomProperties(Properties customProperties) {
        KafkaConsumerConfig config = new KafkaConsumerConfig(
            customProperties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
            customProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
            customProperties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
            customProperties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
        );

        config.setProperties(customProperties);
        config.logProperties();
        return config;
    }
}
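
A short usage sketch for the withCustomProperties factory; every property value below is an example placeholder:

// Build a config from custom properties; values are placeholders.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "audit-group");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); // overrides the default of 500

KafkaConsumerConfig config = KafkaConsumerConfig.withCustomProperties(props);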

@o-x-y-g-e-n (Author)

Revised KafkaConsumerService.consumeMessagesFromLastXSeconds (collects individual records and merges them at the end):
public ConsumerRecords<String, String> consumeMessagesFromLastXSeconds(String topic, int seconds, Integer partitionNumber) {
    long fetchTime = System.currentTimeMillis() - (seconds * 1000L);
    List<TopicPartition> topicPartitions = getTopicPartitions(topic, partitionNumber);
    Map<TopicPartition, Long> timestampsToSearch = topicPartitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> fetchTime));

    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(timestampsToSearch);

    // Assign all partitions once, then seek each to the offset found for the timestamp.
    consumer.assign(topicPartitions);
    offsetsForTimes.forEach((tp, offsetAndTimestamp) -> {
        if (offsetAndTimestamp != null) {
            consumer.seek(tp, offsetAndTimestamp.offset());
        }
    });

    List<ConsumerRecord<String, String>> collectedRecords = new ArrayList<>();
    long endTime = System.currentTimeMillis() + (seconds * 1000L);
    boolean keepPolling = true;

    try {
        while (keepPolling) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); // Polling with sufficient duration
            if (!records.isEmpty()) {
                records.forEach(collectedRecords::add);
            }

            // Exit condition: stop after polling for up to another X seconds of wall-clock time
            if (System.currentTimeMillis() >= endTime) {
                keepPolling = false;
            }
        }
    } catch (Exception e) {
        LOGGER.log(Level.SEVERE, "Error during message consumption: " + e.getMessage(), e);
    } finally {
        consumer.close();
    }

    // Convert collected records into a single ConsumerRecords object
    return mergeCollectedRecords(collectedRecords);
}

// Utility method to merge collected records into a single ConsumerRecords object
private ConsumerRecords<String, String> mergeCollectedRecords(List<ConsumerRecord<String, String>> records) {
    Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        recordMap.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), k -> new ArrayList<>())
                .add(record);
    }
    return new ConsumerRecords<>(recordMap);
}
