Skip to content

Instantly share code, notes, and snippets.

@ghostdogpr
Last active July 29, 2019 23:52
Show Gist options
  • Save ghostdogpr/aefca7373fc53c274c9e620c7d1824d6 to your computer and use it in GitHub Desktop.
Save ghostdogpr/aefca7373fc53c274c9e620c7d1824d6 to your computer and use it in GitHub Desktop.
/**
 * Builds a queue that is fed by a freshly spawned [[SubscriberActor]].
 *
 * The steps, in order: allocate a bounded `Queue[String]` with capacity 1000,
 * obtain the current ZIO runtime, then create a `SubscriberActor` on
 * `actorSystem` that is handed the topic, the runtime and the queue.
 *
 * @param actorSystem the actor system on which the subscriber actor is created
 * @param topic       the topic name passed to the subscriber actor
 * @return an effect yielding the queue that was given to the subscriber actor
 */
def listen(actorSystem: ActorSystem, topic: String): Task[Queue[String]] =
  Queue.bounded[String](1000).flatMap { queue =>
    Task.runtime[Any].flatMap { rts =>
      // Actor creation is a side effect, so it stays wrapped in `Task`;
      // the resulting ActorRef is discarded and the queue is returned.
      Task(actorSystem.actorOf(Props(new SubscriberActor(topic, rts, queue))))
        .map(_ => queue)
    }
  }
/**
 * Wrapper for a single string payload.
 *
 * `SubscriberActor.receive` pattern-matches on this type to extract `msg`.
 * Declared `final` because case classes are not meant to be extended.
 *
 * @param msg the wrapped message text
 */
final case class MessageEnvelope(msg: String)
/**
 * Actor that subscribes itself to `topic` through the distributed pub-sub
 * mediator and offers the payload of every received [[MessageEnvelope]] to
 * `queue`.
 *
 * @param topic the topic to subscribe to
 * @param rts   the ZIO runtime used to run the `queue.offer` effect from
 *              inside the actor
 * @param queue the queue that receives each message payload
 */
class SubscriberActor(topic: String, rts: Runtime[Any], queue: Queue[String]) extends Actor {

  // No `actorSystem` value is in scope in this class (the constructor only
  // takes topic/rts/queue); an actor reaches its own system via `context.system`.
  DistributedPubSub(context.system).mediator ! Subscribe(topic, self)

  def receive: PartialFunction[Any, Unit] = {
    // The effect is run synchronously; its result is discarded.
    // NOTE(review): `unsafeRunSync` blocks this actor's thread until the offer
    // completes. If `queue` is a back-pressuring bounded queue (as `listen`
    // creates), a full queue would presumably stall the actor; confirm this is
    // the intended behaviour.
    case MessageEnvelope(s) => rts.unsafeRunSync(queue.offer(s))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment