Last active
July 27, 2018 11:44
-
-
Save bisignam/b73b3550d6a2393451cd14a096358b7f to your computer and use it in GitHub Desktop.
[SPRING-REACTOR] Test class that summarizes how to test infinite Flux(es) with StepVerifier
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.exm.generisk.notifications; | |
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; | |
import java.time.Duration; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.List; | |
import org.junit.Before; | |
import org.junit.Test; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.FluxSink; | |
import reactor.test.StepVerifier; | |
import reactor.test.StepVerifier.Step; | |
public class UnderstandStepVerifier { | |
private final FluxNotifier<String> myFluxNotifier = new FluxNotifier<>(); | |
private String endSignal = "END"; | |
@Before | |
public void setUp() { | |
myFluxNotifier.setEndSignal(endSignal); | |
StepVerifier.setDefaultTimeout(Duration.ofSeconds(1)); | |
} | |
@Test | |
public void verifyOnlyTwoElementsInInfiniteFlux() { | |
Flux<String> myFlux = Flux.create(sink -> { | |
sink.next("foo"); | |
sink.next("bar"); | |
myFluxNotifier.setFluxSink(sink); | |
}); | |
verifiyReceivedExactly(myFlux, Arrays.asList("foo", "bar"), myFluxNotifier); | |
} | |
@Test | |
public void verifyInterceptsUnexpectedElementInInfiniteFlux() { | |
Flux<String> myFlux = Flux.create(sink -> { | |
sink.next("foo"); | |
sink.next("bar"); | |
sink.next("ciao"); | |
myFluxNotifier.setFluxSink(sink); | |
}); | |
assertThatExceptionOfType(AssertionError.class) | |
.isThrownBy( | |
() -> verifiyReceivedExactly(myFlux, Arrays.asList("foo", "bar"), myFluxNotifier)) | |
.withMessageStartingWith( | |
"expectation \"Expect end signal\" failed" | |
); | |
} | |
@Test | |
public void verifyOneElementInInfiniteFlux() { | |
Flux<String> myFlux = Flux.create(sink -> { | |
sink.next("foo"); | |
myFluxNotifier.setFluxSink(sink); | |
}); | |
verifiyReceivedExactly(myFlux, Collections.singletonList("foo"), myFluxNotifier); | |
} | |
@Test | |
public void verifyEpectingOneElementInInfiniteFluxButReceivingEmptyFlux() { | |
Flux<String> myFlux = Flux.create(myFluxNotifier::setFluxSink); | |
assertThatExceptionOfType(AssertionError.class) | |
.isThrownBy( | |
() -> verifiyReceivedExactly(myFlux, Collections.singletonList("foo"), myFluxNotifier)) | |
.withMessageStartingWith( | |
"expectation \"expectNextSequence\" failed (expected : onNext(foo); actual: " | |
+ endSignal + "; iterable: [foo])" | |
); | |
} | |
@Test | |
public void verifyEmptyInfiniteFlux() { | |
Flux<String> myFlux = Flux.create(myFluxNotifier::setFluxSink); | |
verifiyReceivedExactly(myFlux, Collections.emptyList(), myFluxNotifier); | |
} | |
@Test | |
public void verifyExpectingEmptyInfiniteFluxButInterceptingNotExpectedElement() { | |
Flux<String> myFlux = Flux.create(sink -> { | |
sink.next("foo"); | |
myFluxNotifier.setFluxSink(sink); | |
}); | |
assertThatExceptionOfType(AssertionError.class) | |
.isThrownBy( | |
() -> verifiyReceivedExactly(myFlux, Collections.emptyList(), myFluxNotifier)) | |
.withMessageStartingWith( | |
"expectation \"Expect end signal\" failed" | |
); | |
} | |
private <T> void verifiyReceivedExactly(Flux<T> myFlux, List<T> elements, | |
FluxNotifier<T> fluxNotifier) { | |
Step<T> step = StepVerifier.create(myFlux.log()) | |
.expectSubscription() | |
.then(fluxNotifier::closeFlux); | |
if (!elements.isEmpty()) { | |
step = step.expectNextSequence(elements); | |
} | |
step.expectNext(fluxNotifier.getEndSignal()) | |
.as("Expect end signal") | |
.verifyComplete(); | |
} | |
private static class FluxNotifier<T> { | |
private FluxSink<T> fluxSink; | |
private T endSignal; | |
public void setFluxSink(FluxSink<T> fluxSink) { | |
this.fluxSink = fluxSink; | |
} | |
public T getEndSignal() { | |
return endSignal; | |
} | |
public void setEndSignal(T endSignal) { | |
this.endSignal = endSignal; | |
} | |
void closeFlux() { | |
fluxSink.next(endSignal); | |
fluxSink.complete(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment