Created
February 28, 2019 14:26
-
-
Save yasuabe/c4a6a6ca81cd25f8542799c85f5d462b to your computer and use it in GitHub Desktop.
publish/subscribe with monix observable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package observable | |
import cats.effect.ExitCode | |
import monix.eval.{Task, TaskApp} | |
import monix.execution.Ack | |
import monix.execution.Ack.{Continue, Stop} | |
import monix.reactive.{Consumer, Observable, Observer} | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
object ObservableExercise { | |
def printTick(name: String) = (n: Long) => Task.sleep(100.milli).map { _ => | |
println(s"$name: $n in ${ Thread.currentThread().getId }") | |
} | |
} | |
import observable.ObservableExercise._ | |
trait MixinObservable { | |
val observable = Observable.interval(1.second).takeWhile(_ <= 3) | |
} | |
// using Task | |
object Ex1 extends TaskApp with MixinObservable { | |
def gather(tasks: (Long => Task[Unit])*) = observable.mapEval { n => | |
Task.gather(tasks.map(_(n))) | |
} | |
def run(args: List[String]): Task[ExitCode] = { | |
import monix.execution.Scheduler.Implicits.global | |
val taskA = printTick("A") | |
val taskB = printTick("B") | |
Task.parZip2( | |
Task.sleep(5.second), | |
Task(gather(taskA, taskB).subscribe()) | |
).map(_ => ExitCode.Success) | |
} | |
} | |
// using Consumer | |
object Ex2 extends TaskApp with MixinObservable { | |
def consumer(name: String): Consumer[Long, Unit] = | |
Consumer.foreachEval(printTick(name)) | |
def run(args: List[String]): Task[ExitCode] = { | |
import monix.execution.Scheduler.Implicits.global | |
val connectable = observable.publish | |
val countA = connectable.consumeWith[Unit](consumer("A")) | |
val countB = connectable.consumeWith[Unit](consumer("B")) | |
Task.parZip4( | |
Task.sleep(7.second), | |
countA, | |
countB, | |
Task(connectable.connect()) | |
).map(_ => ExitCode.Success) | |
} | |
} | |
case class MyObserver(name: String, until: Long = Long.MaxValue) extends Observer[Long] { | |
import monix.execution.Scheduler.Implicits.global | |
def onNext(elem: Long): Future[Ack] = | |
printTick(name)(elem).runToFuture.flatMap { _ => | |
if (elem < until) Continue else Stop | |
} | |
def onError(ex: Throwable): Unit = | |
println(s"$name: an error occurred ${ ex.getMessage }") | |
def onComplete(): Unit = | |
println(s"$name: completed") | |
} | |
// using Observer | |
object Ex3 extends TaskApp with MixinObservable { | |
def run(args: List[String]): Task[ExitCode] = { | |
import monix.execution.Scheduler.Implicits.global | |
val connectable = observable.publish | |
connectable.subscribe(MyObserver("A", 1)) | |
connectable.subscribe(MyObserver("B")) | |
Task.parZip2( | |
Task.sleep(10.second), | |
Task(connectable.connect()) | |
).map(_ => ExitCode.Success) | |
} | |
} | |
// using Observer and canceling | |
object Ex4 extends TaskApp with MixinObservable { | |
def run(args: List[String]): Task[ExitCode] = { | |
import monix.execution.Scheduler.Implicits.global | |
val connectable = observable.publish | |
connectable.subscribe(MyObserver("A", 1)) | |
connectable.subscribe(MyObserver("B")) | |
val mainT = Task.sleep(10.second) | |
val cancelT = for { | |
cancelable <- Task(connectable.connect()) | |
_ <- Task.sleep(3.second) | |
} yield cancelable.cancel | |
Task.parZip2(mainT, cancelT).map(_ => ExitCode.Success) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment