Skip to content

Instantly share code, notes, and snippets.

@yasuabe
Created February 28, 2019 14:26
Show Gist options
  • Save yasuabe/c4a6a6ca81cd25f8542799c85f5d462b to your computer and use it in GitHub Desktop.
Save yasuabe/c4a6a6ca81cd25f8542799c85f5d462b to your computer and use it in GitHub Desktop.
publish/subscribe with monix observable
package observable
import cats.effect.ExitCode
import monix.eval.{Task, TaskApp}
import monix.execution.Ack
import monix.execution.Ack.{Continue, Stop}
import monix.reactive.{Consumer, Observable, Observer}
import scala.concurrent.Future
import scala.concurrent.duration._
object ObservableExercise {
  /** Builds a tick handler labelled with `name`.
    *
    * The returned function takes a tick value `n` and yields a `Task` that
    * first sleeps for 100 milliseconds and then prints the label, the tick
    * value and the id of the thread executing the print.
    *
    * @param name label prefixed to every printed line
    * @return a function from a tick value to the task that prints it
    */
  def printTick(name: String) = (n: Long) => {
    val pause = Task.sleep(100.milli)
    // The print runs only after the pause has elapsed.
    for (_ <- pause)
      yield println(s"$name: $n in ${ Thread.currentThread().getId }")
  }
}
import observable.ObservableExercise._
/** Mixin supplying the tick source shared by the exercises below. */
trait MixinObservable {
  /** Ticks produced by `Observable.interval(1.second)`, kept only while the
    * tick value is at most 3; the stream ends at the first larger value.
    */
  val observable: Observable[Long] = {
    val everySecond = Observable.interval(1.second)
    everySecond.takeWhile(tick => tick <= 3)
  }
}
// using Task
/** Exercise 1: fan every tick out to several `Task`-returning handlers. */
object Ex1 extends TaskApp with MixinObservable {
  /** Maps each tick of `observable` to the gathered result of applying every
    * handler in `tasks` to that tick.
    *
    * @param tasks handlers invoked with each tick value
    */
  def gather(tasks: (Long => Task[Unit])*) = observable.mapEval { tick =>
    val perHandler = tasks.map(handler => handler(tick))
    Task.gather(perHandler)
  }

  /** Runs a 5-second sleep in parallel with a task that subscribes to the
    * gathered observable, then reports success.
    */
  def run(args: List[String]): Task[ExitCode] = {
    import monix.execution.Scheduler.Implicits.global
    val handlerA = printTick("A")
    val handlerB = printTick("B")
    // Keeps the application alive while the subscription emits.
    val keepAlive = Task.sleep(5.second)
    // Subscription is deferred until the task itself is run.
    val subscribe = Task(gather(handlerA, handlerB).subscribe())
    Task.parZip2(keepAlive, subscribe).map(_ => ExitCode.Success)
  }
}
// using Consumer
/** Exercise 2: attach two `Consumer`s to one published observable. */
object Ex2 extends TaskApp with MixinObservable {
  /** Creates a consumer that evaluates `printTick(name)` for every element.
    *
    * @param name label used in the printed lines
    */
  def consumer(name: String): Consumer[Long, Unit] = {
    val onTick = printTick(name)
    Consumer.foreachEval(onTick)
  }

  /** Runs, in parallel, a 7-second sleep, the two consuming tasks and the
    * task that calls `connect()` on the published observable, then reports
    * success.
    */
  def run(args: List[String]): Task[ExitCode] = {
    import monix.execution.Scheduler.Implicits.global
    val published = observable.publish
    val keepAlive = Task.sleep(7.second)
    // Both consumers are attached to the same published observable.
    val drainA = published.consumeWith[Unit](consumer("A"))
    val drainB = published.consumeWith[Unit](consumer("B"))
    val start = Task(published.connect())
    Task.parZip4(keepAlive, drainA, drainB, start).map(_ => ExitCode.Success)
  }
}
/** Observer that prints every element through `printTick` and asks the source
  * to stop once an element is no longer below `until`.
  *
  * @param name  label used in all printed lines
  * @param until elements strictly below this value are acknowledged with
  *              `Continue`; any other element is acknowledged with `Stop`
  */
case class MyObserver(name: String, until: Long = Long.MaxValue) extends Observer[Long] {
  import monix.execution.Scheduler.Implicits.global

  /** Prints `elem`, then signals whether more elements are wanted. */
  def onNext(elem: Long): Future[Ack] = {
    // The decision depends only on `elem` and `until`, so it is computed up front.
    val reply: Ack = if (elem < until) Continue else Stop
    printTick(name)(elem).runToFuture.map(_ => reply)
  }

  /** Prints the message of the error. */
  def onError(ex: Throwable): Unit = {
    val reason = ex.getMessage
    println(s"$name: an error occurred ${ reason }")
  }

  /** Prints a completion notice. */
  def onComplete(): Unit = println(s"$name: completed")
}
// using Observer
/** Exercise 3: subscribe two `MyObserver`s to one published observable. */
object Ex3 extends TaskApp with MixinObservable {
  /** Subscribes observer "A" (which stops at element 1) and observer "B"
    * (default limit), then runs a 10-second sleep in parallel with the task
    * that calls `connect()`, and reports success.
    */
  def run(args: List[String]): Task[ExitCode] = {
    import monix.execution.Scheduler.Implicits.global
    val published = observable.publish
    // Observers are subscribed in list order, before connect() is run.
    val observers = List(MyObserver("A", until = 1), MyObserver("B"))
    observers.foreach(observer => published.subscribe(observer))
    val keepAlive = Task.sleep(10.second)
    val start = Task(published.connect())
    Task.parZip2(keepAlive, start).map(_ => ExitCode.Success)
  }
}
// using Observer and canceling
/** Exercise 4: like Ex3, but the connection is cancelled after 3 seconds. */
object Ex4 extends TaskApp with MixinObservable {
  /** Subscribes observers "A" (stops at element 1) and "B" (default limit) to
    * the published observable, then runs in parallel:
    *  - a 10-second sleep, and
    *  - a task that connects, sleeps 3 seconds and cancels the connection.
    *
    * @return `ExitCode.Success` once both parallel tasks have finished
    */
  def run(args: List[String]): Task[ExitCode] = {
    import monix.execution.Scheduler.Implicits.global
    val connectable = observable.publish
    connectable.subscribe(MyObserver("A", 1))
    connectable.subscribe(MyObserver("B"))
    val mainT = Task.sleep(10.second)
    val cancelT = for {
      cancelable <- Task(connectable.connect())
      _ <- Task.sleep(3.second)
      // cancel() is side-effecting, so it is called with explicit parentheses
      // instead of relying on auto-application of an empty argument list.
    } yield cancelable.cancel()
    Task.parZip2(mainT, cancelT).map(_ => ExitCode.Success)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment