Skip to content

Instantly share code, notes, and snippets.

@chrisxaustin
Created January 20, 2022 15:16
Show Gist options
  • Save chrisxaustin/ce24844e19871a07b78c25f198e36990 to your computer and use it in GitHub Desktop.
Save chrisxaustin/ce24844e19871a07b78c25f198e36990 to your computer and use it in GitHub Desktop.
Batching with Flux
package example;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.stream.IntStream;
@Slf4j
public class WithReactor {
public static void main(String[] argv) throws Exception {
var s = IntStream.range(0, 100_000)
.mapToObj(x -> Integer.toString(x))
;
var processed = new LongAdder();
var start = System.currentTimeMillis();
Flux.fromStream(s)
.buffer(1000)
.parallel(10)
.runOn(Schedulers.parallel())
// .log()
.doOnNext(x -> {
try {
TimeUnit.SECONDS.sleep(1);
}
catch (InterruptedException e) {
e.printStackTrace();
}
processed.increment();
log.info("{}: {}", Thread.currentThread().getName(), x.size());
})
.sequential()
.blockLast()
;
log.info("processed {} batches in {}", processed.sum(), (System.currentTimeMillis() - start) / 1000);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment