Last active
November 25, 2019 21:02
-
-
Save hguerrero/0cdfb5ee9dfb05707ebd82c15af73f5a to your computer and use it in GitHub Desktop.
quarkus-registry-example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Configuration file
#
# Settings for the outgoing reactive-messaging channel named "price".
# Values must not carry trailing characters: in a .properties file everything
# after '=' (up to the end of the line) is part of the value.

# Kafka broker address used by the application.
kafka.bootstrap.servers=localhost:9092

# Bind the outgoing channel "price" to the SmallRye Kafka connector.
mp.messaging.outgoing.price.connector=smallrye-kafka
# Client id under which this producer identifies itself.
mp.messaging.outgoing.price.client.id=price-producer
# Kafka topic that receives the records written to the channel.
mp.messaging.outgoing.price.topic=prices

# Record keys are written as plain strings; record values go through the
# Apicurio Avro serializer.
mp.messaging.outgoing.price.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.price.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer

# Base URL of the Apicurio registry the value serializer talks to.
mp.messaging.outgoing.price.apicurio.registry.url=http://localhost:8080
# NOTE(review): the value is a strategy class name rather than a literal id, so
# this presumably selects how the artifact id is derived (here: from the topic).
# Confirm against the Apicurio serde documentation for the version in use.
mp.messaging.outgoing.price.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "type": "record",
  "name": "price",
  "namespace": "com.redhat",
  "fields": [
    {
      "name": "symbol",
      "type": "string"
    },
    {
      "name": "price",
      "type": "string"
    }
  ]
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.redhat; | |
import java.util.Random; | |
import java.util.concurrent.TimeUnit; | |
import javax.enterprise.context.ApplicationScoped; | |
import org.eclipse.microprofile.reactive.messaging.Outgoing; | |
import io.reactivex.Flowable; | |
import io.smallrye.reactive.messaging.kafka.KafkaMessage; | |
@ApplicationScoped | |
public class RegistryProducer { | |
private Random random = new Random(); | |
private String[] symbols = new String[]{ "RHT", "IBM", "MSFT", "AMZN" }; | |
@Outgoing("price") | |
public Flowable<KafkaMessage<String, String>> generate() { | |
return Flowable.interval(1000, TimeUnit.MILLISECONDS) | |
.onBackpressureBuffer() | |
.map(tick -> { | |
return KafkaMessage.of( | |
symbols[random.nextInt(4)], | |
String.format("%.2f", random.nextDouble())); | |
}); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2019-11-25 15:57:32,558 ERROR [io.sma.rea.mes.kaf.KafkaSink] (vert.x-worker-thread-0) Message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@270dd072 was not sent to Kafka topic 'prices': javax.ws.rs.WebApplicationException: Unknown error, status code 404 | |
at org.jboss.resteasy.microprofile.client.DefaultResponseExceptionMapper.toThrowable(DefaultResponseExceptionMapper.java:31) | |
at org.jboss.resteasy.microprofile.client.ExceptionMapping$HandlerException.mapException(ExceptionMapping.java:53) | |
at org.jboss.resteasy.microprofile.client.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:166) | |
at com.sun.proxy.$Proxy40.getArtifactMetaDataByContent(Unknown Source) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:498) | |
at io.apicurio.registry.client.RegistryClient$ServiceProxy.invoke(RegistryClient.java:119) | |
at com.sun.proxy.$Proxy29.getArtifactMetaDataByContent(Unknown Source) | |
at io.apicurio.registry.utils.serde.strategy.FindBySchemaIdStrategy.findId(FindBySchemaIdStrategy.java:29) | |
at io.apicurio.registry.utils.serde.AbstractKafkaSerializer.serialize(AbstractKafkaSerializer.java:146) | |
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60) | |
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:884) | |
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846) | |
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:93) | |
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:316) | |
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) | |
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) | |
at java.lang.Thread.run(Thread.java:748) | |
2019-11-25 15:57:32,560 ERROR [io.sma.rea.mes.kaf.KafkaSink] (vert.x-worker-thread-0) Unable to dispatch message to Kafka: java.util.concurrent.CompletionException: javax.ws.rs.WebApplicationException: Unknown error, status code 404 | |
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) | |
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) | |
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) | |
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) | |
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) | |
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) | |
at io.smallrye.reactive.messaging.kafka.KafkaSink.lambda$null$1(KafkaSink.java:119) | |
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$write$5(KafkaWriteStreamImpl.java:143) | |
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:131) | |
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:316) | |
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) | |
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) | |
at java.lang.Thread.run(Thread.java:748) | |
Caused by: javax.ws.rs.WebApplicationException: Unknown error, status code 404 | |
at org.jboss.resteasy.microprofile.client.DefaultResponseExceptionMapper.toThrowable(DefaultResponseExceptionMapper.java:31) | |
at org.jboss.resteasy.microprofile.client.ExceptionMapping$HandlerException.mapException(ExceptionMapping.java:53) | |
at org.jboss.resteasy.microprofile.client.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:166) | |
at com.sun.proxy.$Proxy40.getArtifactMetaDataByContent(Unknown Source) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:498) | |
at io.apicurio.registry.client.RegistryClient$ServiceProxy.invoke(RegistryClient.java:119) | |
at com.sun.proxy.$Proxy29.getArtifactMetaDataByContent(Unknown Source) | |
at io.apicurio.registry.utils.serde.strategy.FindBySchemaIdStrategy.findId(FindBySchemaIdStrategy.java:29) | |
at io.apicurio.registry.utils.serde.AbstractKafkaSerializer.serialize(AbstractKafkaSerializer.java:146) | |
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60) | |
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:884) | |
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:846) | |
at io.vertx.kafka.client.producer.impl.KafkaWriteStreamImpl.lambda$send$4(KafkaWriteStreamImpl.java:93) | |
... 6 more |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment