Skip to content

Instantly share code, notes, and snippets.

@bisignam
Last active July 27, 2018 11:44
Show Gist options
  • Save bisignam/b73b3550d6a2393451cd14a096358b7f to your computer and use it in GitHub Desktop.
Save bisignam/b73b3550d6a2393451cd14a096358b7f to your computer and use it in GitHub Desktop.
[SPRING-REACTOR] Test class that summarizes how to test infinite Flux(es) with StepVerifier
package ch.exm.generisk.notifications;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.test.StepVerifier;
import reactor.test.StepVerifier.Step;
public class UnderstandStepVerifier {
private final FluxNotifier<String> myFluxNotifier = new FluxNotifier<>();
private String endSignal = "END";
@Before
public void setUp() {
myFluxNotifier.setEndSignal(endSignal);
StepVerifier.setDefaultTimeout(Duration.ofSeconds(1));
}
@Test
public void verifyOnlyTwoElementsInInfiniteFlux() {
Flux<String> myFlux = Flux.create(sink -> {
sink.next("foo");
sink.next("bar");
myFluxNotifier.setFluxSink(sink);
});
verifiyReceivedExactly(myFlux, Arrays.asList("foo", "bar"), myFluxNotifier);
}
@Test
public void verifyInterceptsUnexpectedElementInInfiniteFlux() {
Flux<String> myFlux = Flux.create(sink -> {
sink.next("foo");
sink.next("bar");
sink.next("ciao");
myFluxNotifier.setFluxSink(sink);
});
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(
() -> verifiyReceivedExactly(myFlux, Arrays.asList("foo", "bar"), myFluxNotifier))
.withMessageStartingWith(
"expectation \"Expect end signal\" failed"
);
}
@Test
public void verifyOneElementInInfiniteFlux() {
Flux<String> myFlux = Flux.create(sink -> {
sink.next("foo");
myFluxNotifier.setFluxSink(sink);
});
verifiyReceivedExactly(myFlux, Collections.singletonList("foo"), myFluxNotifier);
}
@Test
public void verifyEpectingOneElementInInfiniteFluxButReceivingEmptyFlux() {
Flux<String> myFlux = Flux.create(myFluxNotifier::setFluxSink);
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(
() -> verifiyReceivedExactly(myFlux, Collections.singletonList("foo"), myFluxNotifier))
.withMessageStartingWith(
"expectation \"expectNextSequence\" failed (expected : onNext(foo); actual: "
+ endSignal + "; iterable: [foo])"
);
}
@Test
public void verifyEmptyInfiniteFlux() {
Flux<String> myFlux = Flux.create(myFluxNotifier::setFluxSink);
verifiyReceivedExactly(myFlux, Collections.emptyList(), myFluxNotifier);
}
@Test
public void verifyExpectingEmptyInfiniteFluxButInterceptingNotExpectedElement() {
Flux<String> myFlux = Flux.create(sink -> {
sink.next("foo");
myFluxNotifier.setFluxSink(sink);
});
assertThatExceptionOfType(AssertionError.class)
.isThrownBy(
() -> verifiyReceivedExactly(myFlux, Collections.emptyList(), myFluxNotifier))
.withMessageStartingWith(
"expectation \"Expect end signal\" failed"
);
}
private <T> void verifiyReceivedExactly(Flux<T> myFlux, List<T> elements,
FluxNotifier<T> fluxNotifier) {
Step<T> step = StepVerifier.create(myFlux.log())
.expectSubscription()
.then(fluxNotifier::closeFlux);
if (!elements.isEmpty()) {
step = step.expectNextSequence(elements);
}
step.expectNext(fluxNotifier.getEndSignal())
.as("Expect end signal")
.verifyComplete();
}
private static class FluxNotifier<T> {
private FluxSink<T> fluxSink;
private T endSignal;
public void setFluxSink(FluxSink<T> fluxSink) {
this.fluxSink = fluxSink;
}
public T getEndSignal() {
return endSignal;
}
public void setEndSignal(T endSignal) {
this.endSignal = endSignal;
}
void closeFlux() {
fluxSink.next(endSignal);
fluxSink.complete();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment