Last active
June 25, 2018 01:55
-
-
Save quelgar/973642a981d9e24b682ac9c8f104e5a7 to your computer and use it in GitHub Desktop.
Testing reliably subscribing to a Monix ConnectableObservable before connecting.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package au.com.str.stellar | |
import cats.implicits._ | |
import monix.eval.{Task, TaskApp} | |
import monix.reactive.Observable | |
import scala.concurrent.duration._ | |
/**
 * Experiments with subscribing to a Monix `ConnectableObservable` before calling `connect()`.
 *
 * Three variants are run one after another against the same cold source (the values 0 to 4):
 * one using `.start`, one using `.fork`, and one that subscribes and connects inside a single
 * side-effecting block. Each variant prints a banner first, then traces subscription,
 * termination and cancellation events so the ordering can be inspected on the console.
 */
object Test extends TaskApp {

  /**
   * Attaches console tracing to `source`, prefixing every message with `label`.
   *
   * Prints `"<label> Subscribed"` on subscription, `"<label> term: <x>"` on termination and
   * `"<label> Subscription canceled"` when a subscription is cancelled.
   */
  private def traced(label: String)(source: Observable[Long]): Observable[Long] =
    source
      .doOnSubscribe(() => println(s"$label Subscribed"))
      .doOnTerminate(x => println(s"$label term: $x"))
      .doOnSubscriptionCancel(() => println(s"$label Subscription canceled"))

  /** Runs the start, fork and side-effect experiments in sequence. */
  override def runc: Task[Unit] = {
    // The upstream that gets published (shared) by each experiment.
    val obs = traced("SOURCE")(Observable.range(0L, 5L))

    /*
      This does not work, there is no output from A and B.
      Observed output (after the banner line):

        A running
        connecting
        SOURCE Subscribed
        SOURCE term: None
        SHARED Subscribed
        SHARED term: None
        SHARED Subscribed
        SHARED term: None
        Joined
        Start test done
        *****************

      When `.start` returns, the subscription to the shared observable has not yet happened, so when we connect
      the values are emitted without any downstream subscribers.
     */
    val startTest = for {
      // The banner must go through println: a bare Task.eval("...") only yields the string,
      // which is then discarded by the `_ <-` binding and never reaches the console.
      _ <- Task.eval(println("*** Start Test"))
      con <- Task.deferAction { implicit scheduler =>
        Task.eval(obs.publish)
      }
      shared = traced("SHARED")(con)
      a <- shared.foreachL(i => println(s"A: $i")).start
      _ <- Task.eval(println("A running"))
      b <- shared.foreachL(i => println(s"B: $i")).start
      _ <- Task.eval(println("connecting"))
      _ <- Task.eval(con.connect())
      _ <- a.join
      _ <- b.join
      _ <- Task.eval(println("Joined"))
      _ <- Task.eval(println("Start test done\n*****************\n"))
    } yield ()

    /*
      Change `.start` to `.fork`.
      This seems to work, but it may be a race condition. There doesn't seem to be any guarantee that the subscription
      has occurred by the time `.fork` has returned. That is, it seems the subscription happens asynchronously.
      Observed output (after the banner line):

        SHARED Subscribed
        A running
        SHARED Subscribed
        connecting
        SOURCE Subscribed
        A: 0
        B: 0
        A: 1
        B: 1
        A: 2
        B: 2
        A: 3
        B: 3
        A: 4
        B: 4
        SOURCE term: None
        SHARED term: None
        SHARED term: None
        Joined
        Fork test done
        *****************
     */
    val forkTest = for {
      _ <- Task.eval(println("*** Fork Effect Test"))
      con <- Task.deferAction { implicit scheduler =>
        Task.eval(obs.publish)
      }
      shared = traced("SHARED")(con)
      a <- shared.foreachL(i => println(s"A: $i")).fork
      _ <- Task.eval(println("A running"))
      b <- shared.foreachL(i => println(s"B: $i")).fork
      _ <- Task.eval(println("connecting"))
      _ <- Task.eval(con.connect())
      _ <- a.join
      _ <- b.join
      _ <- Task.eval(println("Joined"))
      _ <- Task.eval(println("Fork test done\n*****************\n"))
    } yield ()

    /*
      Do the publish and connect all in the same side-effect block.
      This works reliably because `.foreach` and similar evaluation side-effects perform the subscription
      synchronously (provided we haven't put any subscription delays on our `Observable`).
      Observed output (after the banner line):

        SHARED Subscribed
        A running
        SHARED Subscribed
        connecting
        SOURCE Subscribed
        A: 0
        B: 0
        A: 1
        B: 1
        A: 2
        B: 2
        A: 3
        B: 3
        A: 4
        B: 4
        SOURCE term: None
        SHARED term: None
        SHARED term: None
        Joined
        Side effect test done
        *****************
     */
    val sideEffectTest = for {
      _ <- Task.eval(println("*** Side Effect Test"))
      _ <- Task.deferFutureAction { implicit scheduler =>
        val con = obs.publish
        val shared = traced("SHARED")(con)
        // Both subscriptions are made before connect() is called in this same block.
        val a = shared.foreach(i => println(s"A: $i"))
        println("A running")
        val b = shared.foreach(i => println(s"B: $i"))
        println("connecting")
        con.connect()
        // The resulting task completes once both consumers have finished.
        a.zip(b)
      }
      _ <- Task.eval(println("Joined"))
      _ <- Task.eval(println("Side effect test done\n*****************\n"))
    } yield ()

    startTest *> forkTest *> sideEffectTest
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment