Skip to content

Instantly share code, notes, and snippets.

@rkpattnaik780
Created November 29, 2022 09:52
Show Gist options
  • Save rkpattnaik780/40c2cd421eb1f7c40514c25ae5060690 to your computer and use it in GitHub Desktop.
Code snippets for Java Kafka client using SASL/OAuthBearer
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;
/**
 * Example Kafka consumer that authenticates via SASL/OAUTHBEARER (see KafkaConfig),
 * subscribes to {@code TOPIC}, and prints each record's key/value and partition/offset
 * in an endless poll loop.
 *
 * NOTE(review): {@code TOPIC} is not defined in this snippet — it is assumed to be a
 * String constant available elsewhere; confirm before compiling.
 */
public class ConsumerExample {
    public static void main(String[] args) {
        var properties = KafkaConfig.properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group_2");
        // Read from the beginning of the topic when the group has no committed offset yet.
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources: the original never closed the consumer, so an exception
        // from poll() would leak sockets and delay the group rebalance. KafkaConsumer
        // implements AutoCloseable, so this guarantees cleanup on any exit path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Key: " + record.key() + ", Value:" + record.value());
                    System.out.println("Partition:" + record.partition() + ",Offset:" + record.offset());
                }
            }
        }
    }
}
mport org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class KafkaConfig {
static Properties properties() {
var properties= new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , KAFKA_HOST);
properties.setProperty("security.protocol", "SASL_SSL");
properties.setProperty("sasl.mechanism", "OAUTHBEARER");
properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"" + RHOAS_SERVICE_ACCOUNT_CLIENT_ID + "\" clientSecret=\"" + RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET + "\" oauth.token.endpoint.uri=\"" + RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL + "\";");
properties.setProperty("sasl.login.callback.handler.class", "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
properties.setProperty("sasl.oauthbearer.token.endpoint.url", "https://sso.redhat.com/auth/realms/redhat-external/protocol/openid-connect/token");
properties.setProperty("sasl.oauthbearer.scope.claim.name", "api.iam.service_accounts");
return properties;
}
}
<!-- Dependencies to be added in the pom.xml file -->
<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.1</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <!-- NOTE(review): 2.8.5 (2016) is very old; consider upgrading to a
         currently-maintained 2.x release — verify compatibility first. -->
    <version>2.8.5</version>
  </dependency>
</dependencies>
<!-- Fixed: the fragment previously closed with </dependency> instead of
     </dependencies>, which is malformed XML and breaks the Maven build. -->
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
/**
 * Example Kafka producer that authenticates via SASL/OAUTHBEARER (see KafkaConfig)
 * and sends a single test message to {@code TOPIC}.
 *
 * NOTE(review): {@code TOPIC} is not defined in this snippet — it is assumed to be a
 * String constant available elsewhere; confirm before compiling.
 */
public class ProducerExample {
    public static void main(String[] args) {
        // Creating producer properties
        var properties = KafkaConfig.properties();
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources: the original would leak the producer if send()/flush()
        // threw before the explicit close(). KafkaProducer implements AutoCloseable,
        // and close() also flushes any buffered records.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // send() is asynchronous; flush() blocks until the record is delivered
            // (or fails), so the message is actually sent before the JVM exits.
            producer.send(new ProducerRecord<>(TOPIC, "Test message"));
            producer.flush();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment