Skip to content

Instantly share code, notes, and snippets.

@nblagoev
Last active April 24, 2018 20:27
Show Gist options
  • Save nblagoev/1b347263cff7d6ecbd38053c265cb0d1 to your computer and use it in GitHub Desktop.
Save nblagoev/1b347263cff7d6ecbd38053c265cb0d1 to your computer and use it in GitHub Desktop.
import org.reactivestreams.Subscription;
import java.time.LocalTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.Loggers;
class Scratch {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(1);
Flux.range(0, 250)
.flatMap(
i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.map(j -> highCpuProcess(j)), 8
)
.doAfterTerminate(latch::countDown)
.subscribe(new SubscriberWithBackPressure<>(16, a -> { }));
try {
latch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static int highCpuProcess(int i) {
System.out.println(
LocalTime.now() + " - HIGH CPU - " + i + " on thread: " + Thread.currentThread()
.getName());
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}
class SubscriberWithBackPressure<T> extends BaseSubscriber<T> {
private final int maxRequest;
private final Consumer<T> consumer;
public SubscriberWithBackPressure(int maxRequest, Consumer<T> consumer) {
this.maxRequest = maxRequest;
this.consumer = consumer;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(maxRequest);
}
@Override
protected void hookOnNext(T value) {
if (consumer != null) {
System.out.println(
LocalTime.now() + " - Subscriber received - " + value + " on thread: " + Thread.currentThread()
.getName());
this.consumer.accept(value);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
request(maxRequest);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment