@quelgar
Last active June 25, 2018 01:55
Testing reliably subscribing to a Monix ConnectableObservable before connecting.
package au.com.str.stellar

import cats.implicits._
import monix.eval.{Task, TaskApp}
import monix.reactive.Observable
import scala.concurrent.duration._

object Test extends TaskApp {

  override def runc: Task[Unit] = {

    // Source observable shared by all three tests; the callbacks log its lifecycle.
    val obs = Observable.range(0L, 5L)
      .doOnSubscribe(() => println("SOURCE Subscribed"))
      .doOnTerminate(x => println(s"SOURCE term: $x"))
      .doOnSubscriptionCancel(() => println("SOURCE Subscription canceled"))

    /*
      Subscribe with `.start`.

      This does not work: A and B never print a value. Output following the banner line:

        A running
        connecting
        SOURCE Subscribed
        SOURCE term: None
        SHARED Subscribed
        SHARED term: None
        SHARED Subscribed
        SHARED term: None
        Joined
        Start test done

        *****************

      When `.start` returns, the subscription to the shared observable has not yet happened, so when we
      connect, the values are emitted with no downstream subscribers to receive them.
    */
    val startTest = for {
      _ <- Task.eval(println("*** Start Test"))
      con <- Task.deferAction { implicit scheduler =>
        Task.eval(obs.publish)
      }
      shared = con
        .doOnSubscribe(() => println("SHARED Subscribed"))
        .doOnTerminate(x => println(s"SHARED term: $x"))
        .doOnSubscriptionCancel(() => println("SHARED Subscription canceled"))
      a <- shared.foreachL(i => println(s"A: $i")).start
      _ <- Task.eval(println("A running"))
      b <- shared.foreachL(i => println(s"B: $i")).start
      _ <- Task.eval(println("connecting"))
      _ <- Task.eval(con.connect())
      _ <- a.join
      _ <- b.join
      _ <- Task.eval(println("Joined"))
      _ <- Task.eval(println("Start test done\n*****************\n"))
    } yield ()

    /*
      Same as the previous test, but with `.start` changed to `.fork`.

      This seems to work, but it may only be winning a race. There doesn't seem to be any guarantee that the
      subscription has occurred by the time `.fork` has returned. That is, the subscription appears to
      happen asynchronously. Output following the banner line:

        SHARED Subscribed
        A running
        SHARED Subscribed
        connecting
        SOURCE Subscribed
        A: 0
        B: 0
        A: 1
        B: 1
        A: 2
        B: 2
        A: 3
        B: 3
        A: 4
        B: 4
        SOURCE term: None
        SHARED term: None
        SHARED term: None
        Joined
        Fork test done

        *****************
    */
    val forkTest = for {
      _ <- Task.eval(println("*** Fork Effect Test"))
      con <- Task.deferAction { implicit scheduler =>
        Task.eval(obs.publish)
      }
      shared = con
        .doOnSubscribe(() => println("SHARED Subscribed"))
        .doOnTerminate(x => println(s"SHARED term: $x"))
        .doOnSubscriptionCancel(() => println("SHARED Subscription canceled"))
      a <- shared.foreachL(i => println(s"A: $i")).fork
      _ <- Task.eval(println("A running"))
      b <- shared.foreachL(i => println(s"B: $i")).fork
      _ <- Task.eval(println("connecting"))
      _ <- Task.eval(con.connect())
      _ <- a.join
      _ <- b.join
      _ <- Task.eval(println("Joined"))
      _ <- Task.eval(println("Fork test done\n*****************\n"))
    } yield ()

    /*
      Do the publish, both subscriptions, and the connect all in the same side-effecting block.

      This works reliably because `.foreach` and similar side-effecting evaluation methods perform the
      subscription synchronously (provided we haven't put any subscription delays on our `Observable`).
      Output following the banner line:

        SHARED Subscribed
        A running
        SHARED Subscribed
        connecting
        SOURCE Subscribed
        A: 0
        B: 0
        A: 1
        B: 1
        A: 2
        B: 2
        A: 3
        B: 3
        A: 4
        B: 4
        SOURCE term: None
        SHARED term: None
        SHARED term: None
        Joined
        Side effect test done

        *****************
    */
    val sideEffectTest = for {
      _ <- Task.eval(println("*** Side Effect Test"))
      _ <- Task.deferFutureAction { implicit scheduler =>
        val con = obs.publish
        val shared = con
          .doOnSubscribe(() => println("SHARED Subscribed"))
          .doOnTerminate(x => println(s"SHARED term: $x"))
          .doOnSubscriptionCancel(() => println("SHARED Subscription canceled"))
        // Both subscriptions are made here, before connect() is called below.
        val a = shared.foreach(i => println(s"A: $i"))
        println("A running")
        val b = shared.foreach(i => println(s"B: $i"))
        println("connecting")
        con.connect()
        // Complete only when both subscribers have finished.
        a.zip(b)
      }
      _ <- Task.eval(println("Joined"))
      _ <- Task.eval(println("Side effect test done\n*****************\n"))
    } yield ()

    // Run the three tests in sequence.
    startTest *> forkTest *> sideEffectTest
  }
}
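
The ordering problem these tests probe can be illustrated without Monix. The sketch below is a toy model only: `ToyConnectable` and `ToyConnectableDemo` are hypothetical names invented for this illustration, not Monix types, and the model is single-threaded, so it shows why a subscriber registered after `connect()` sees nothing but does not reproduce the asynchronous scheduling of `.start` or `.fork`.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy stand-in for a connectable source (NOT Monix's implementation):
// connect() pushes every value to whichever subscribers are registered at that moment.
final class ToyConnectable[A](values: Seq[A]) {
  private val subscribers = ListBuffer.empty[A => Unit]

  // Register a callback; it only receives values from a later connect().
  def subscribe(f: A => Unit): Unit = {
    subscribers += f
    ()
  }

  // Emit all values once to the currently registered subscribers.
  def connect(): Unit =
    values.foreach(v => subscribers.foreach(s => s(v)))
}

object ToyConnectableDemo {
  private val values: List[Long] = (0L until 5L).toList

  // Subscription is registered before connecting: every value is observed.
  def subscribeThenConnect(): List[Long] = {
    val source = new ToyConnectable(values)
    val seen = ListBuffer.empty[Long]
    source.subscribe { v => seen += v; () }
    source.connect()
    seen.toList
  }

  // Connecting first models a subscription that has not happened yet:
  // the values are emitted with nobody listening.
  def connectThenSubscribe(): List[Long] = {
    val source = new ToyConnectable(values)
    val seen = ListBuffer.empty[Long]
    source.connect()
    source.subscribe { v => seen += v; () }
    seen.toList
  }

  def main(args: Array[String]): Unit = {
    println(subscribeThenConnect()) // prints List(0, 1, 2, 3, 4)
    println(connectThenSubscribe()) // prints List()
  }
}
```

In this model the second case corresponds to the `.start` test, where the connect ran before either subscription existed; the side-effect test avoids it by making both subscriptions before calling `connect()`.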