Last active
November 5, 2019 12:44
-
-
Save simbo1905/21640998cd27369e44be8a282dc9f13f to your computer and use it in GitHub Desktop.
out of order processing of a Flux of input using Reactor 0.9.1.RELEASE
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import reactor.core.publisher.Mono; | |
public class BlockingJournal { | |
private static String blockingWrite(String in){ | |
try { | |
// fakes blocking for disk write | |
Thread.sleep(5L); | |
System.out.println("journal wrote: "+in+" on "+Thread.currentThread().getName()); | |
} catch (Exception e){ | |
throw new RuntimeException(e); | |
} | |
return in; | |
} | |
public static Mono<String> blockingMethodSingleThread(final String in) { | |
return Mono.fromSupplier(() -> blockingWrite(in)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import reactor.core.publisher.Mono; | |
import java.util.Random; | |
public class BlockingRemoteCall { | |
private final static Random r = new Random(); | |
static private String blockingWebService(final String in) { | |
try { | |
// fakes blocking for up to a second | |
Thread.sleep((long) (1000 * r.nextFloat())); | |
System.out.println("webserver returned: "+in+" on "+Thread.currentThread().getName()); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
return in; | |
} | |
public static Mono<String> blockingMethodParallelThread(final String in) { | |
return Mono.fromSupplier(() -> blockingWebService(in)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.junit.Test; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.ParallelFlux; | |
import reactor.core.scheduler.Schedulers; | |
public class TestFluxes { | |
@Test | |
public void testFluxParallelProcess() throws Exception { | |
// Solution by Michael Berry https://stackoverflow.com/a/58709188/329496 Thanks! | |
ParallelFlux<String> flux = Flux.range(1, 10).map(i -> i.toString()).parallel().runOn(Schedulers.elastic()); | |
ParallelFlux<String> pipeline = flux.flatMap(s -> { | |
Mono<String> async = BlockingRemoteCall.blockingMethodParallelThread(s); | |
String r1 = async.block(); | |
return BlockingJournal.blockingMethodSingleThread(r1); | |
}); | |
pipeline.sequential().doOnNext(System.out::println).blockLast(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment