Last active
July 5, 2019 17:50
-
-
Save Bill/a61df17bc905b837add31ed96053ad76 to your computer and use it in GitHub Desktop.
Test Project Reactor ConnectableFlux. State space exploration brought to you by `VirtualTimeSchedulerInaccurate` and changing `new Random(1)` to `new Random()`.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.thoughtpropulsion.reactrode.rxperiment; | |
import static com.thoughtpropulsion.reactrode.Functional.returning; | |
import static java.lang.System.currentTimeMillis; | |
import static java.lang.System.nanoTime; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import java.security.NoSuchAlgorithmException; | |
import java.security.SecureRandom; | |
import java.time.Duration; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicReference; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
import com.thoughtpropulsion.reactrode.VirtualTimeSchedulerInaccurate; | |
import io.vavr.Tuple2; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.Disabled; | |
import org.junit.jupiter.api.Test; | |
import org.junit.jupiter.params.ParameterizedTest; | |
import org.junit.jupiter.params.provider.ValueSource; | |
import org.reactivestreams.Subscription; | |
import reactor.core.publisher.Flux; | |
import reactor.core.scheduler.Scheduler; | |
import reactor.core.scheduler.Schedulers; | |
import reactor.test.scheduler.VirtualTimeScheduler; | |
public class ConnectableFluxTest { | |
static class SplitterSubscriber { | |
final String name; | |
final AtomicBoolean done; | |
final AtomicBoolean needMore; | |
final Scheduler scheduler; | |
final AtomicReference<Subscription> subscription; | |
final List<Integer> seen; | |
SplitterSubscriber( | |
final String name, | |
final Flux<Integer> topic, | |
final Scheduler scheduler) { | |
this.name = name; | |
this.done = new AtomicBoolean(); | |
this.needMore = new AtomicBoolean(true); | |
this.scheduler = scheduler; | |
this.subscription = subscribe(name, topic, done, needMore); | |
this.seen = new ArrayList<>(); | |
} | |
private AtomicReference<Subscription> subscribe( | |
final String name, | |
final Flux<Integer> topic, | |
final AtomicBoolean done, | |
final AtomicBoolean needMore) { | |
return returning( | |
new AtomicReference<>(), | |
subscriptionRef -> | |
topic | |
.subscribeOn(scheduler) | |
.subscribe( | |
item -> { | |
log(name, "got item: " + item); | |
needMore.set(true); | |
seen.add(item); | |
}, | |
error -> { | |
log(name, "got error: " + error); | |
done.set(true); | |
}, | |
() -> { | |
log(name, "complete."); | |
done.set(true); | |
}, | |
subscription -> { | |
subscriptionRef.set(subscription); | |
})); | |
} | |
/* | |
Generate upstream demand as needed. | |
Call this periodically | |
*/ | |
void step() { | |
if (done.get()) | |
return; | |
if (needMore.compareAndSet(true,false)) { | |
log(name, "requesting 1"); | |
subscription.get().request(1); | |
} | |
} | |
} | |
@Disabled | |
@Test | |
void split2WithRealTimeMultiThreaded() throws InterruptedException { | |
final Scheduler scheduler = Schedulers.parallel(); | |
final Tuple2<SplitterSubscriber, SplitterSubscriber> | |
subscribers = doSplittingStuff( | |
scheduler, | |
100, | |
1000, | |
TimeUnit.MILLISECONDS); | |
while (!subscribers._1.done.get() || !subscribers._2.done.get()) { | |
Thread.sleep(10); | |
} | |
validate(subscribers); | |
} | |
@Disabled | |
@Test | |
void split2WithRealTimeSingleThreaded() throws InterruptedException { | |
final Scheduler scheduler = Schedulers.single(); | |
final Tuple2<SplitterSubscriber, SplitterSubscriber> | |
subscribers = doSplittingStuff(scheduler, 100, 1000, TimeUnit.MILLISECONDS); | |
while (!subscribers._1.done.get() || !subscribers._2.done.get()) { | |
Thread.sleep(10); | |
} | |
validate(subscribers); | |
} | |
private Random random; | |
private static boolean exploreStateSpace = false; | |
Random createRandom(final long trySeed) { | |
final long actualSeed; | |
if (exploreStateSpace) | |
actualSeed = nanoTime(); | |
else | |
actualSeed = trySeed; | |
System.out.println(String.format("using seed %,d", actualSeed)); | |
try { | |
return returning( | |
SecureRandom.getInstance("SHA1PRNG"), | |
prng -> prng.setSeed(actualSeed)); | |
} catch (NoSuchAlgorithmException e) { | |
throw new AssertionError(e); | |
} | |
} | |
@ParameterizedTest | |
@ValueSource(longs = {141_447_917_499_306L}) | |
void split2In9VirtualSeconds(final long seed) { | |
random = createRandom(seed); | |
final VirtualTimeScheduler scheduler = | |
VirtualTimeScheduler.set( | |
VirtualTimeSchedulerInaccurate.create( | |
random, 20, TimeUnit.MILLISECONDS)); | |
final Tuple2<SplitterSubscriber, SplitterSubscriber> | |
subscribers = doSplittingStuff( | |
scheduler, | |
100, | |
1000, | |
TimeUnit.MILLISECONDS); | |
scheduler.advanceTimeBy(Duration.ofSeconds(9)); | |
validate(subscribers); | |
} | |
@ParameterizedTest | |
@ValueSource(longs = {142_062_396_813_291L}) | |
void split2In135VirtualMinutes(final long seed) { | |
random = createRandom(seed); | |
final VirtualTimeScheduler scheduler = | |
VirtualTimeScheduler | |
.set(VirtualTimeSchedulerInaccurate.create( | |
random, 1, TimeUnit.MINUTES)); | |
final Tuple2<SplitterSubscriber, SplitterSubscriber> | |
subscribers = doSplittingStuff( | |
scheduler, | |
2, | |
15, | |
TimeUnit.MINUTES); | |
scheduler.advanceTimeBy(Duration.ofMinutes(135)); | |
validate(subscribers); | |
} | |
@Test | |
void split2In10VirtualSecondsIsDeterministic() { | |
split2In9VirtualSeconds(1); | |
final long afterFirstRun = random.nextLong(); | |
split2In9VirtualSeconds(1); | |
assertThat(random.nextLong()) | |
.as("random sequences match run to run") | |
.isEqualTo(afterFirstRun); | |
} | |
@Test | |
void split2In3VirtualHoursIsDeterministic() { | |
split2In135VirtualMinutes(1); | |
final long afterFirstRun = random.nextLong(); | |
split2In135VirtualMinutes(1); | |
assertThat(random.nextLong()) | |
.as("random sequences match run to run") | |
.isEqualTo(afterFirstRun); | |
} | |
private void validate( | |
final Tuple2<SplitterSubscriber, SplitterSubscriber> subscribers) { | |
/* | |
Have to turn the stream into an iterable to compare it due to AssertJ bug: | |
https://github.com/joel-costigliola/assertj-core/issues/1545 | |
*/ | |
final Iterable<Integer> asList = testSequence().collect(Collectors.toList()); | |
assertThat(subscribers._1.seen).isEqualTo(asList); | |
assertThat(subscribers._2.seen).isEqualTo(asList); | |
} | |
private Tuple2<SplitterSubscriber,SplitterSubscriber> doSplittingStuff( | |
final Scheduler scheduler, | |
final long fastFrequency, | |
final long slowFrequency, | |
final TimeUnit timeUnit) { | |
/* | |
!! LOOKIE HERE !! this is what we're testing | |
publish(2) creates the ConnectableFlux that will start producing to all subscribers | |
after the second subscription arrives | |
*/ | |
final Flux<Integer> topic = | |
Flux.fromStream(testSequence()) | |
.publish(3) // bounded demand | |
.autoConnect(2); // number of subscribers to wait for | |
final SplitterSubscriber fast = | |
new SplitterSubscriber("Fast!", topic, scheduler); | |
final SplitterSubscriber slow = | |
new SplitterSubscriber("slow ", topic, scheduler); | |
scheduler.schedulePeriodically( | |
() -> fast.step(), 0, fastFrequency, timeUnit); | |
scheduler.schedulePeriodically( | |
() -> slow.step(), 0, slowFrequency, timeUnit); | |
return new Tuple2<>(fast,slow); | |
} | |
private Stream<Integer> testSequence() { | |
return Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); | |
} | |
private static void log(final String name, final String s) { | |
System.out.println( | |
String.format("%s %s (thread: %s)", | |
name, s, Thread.currentThread().getId())); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment