Last active
March 27, 2023 09:05
-
-
Save cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
//DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
//DEPS org.testcontainers:kafka:1.15.0-rc2
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=dead-letter-queue
//JAVA_OPTIONS -Dmp.messaging.incoming.dead-letter-topic-movies.connector=smallrye-kafka -Dmp.messaging.incoming.dead-letter-topic-movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.dead-letter-topic-movies.auto.offset.reset=earliest
package foo; | |
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import javax.enterprise.context.ApplicationScoped;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
public class KafkaDeadLetterTopic { | |
static final Logger LOGGER = Logger.getLogger("Kafka-Dead-Letter-Topic"); | |
@QuarkusMain | |
static class Main { | |
public static void main(String... args) { | |
LOGGER.info("Starting Kafka..."); | |
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")); | |
kafka.start(); | |
LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers()); | |
System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers()); | |
Quarkus.run(args); | |
} | |
} | |
@ApplicationScoped | |
public static class MovieProcessor { | |
@Incoming("movies") | |
public void consume(String movie) { | |
LOGGER.infof("Receiving movie %s", movie); | |
if (movie.contains("'")) { | |
throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie); | |
} | |
if (movie.contains(",")) { | |
throw new IllegalArgumentException("I don't like movie with , in their title: " + movie); | |
} | |
} | |
} | |
@ApplicationScoped | |
public static class MovieWriter { | |
@Outgoing("movies-out") | |
public Multi<String> generate() { | |
return Multi.createFrom().items( | |
"The Shawshank Redemption", | |
"The Godfather", | |
"The Godfather: Part II", | |
"The Dark Knight", | |
"12 Angry Men", | |
"Schindler's List", | |
"The Lord of the Rings: The Return of the King", | |
"Pulp Fiction", | |
"The Good, the Bad and the Ugly", | |
"The Lord of the Rings: The Fellowship of the Ring" | |
); | |
} | |
} | |
@ApplicationScoped | |
public static class DeadLetterTopicReader { | |
@SuppressWarnings("unchecked") | |
@Incoming("dead-letter-topic-movies") | |
public CompletionStage<Void> dead(Message<String> rejected) { | |
IncomingKafkaRecordMetadata<String, String> metadata = rejected.getMetadata(IncomingKafkaRecordMetadata.class) | |
.orElseThrow(() -> new IllegalArgumentException("Expected a message coming from Kafka")); | |
String reason = new String(metadata.getHeaders().lastHeader("dead-letter-reason").value()); | |
LOGGER.infof("The message '%s' has been rejected and sent to the DLT. The reason is: '%s'.", rejected.getPayload(), reason); | |
return rejected.ack(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
//DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
//DEPS org.testcontainers:kafka:1.15.0-rc2
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
package foo; | |
import io.quarkus.runtime.Quarkus; | |
import io.quarkus.runtime.annotations.QuarkusMain; | |
import io.smallrye.mutiny.Multi; | |
import org.eclipse.microprofile.reactive.messaging.Incoming; | |
import org.eclipse.microprofile.reactive.messaging.Outgoing; | |
import org.jboss.logging.Logger; | |
import org.testcontainers.containers.KafkaContainer; | |
import org.testcontainers.utility.DockerImageName; | |
import javax.enterprise.context.ApplicationScoped; | |
public class KafkaFailFast { | |
static final Logger LOGGER = Logger.getLogger("Kafka-Fail-Fast"); | |
@QuarkusMain | |
static class Main { | |
public static void main(String... args) { | |
LOGGER.info("Starting Kafka..."); | |
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")); | |
kafka.start(); | |
LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers()); | |
System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers()); | |
Quarkus.run(args); | |
} | |
} | |
@ApplicationScoped | |
public static class MovieProcessor { | |
@Incoming("movies") | |
public void consume(String movie) { | |
LOGGER.infof("Receiving movie %s", movie); | |
if (movie.contains("'")) { | |
throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie); | |
} | |
if (movie.contains(",")) { | |
throw new IllegalArgumentException("I don't like movie with , in their title: " + movie); | |
} | |
} | |
} | |
@ApplicationScoped | |
public static class MovieWriter { | |
@Outgoing("movies-out") | |
public Multi<String> generate() { | |
return Multi.createFrom().items( | |
"The Shawshank Redemption", | |
"The Godfather", | |
"The Godfather: Part II", | |
"The Dark Knight", | |
"12 Angry Men", | |
"Schindler's List", | |
"The Lord of the Rings: The Return of the King", | |
"Pulp Fiction", | |
"The Good, the Bad and the Ugly", | |
"The Lord of the Rings: The Fellowship of the Ring" | |
); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
//DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
//DEPS org.testcontainers:kafka:1.15.0-rc2
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=ignore
package foo; | |
import io.quarkus.runtime.Quarkus; | |
import io.quarkus.runtime.annotations.QuarkusMain; | |
import io.smallrye.mutiny.Multi; | |
import org.eclipse.microprofile.reactive.messaging.Incoming; | |
import org.eclipse.microprofile.reactive.messaging.Outgoing; | |
import org.jboss.logging.Logger; | |
import org.testcontainers.containers.KafkaContainer; | |
import org.testcontainers.utility.DockerImageName; | |
import javax.enterprise.context.ApplicationScoped; | |
public class KafkaIgnoreFailure { | |
static final Logger LOGGER = Logger.getLogger("Kafka-Ignore"); | |
@QuarkusMain | |
static class Main { | |
public static void main(String... args) { | |
LOGGER.info("Starting Kafka..."); | |
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1")); | |
kafka.start(); | |
LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers()); | |
System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers()); | |
Quarkus.run(args); | |
} | |
} | |
@ApplicationScoped | |
public static class MovieProcessor { | |
@Incoming("movies") | |
public void consume(String movie) { | |
LOGGER.infof("Receiving movie %s", movie); | |
if (movie.contains("'")) { | |
throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie); | |
} | |
if (movie.contains(",")) { | |
throw new IllegalArgumentException("I don't like movie with , in their title: " + movie); | |
} | |
} | |
} | |
@ApplicationScoped | |
public static class MovieWriter { | |
@Outgoing("movies-out") | |
public Multi<String> generate() { | |
return Multi.createFrom().items( | |
"The Shawshank Redemption", | |
"The Godfather", | |
"The Godfather: Part II", | |
"The Dark Knight", | |
"12 Angry Men", | |
"Schindler's List", | |
"The Lord of the Rings: The Return of the King", | |
"Pulp Fiction", | |
"The Good, the Bad and the Ugly", | |
"The Lord of the Rings: The Fellowship of the Ring" | |
); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment