Skip to content

Instantly share code, notes, and snippets.

@adamw
Last active July 7, 2018 13:14
Show Gist options
  • Save adamw/3260c70d6af2326a696061084216e39f to your computer and use it in GitHub Desktop.
Save adamw/3260c70d6af2326a696061084216e39f to your computer and use it in GitHub Desktop.
def broadcast(connector: QueueConnector[IO[Throwable, ?]]
): IO[Void, BroadcastResult] = {
def processMessages(inbox: IOQueue[BroadcastMessage],
consumers: Set[String => IO[Void, Unit]]
): IO[Void, Unit] =
inbox
.take[Nothing]
.flatMap {
case Subscribe(consumer) => processMessages(inbox, consumers + consumer)
case Received(msg) =>
consumers
.map(consumer => consumer(msg).fork[Nothing])
.toList
.sequence_
.flatMap(_ => processMessages(inbox, consumers))
}
def consumeForever(inbox: IOQueue[BroadcastMessage]): IO[Void, Unit] =
consume(connector, inbox).attempt.map {
case Left(e) =>
logger.info("[broadcast] exception in queue consumer, restarting", e)
case Right(()) =>
logger.info("[broadcast] queue consumer completed, restarting")
}.forever
for {
inbox <- IOQueue.make[Void, BroadcastMessage](32)
f1 <- consumeForever(inbox).fork
f2 <- processMessages(inbox, Set()).fork
} yield BroadcastResult(inbox,
f1.interrupt(new RuntimeException) *>
f2.interrupt(new RuntimeException))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment