Created
November 29, 2022 12:56
-
-
Save rkpattnaik780/d2c4975eee70750964e2854a5973354d to your computer and use it in GitHub Desktop.
Code snippet for Quarkus Kafka using SASL/OAUTHBEARER
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mp.messaging.incoming.prices.connector=smallrye-kafka | |
mp.messaging.incoming.prices.topic=prices | |
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer | |
mp.messaging.outgoing.generated-price.connector=smallrye-kafka | |
mp.messaging.outgoing.generated-price.topic=prices | |
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer | |
kafka.bootstrap.servers=${KAFKA_HOST} | |
kafka.security.protocol=SASL_SSL | |
kafka.sasl.mechanism=OAUTHBEARER | |
kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ | |
clientId="${RHOAS_SERVICE_ACCOUNT_CLIENT_ID}" \ | |
clientSecret="${RHOAS_SERVICE_ACCOUNT_CLIENT_SECRET}" ; | |
kafka.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler | |
kafka.sasl.oauthbearer.token.endpoint.url=${RHOAS_SERVICE_ACCOUNT_OAUTH_TOKEN_URL} | |
kafka.sasl.oauthbearer.scope.claim.name=api.iam.service_accounts |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.acme.kafka; | |
import javax.enterprise.context.ApplicationScoped; | |
import org.eclipse.microprofile.reactive.messaging.Acknowledgment; | |
import org.eclipse.microprofile.reactive.messaging.Incoming; | |
import org.eclipse.microprofile.reactive.messaging.Outgoing; | |
import io.smallrye.reactive.messaging.annotations.Broadcast; | |
/** | |
* A bean consuming data from the "prices" Kafka topic and applying some conversion. | |
* The result is pushed to the "my-data-stream" stream which is an in-memory stream. | |
*/ | |
@ApplicationScoped | |
public class PriceConverter { | |
// Consume from the `prices` channel and produce to the `my-data-stream` channel | |
@Incoming("prices") | |
@Outgoing("my-data-stream") | |
// Send to all subscribers | |
@Broadcast | |
// Acknowledge the messages before calling this method. | |
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) | |
public int process(int price) { | |
return price; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.acme.kafka; | |
import java.time.Duration; | |
import java.util.Random; | |
import javax.enterprise.context.ApplicationScoped; | |
import io.smallrye.mutiny.Multi; | |
import org.eclipse.microprofile.reactive.messaging.Outgoing; | |
/** | |
* A bean producing random prices every 5 seconds. | |
* The prices are written to a Kafka topic (prices). The Kafka configuration is specified in the application configuration. | |
*/ | |
@ApplicationScoped | |
public class PriceProducer { | |
private Random random = new Random(); | |
@Outgoing("generated-price") | |
public Multi<Integer> generate() { | |
return Multi.createFrom().ticks().every(Duration.ofSeconds(5)) | |
.onOverflow().drop() | |
.map(tick -> random.nextInt(100)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment