Skip to content

Instantly share code, notes, and snippets.

/**
 * Demo entry point: feeds six hard-coded ids through {@code EventsBuilder::get}
 * with {@code flatMap} and prints every signal of the resulting stream.
 *
 * <p>Each emitted item is printed with the current date and the name of the
 * thread delivering it; an error is printed to {@code System.err}; completion
 * prints {@code "complete"}.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the final sleep is interrupted
 */
public static void main(String[] args) throws InterruptedException {
    Flux.fromIterable(List.of(1001, 1002, 2001, 3303, 1003, 2002))
        .flatMap(EventsBuilder::get)
        .subscribe(
            item -> {
                // Timestamp first, then thread name, so the output shows when and where onNext ran.
                String prefix = new Date() + " " + Thread.currentThread().getName();
                System.out.println(prefix + " on next " + item);
            },
            System.err::println,
            () -> System.out.println("complete"));

    // NOTE(review): presumably keeps the JVM alive while EventsBuilder.get emits
    // on other threads - confirm against the implementation of get.
    Thread.sleep(10000);
}
static int count = 0;
private static Mono<String> get(int i) {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.fromIterable(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
Flux<Integer> cache = integerFlux
.doOnNext(i -> System.out.println("on next " + i))
.publish()
.refCount(2);
Flux<Integer> even = cache.filter(i -> i % 2 == 0);
Flux<Integer> odd = cache.filter(i -> i % 2 != 0);
even.subscribe(i -> System.out.println("on next even " + i), System.err::println, () -> System.out.println("complete even"));
def contextsField = ReflectionUtils.findField(SpringClientFactory, "contexts")
contextsField.setAccessible(true)
def contexts = ReflectionUtils.getField(contextsField, springClientFactory)
contexts.clear()
def staticServerList = new StaticServerList(new Server("localhost", wireMockPort))
// add services here
["service-name"].each { name ->
def balancer = springClientFactory.getLoadBalancer(name)
balancer.setServerListImpl(staticServerList)
@cherniag
cherniag / gist:3d0a2b8e677159a49ad44681a951e106
Created April 6, 2021 14:01
Kafka reactive error handling
org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#pollAndInvoke
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter#sendMessageIfAny
org.springframework.messaging.core.GenericMessagingTemplate#doSend(org.springframework.messaging.MessageChannel, org.springframework.messaging.Message<?>, long)
org.springframework.integration.channel.AbstractMessageChannel#send(org.springframework.messaging.Message<?>)
org.springframework.integration.dispatcher.UnicastingDispatcher#doDispatch
org.springframework.integration.util.IntegrationReactiveUtils#adaptSubscribableChannelToPublisher
=====
org.springframework.integration.endpoint.ReactiveStreamsConsumer#doStart
@cherniag
cherniag / lombok.config
Created March 25, 2021 15:55
Spring @Value + Lombok @RequiredArgsConstructor + final field
lombok.copyableAnnotations+=org.springframework.beans.factory.annotation.Value
@cherniag
cherniag / gist:80dc9e4f9212161085ad43488ce351c9
Created December 8, 2020 17:39
entry point to spring kafka listener
org.springframework.kafka.listener.KafkaMessageListenerContainer.ListenerConsumer#pollAndInvoke
@cherniag
cherniag / gist:2d7e66d4ee9c2da420addaa362eb6801
Last active December 4, 2020 12:50
spring cloud kafka consumer
org.springframework.kafka.core.DefaultKafkaConsumerFactory#createKafkaConsumer(java.util.Map<java.lang.String,java.lang.Object>)
org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendJoinGroupRequest
#kafka metrics
io.micrometer.core.instrument.binder.kafka.KafkaMetrics
org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfiguration
# deserialization
org.springframework.messaging.converter.MappingJackson2MessageConverter#convertFromInternal
@cherniag
cherniag / CacheContext
Created December 2, 2020 14:34
loading cache with async refresh
package com.test.service.configuration;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
@cherniag
cherniag / gist:cc58a865e1c4fe1092c41ffe2d3cd46e
Created October 29, 2019 13:31
Jackson @JsonUnwrapped with Lombok @Builder deserialization
@Value
@Builder
public class Parent {
long oneField;
@JsonProperty(access = JsonProperty.Access.READ_ONLY) // does magic - jackson will not set
@NonNull
@JsonUnwrapped
Child child;
@cherniag
cherniag / gist:7e20d791a2bc22b382ef3c6573fd3ce4
Last active December 1, 2020 13:35
Load balanced Spring WebClient + setting Netty read and connect timeouts
// https://projectreactor.io/docs/netty/snapshot/reference/index.html
/**
 * Exposes a {@link WebClient.Builder} bean, marked {@code @LoadBalanced}, that is
 * pre-configured with the supplied HTTP connector and exchange strategies.
 *
 * @param exchangeStrategies  strategies applied to the builder
 * @param clientHttpConnector connector applied to the builder
 * @return the configured builder
 */
@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder(ExchangeStrategies exchangeStrategies, ClientHttpConnector clientHttpConnector) {
    WebClient.Builder withConnector = WebClient.builder().clientConnector(clientHttpConnector);
    // Alternative when the @LoadBalanced annotation is not used (loadBalancerClient comes from the context):
    //     .filter(new LoadBalancerExchangeFilterFunction(loadBalancerClient))
    return withConnector.exchangeStrategies(exchangeStrategies);
}