-
-
Save mvillafuertem/ebb5f1a2aae5990a71cb1226a3eb8d08 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.std.{Dispatcher, Queue} | |
import cats.effect.{IO, IOApp} | |
import com.typesafe.scalalogging.Logger | |
import fs2.{Pipe, Stream} | |
import java.util.concurrent.atomic.AtomicInteger | |
import scala.collection.mutable | |
import scala.concurrent.duration.DurationInt | |
object Subscriptions { | |
private object lock | |
private val cnt = new AtomicInteger(0) | |
private val subscriptions = mutable.Map.empty[Int, String => Unit] | |
def subscribe(cb: String => Unit, token: Int): Unit = lock.synchronized { | |
subscriptions.update(token, cb) | |
} | |
def unsubscribe(token: Int): Unit = lock.synchronized { | |
subscriptions.remove(token) | |
} | |
def notifyListeners: Unit = { | |
val listeners = lock.synchronized(subscriptions.values.toList) | |
listeners.foreach(_.apply(s"from callback: ${cnt.incrementAndGet().toString}")) | |
} | |
} | |
object TestDispatcher extends IOApp.Simple { | |
private val log = Logger[this.type] | |
private val requestStream = Stream.iterate(1)(_ + 1).covary[IO].evalTap(_ => IO.sleep(10.millis)) | |
private val tokenFactory = new AtomicInteger(0) | |
def onInput(dispatcher: Dispatcher[IO], q: Queue[IO, String]): Unit = { | |
val token = tokenFactory.incrementAndGet() | |
def cb(s: String): Unit = { | |
dispatcher.unsafeRunSync(q.offer(s).delayBy(1.milli)) | |
Subscriptions.unsubscribe(token) | |
} | |
Subscriptions.subscribe(cb, token) | |
} | |
val pipe: Pipe[IO, Int, String] = { input => | |
Stream.eval(Queue.unbounded[IO, String]).flatMap { output => | |
Stream.resource(Dispatcher[IO].onFinalize(IO(log.info("Releasing Dispatcher")))).flatMap { dispatcher => | |
val processInput = input | |
.map(_ => onInput(dispatcher, output)) | |
.drain | |
Stream.fromQueueUnterminated(output).concurrently(processInput) | |
} | |
} | |
} | |
def run: IO[Unit] = { | |
val consumer = requestStream | |
.through(pipe) | |
.evalTap(x => IO(log.info(s"consumer: $x"))) | |
.timeout(200.millis) | |
.compile | |
.drain | |
val producer = IO.interruptible(true) { | |
while (true) { | |
Thread.sleep(10) | |
Subscriptions.notifyListeners | |
} | |
}.guarantee(IO(log.info("Canceling producer"))) | |
IO.race(consumer, producer).void | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment