Skip to content

Instantly share code, notes, and snippets.

@MarinhoFeliphe
Last active June 14, 2024 21:06
Show Gist options
  • Save MarinhoFeliphe/353f7671001e2443b2d97ef238c03b51 to your computer and use it in GitHub Desktop.
Save MarinhoFeliphe/353f7671001e2443b2d97ef238c03b51 to your computer and use it in GitHub Desktop.
PaymentRequestConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
public class PaymentRequestConsumer {
private static final Logger _log = LoggerFactory.getLogger(PaymentRequestConsumer.class.getSimpleName());
private static final String _paymentRequestTopic = "payment-request";
public static void main(String[] args) {
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(_getProperties())) {
kafkaConsumer.subscribe(List.of(_paymentRequestTopic));
while (true) {
_log.info("Polling...");
for (ConsumerRecord<String, String> consumerRecord : kafkaConsumer.poll(Duration.ofMillis(1000))) {
_log.info("Key: {}, value: {}", consumerRecord.key(), consumerRecord.value());
_log.info("Partition: {}, offset: {}", consumerRecord.partition(), consumerRecord.offset());
}
}
}
}
private static Properties _getProperties() {
Properties properties = new Properties();
properties.put("sasl.jaas.config", ScramLoginModule.class.getName() + " required username=\"" + _userName + "\" password=\"" + _password + "\";");
properties.put("sasl.mechanism", "SCRAM-SHA-256");
properties.put("security.protocol", "SASL_SSL");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServer);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "foo-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return properties;
}
private final static String _bootstrapServer = "...";
private final static String _password = ""...";
private final static String _userName = ""...";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment