Skip to content

Instantly share code, notes, and snippets.

View adamw's full-sized avatar

Adam Warski adamw

View GitHub Profile
def broadcast(connector: QueueConnector[IO[Throwable, ?]]
): IO[Void, BroadcastResult] = {
def processMessages(inbox: IOQueue[BroadcastMessage],
consumers: Set[String => IO[Void, Unit]]
): IO[Void, Unit] =
inbox
.take[Nothing]
.flatMap {
case Subscribe(consumer) => processMessages(inbox, consumers + consumer)
def consume(connector: QueueConnector[IO[Throwable, ?]],
inbox: IOQueue[BroadcastMessage]): IO[Throwable, Unit] = {
// Describes obtaining a queue from the connector, with a log line before and after.
val connect: IO[Throwable, Queue[IO[Throwable, ?]]] = {
  // Log first; the logging call is wrapped by `IO.syncThrowable`.
  val announce = IO.syncThrowable(logger.info("[queue-start] connecting"))
  // Then ask the connector for a queue.
  val obtainQueue = announce.flatMap(_ => connector.connect)
  // Finally log again and hand the queue back unchanged.
  obtainQueue.map { queue =>
    logger.info("[queue-start] connected")
    queue
  }
}
/** Messages taken from the broadcast inbox.
  *
  * The trait is sealed, so pattern matches over it are checked for exhaustiveness.
  */
sealed trait BroadcastMessage

/** Carries a consumer callback; the message loop handles it by adding `consumer`
  * to its set of consumers.
  *
  * @param consumer function from a message string to an effect
  */
final case class Subscribe(consumer: String => IO[Nothing, Unit]) extends BroadcastMessage

/** Carries a single message string.
  *
  * @param msg the message text
  */
final case class Received(msg: String) extends BroadcastMessage
def broadcast(connector: QueueConnector[Task]): Task[BroadcastResult] = {
def processMessages(inbox: MVar[BroadcastMessage],
consumers: Set[String => Task[Unit]]): Task[Unit] = // ...
def consumeForever(inbox: MVar[BroadcastMessage]): Task[Unit] = // ...
for {
inbox <- MVar.empty[BroadcastMessage]
f1 <- consumeForever(inbox).fork
f2 <- processMessages(inbox, Set()).fork
/** Keeps the queue consumer running: whenever `consume` ends, whether by failing or by
  * completing, the outcome is logged and the task is started again.
  *
  * @param inbox passed through to `consume`
  */
def consumeForever(inbox: MVar[BroadcastMessage]): Task[Unit] = {
  // One run of the consumer; `attempt` exposes a failure as a `Left` so it can be logged here.
  val singleRun = consume(connector, inbox).attempt.map { outcome =>
    outcome match {
      case Right(()) =>
        logger.info("[broadcast] queue consumer completed, restarting")
      case Left(e) =>
        logger.info("[broadcast] exception in queue consumer, restarting", e)
    }
  }
  // The stop predicate is constantly false, so the run is restarted after every outcome.
  singleRun.restartUntil(_ => false)
}
def processMessages(inbox: MVar[BroadcastMessage],
consumers: Set[String => Task[Unit]]): Task[Unit] =
inbox.take
.flatMap {
case Subscribe(consumer) => processMessages(inbox, consumers + consumer)
case Received(msg) =>
consumers
.map(consumer => consumer(msg).fork)
.toList
.sequence_
// Outline of a single consumer session: `connect` yields a queue, `consumeQueue` uses it,
// and `releaseQueue` is handed to `bracket` as the release step.
// The three helpers are elided (`// ...`) in this sketch, so how `connector` and `inbox`
// are used is not visible here.
// NOTE(review): presumably `bracket` runs `releaseQueue` whether `consumeQueue` completes
// or fails; confirm against the Task API in use.
def consume(connector: QueueConnector[Task],
inbox: MVar[BroadcastMessage]): Task[Unit] = {
// Task that produces the queue to consume from (body elided).
val connect: Task[Queue[Task]] = // ...
// Work performed on the obtained queue (body elided).
def consumeQueue(queue: Queue[Task]): Task[Unit] = // ...
// Release step for the obtained queue (body elided).
def releaseQueue(queue: Queue[Task]): Task[Unit] = // ...
// Acquire with `connect`, use with `consumeQueue`, release with `releaseQueue`.
connect.bracket(consumeQueue)(releaseQueue)
}
// Task that logs, asks the connector for a queue, logs again, and yields that queue.
val connect: Task[Queue[Task]] =
  for {
    _     <- Task.eval(logger.info("[queue-start] connecting"))
    queue <- connector.connect
  } yield {
    logger.info("[queue-start] connected")
    queue
  }
def consumeQueue(queue: Queue[Task]): Task[Unit] =
Task
/** Messages taken from the broadcast inbox.
  *
  * The trait is sealed, so pattern matches over it are checked for exhaustiveness.
  */
sealed trait BroadcastMessage

/** Carries a consumer callback; the message loop handles it by adding `consumer`
  * to its set of consumers.
  *
  * @param consumer function from a message string to a task
  */
final case class Subscribe(consumer: String => Task[Unit]) extends BroadcastMessage

/** Carries a single message string; the message loop passes `msg` to each consumer.
  *
  * @param msg the message text
  */
final case class Received(msg: String) extends BroadcastMessage
def consumeQueueBehavior(queue: Queue[Future],
msgSink: ActorRef[Received]): Behavior[Try[String]] =
Behaviors.setup { ctx =>
import ctx.executionContext
ctx.log.info("[queue] receiving message")
queue.read().andThen { case result => ctx.self ! result }
Behaviors
.receiveMessage[Try[String]] {