Last active
March 24, 2017 10:56
-
-
Save schauder/3f63f9f68e4dbe27c962f27ed31c4ca5 to your computer and use it in GitHub Desktop.
Reactor Thingies
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// WARNING: do not use this. ChangeTrigger is stateful, so this approach can fail in all kinds of unexpected ways.
// Similar to Flux::groupBy, but closes the current GroupedFlux whenever a different key value appears, potentially opening another one for the same value later.
@Test | |
public void groupOnSwitch() { | |
StepVerifier | |
.create( | |
groupOnSwitch( | |
Flux.just("one", "two", "twenty", "tissue", "berta", "blot", "thousand"), | |
s -> s.substring(0, 1)) | |
.flatMap(Flux::materialize) | |
.map(s -> s.isOnComplete() ? "WINDOW CLOSED" : s.get()) | |
) | |
.expectNext("one", "WINDOW CLOSED") | |
.expectNext("two", "twenty", "tissue", "WINDOW CLOSED") | |
.expectNext("berta", "blot", "WINDOW CLOSED") | |
.expectNext("thousand", "WINDOW CLOSED") | |
.verifyComplete(); | |
} | |
@Test | |
public void groupOnSwitchKeys() { | |
Flux<GroupedFlux<String, String>> fluxOfGroupedFluxes = groupOnSwitch( | |
Flux.just("one", "two", "twenty", "tissue", "berta", "blot", "thousand"), | |
s -> s.substring(0, 1)); | |
StepVerifier.create( | |
fluxOfGroupedFluxes.map(gf -> gf.key()) | |
) | |
.expectNext("o", "t", "b", "t") | |
.verifyComplete(); | |
} | |
private static <T, X> Flux<GroupedFlux<X, T>> groupOnSwitch(Flux<T> flux, Function<T, X> keyFunction) { | |
ChangeTrigger changeTrigger = new ChangeTrigger(); | |
Flux<GroupedFlux<T, T>> fluxOfGroupedFluxes = flux.windowUntil(l -> changeTrigger.test(keyFunction.apply(l)), true); | |
return fluxOfGroupedFluxes.flatMap(gf -> gf.groupBy(t -> keyFunction.apply(gf.key()))); | |
} | |
private static class ChangeTrigger<T> { | |
T last = null; | |
boolean test(T value) { | |
boolean startNew = !Objects.equals(last, value); | |
last = value; | |
System.out.println(String.format("%s, %s", value, startNew)); | |
return startNew; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment