Skip to content

Instantly share code, notes, and snippets.

@cescoffier
Last active March 27, 2023 09:05
Show Gist options
  • Save cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a to your computer and use it in GitHub Desktop.
Save cescoffier/23ca7b2bcc8c49cee3db29b3f2b59e4a to your computer and use it in GitHub Desktop.
//usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
//DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
//DEPS org.testcontainers:kafka:1.15.0-rc2
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=dead-letter-queue
//JAVA_OPTIONS -Dmp.messaging.incoming.dead-letter-topic-movies.connector=smallrye-kafka -Dmp.messaging.incoming.dead-letter-topic-movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.dead-letter-topic-movies.auto.offset.reset=earliest
package foo;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletionStage;
public class KafkaDeadLetterTopic {
static final Logger LOGGER = Logger.getLogger("Kafka-Dead-Letter-Topic");
@QuarkusMain
static class Main {
public static void main(String... args) {
LOGGER.info("Starting Kafka...");
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));
kafka.start();
LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers());
System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers());
Quarkus.run(args);
}
}
@ApplicationScoped
public static class MovieProcessor {
@Incoming("movies")
public void consume(String movie) {
LOGGER.infof("Receiving movie %s", movie);
if (movie.contains("'")) {
throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
}
if (movie.contains(",")) {
throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
}
}
}
@ApplicationScoped
public static class MovieWriter {
@Outgoing("movies-out")
public Multi<String> generate() {
return Multi.createFrom().items(
"The Shawshank Redemption",
"The Godfather",
"The Godfather: Part II",
"The Dark Knight",
"12 Angry Men",
"Schindler's List",
"The Lord of the Rings: The Return of the King",
"Pulp Fiction",
"The Good, the Bad and the Ugly",
"The Lord of the Rings: The Fellowship of the Ring"
);
}
}
@ApplicationScoped
public static class DeadLetterTopicReader {
@SuppressWarnings("unchecked")
@Incoming("dead-letter-topic-movies")
public CompletionStage<Void> dead(Message<String> rejected) {
IncomingKafkaRecordMetadata<String, String> metadata = rejected.getMetadata(IncomingKafkaRecordMetadata.class)
.orElseThrow(() -> new IllegalArgumentException("Expected a message coming from Kafka"));
String reason = new String(metadata.getHeaders().lastHeader("dead-letter-reason").value());
LOGGER.infof("The message '%s' has been rejected and sent to the DLT. The reason is: '%s'.", rejected.getPayload(), reason);
return rejected.ack();
}
}
}
//usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
//DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
//DEPS org.testcontainers:kafka:1.15.0-rc2
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
package foo;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import javax.enterprise.context.ApplicationScoped;
public class KafkaFailFast {
static final Logger LOGGER = Logger.getLogger("Kafka-Fail-Fast");
@QuarkusMain
static class Main {
public static void main(String... args) {
LOGGER.info("Starting Kafka...");
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));
kafka.start();
LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers());
System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers());
Quarkus.run(args);
}
}
@ApplicationScoped
public static class MovieProcessor {
@Incoming("movies")
public void consume(String movie) {
LOGGER.infof("Receiving movie %s", movie);
if (movie.contains("'")) {
throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
}
if (movie.contains(",")) {
throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
}
}
}
@ApplicationScoped
public static class MovieWriter {
@Outgoing("movies-out")
public Multi<String> generate() {
return Multi.createFrom().items(
"The Shawshank Redemption",
"The Godfather",
"The Godfather: Part II",
"The Dark Knight",
"12 Angry Men",
"Schindler's List",
"The Lord of the Rings: The Return of the King",
"Pulp Fiction",
"The Good, the Bad and the Ugly",
"The Lord of the Rings: The Fellowship of the Ring"
);
}
}
}
//usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS io.quarkus:quarkus-smallrye-reactive-messaging-kafka:1.9.0.CR1
//DEPS io.quarkus:quarkus-smallrye-health:1.9.0.CR1
//DEPS org.testcontainers:kafka:1.15.0-rc2
//JAVAC_OPTIONS -parameters
//JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
//JAVA_OPTIONS -Dmp.messaging.outgoing.movies-out.connector=smallrye-kafka -Dmp.messaging.outgoing.movies-out.topic=movies -Dmp.messaging.outgoing.movies-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.connector=smallrye-kafka -Dmp.messaging.incoming.movies.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -Dmp.messaging.incoming.movies.auto.offset.reset=earliest
//JAVA_OPTIONS -Dmp.messaging.incoming.movies.failure-strategy=ignore
package foo;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.annotations.QuarkusMain;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import javax.enterprise.context.ApplicationScoped;
public class KafkaIgnoreFailure {
static final Logger LOGGER = Logger.getLogger("Kafka-Ignore");
@QuarkusMain
static class Main {
public static void main(String... args) {
LOGGER.info("Starting Kafka...");
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));
kafka.start();
LOGGER.infof("Kafka started: %s", kafka.getBootstrapServers());
System.setProperty("kafka.bootstrap.servers", kafka.getBootstrapServers());
Quarkus.run(args);
}
}
@ApplicationScoped
public static class MovieProcessor {
@Incoming("movies")
public void consume(String movie) {
LOGGER.infof("Receiving movie %s", movie);
if (movie.contains("'")) {
throw new IllegalArgumentException("I don't like movie with ' in their title: " + movie);
}
if (movie.contains(",")) {
throw new IllegalArgumentException("I don't like movie with , in their title: " + movie);
}
}
}
@ApplicationScoped
public static class MovieWriter {
@Outgoing("movies-out")
public Multi<String> generate() {
return Multi.createFrom().items(
"The Shawshank Redemption",
"The Godfather",
"The Godfather: Part II",
"The Dark Knight",
"12 Angry Men",
"Schindler's List",
"The Lord of the Rings: The Return of the King",
"Pulp Fiction",
"The Good, the Bad and the Ugly",
"The Lord of the Rings: The Fellowship of the Ring"
);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment