Skip to content

Instantly share code, notes, and snippets.

@rkpattnaik780
Created November 29, 2022 12:56
Show Gist options
  • Save rkpattnaik780/d2c4975eee70750964e2854a5973354d to your computer and use it in GitHub Desktop.
Save rkpattnaik780/d2c4975eee70750964e2854a5973354d to your computer and use it in GitHub Desktop.
Code snippet for Quarkus Kafka using SASL/OAUTHBEARER
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
kafka.bootstrap.servers=${KAFKA_HOST}
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=OAUTHBEARER
kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="${RHOAS_SERVICE_ACCOUNT_CLIENT_ID}" \
clientSecret="${RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET}" ;
kafka.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
kafka.sasl.oauthbearer.token.endpoint.url=${RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL}
kafka.sasl.oauthbearer.scope.claim.name=api.iam.service_accounts
package org.acme.kafka;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Broadcast;
/**
* A bean consuming data from the "prices" Kafka topic and applying some conversion.
* The result is pushed to the "my-data-stream" stream which is an in-memory stream.
*/
@ApplicationScoped
public class PriceConverter {
// Consume from the `prices` channel and produce to the `my-data-stream` channel
@Incoming("prices")
@Outgoing("my-data-stream")
// Send to all subscribers
@Broadcast
// Acknowledge the messages before calling this method.
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public int process(int price) {
return price;
}
}
package org.acme.kafka;
import java.time.Duration;
import java.util.Random;
import javax.enterprise.context.ApplicationScoped;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
/**
* A bean producing random prices every 5 seconds.
* The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration.
*/
@ApplicationScoped
public class PriceProducer {
private Random random = new Random();
@Outgoing("generated-price")
public Multi<Integer> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
.onOverflow().drop()
.map(tick -> random.nextInt(100));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment