Skip to content

Instantly share code, notes, and snippets.

View schroedermatt's full-sized avatar

Matt Schroeder schroedermatt

View GitHub Profile
/**
* Custom subscriber that will be notified on all UnleashEvents.
* The extended Log4JSubscriber will log out the events that are not
* handled in this extension.
**/
public class KafkaUnleashSubscriber extends Log4JSubscriber {
@Override
public void on(@NotNull UnleashEvent event) {
// many events come through here, we only care about responses
if (event instance of FeatureToggleResponse) {
// simplified consumer
try {
while (true) {
// fetch latest flag values and apply accordingly
applyFeatureFlags(consumer);
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// logic
}
/**
* Configure the client that connects to Unleash if unleash.enabled is "true"
**/
@Bean
@ConditionalOnProperty(value = "unleash.enabled", havingValue = "true")
Unleash defaultUnleash() {
UnleashConfig config = UnleashConfig.builder()
.unleashAPI("<http://unleash-api.com>")
.instanceId("asdf-123")
.appName("unleash-demo")
// two topics that this consumer cares about
List assignedTopics = ["demo.topic.one", "demo.topic.two"]
// all of the feature flags
List flags = [
"topic_demo.topic.one",
"ui-feature-flag",
"topic_important.topic"
]
// assume consumer is subscribed to demo.topic.name && another.topic.name
String pausedTopic = "demo.topic.name";
//
// PAUSE
//
// filter out the assigned partitions for the topic being paused
Set<TopicPartition> partitionsToPause = consumer.assignment()
// get all assigned partitions for the consumer
Set<TopicPartition> assignedPartitions = consumer.assignment();
// pause all assigned partitions
Set<TopicPartition> pausedPartitions = consumer.pause(assignedPartitions);
// resume all paused partitions
consumer.resume(consumer.paused());
// you could also resume the consumer.assignment() since everything is paused
@schroedermatt
schroedermatt / ExampleTest.java
Created November 11, 2021 17:14
ExampleTest using custom QuarkusEmbeddedKafkaTest annotation
@QuarkusEmbeddedKafkaTest
public class ExampleTest {
...
}
@schroedermatt
schroedermatt / QuarkusEmbeddedKafkaTest.java
Created November 11, 2021 17:14
Custom annotation for QuarkusEmbeddedKafkaTest
@QuarkusTest
@QuarkusTestResource(EmbeddedKafkaTestResource.class)
@Stereotype
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface QuarkusEmbeddedKafkaTest {}
@schroedermatt
schroedermatt / EmbeddedKafkaTestResource.java
Created November 11, 2021 17:13
EmbeddedKafkaTestResource for Quarkus tests
import org.springframework.kafka.test.EmbeddedKafkaBroker;
public class EmbeddedKafkaTestResource implements QuarkusTestResourceLifecycleManager {
public EmbeddedKafkaBroker embeddedBroker;
/**
* @return A map of system properties that should be set for the running test
*/
@Override
@schroedermatt
schroedermatt / ExampleTest.java
Created November 11, 2021 17:11
ExampleTest using QuarkusContainerKafkaTest annotation
@QuarkusContainerKafkaTest
public class ExampleTest {
...
}