Skip to content

Instantly share code, notes, and snippets.

@adamw
Last active July 7, 2018 13:14
Show Gist options
  • Save adamw/c13cc63d4d617f7b3f4db81f2414ae31 to your computer and use it in GitHub Desktop.
Save adamw/c13cc63d4d617f7b3f4db81f2414ae31 to your computer and use it in GitHub Desktop.
/** Builds an action that connects to a queue and forwards every message it reads to `inbox`.
  *
  * The action is assembled from three parts that are handed to `bracket`:
  * the connection step as the resource, `releaseQueue` as its release function
  * and `consumeQueue` as its use function.
  *
  * @param connector source of the queue connection
  * @param inbox     queue that receives each message read, wrapped in `Received`
  * @return the composed action; the consuming part repeats via `forever`
  */
def consume(connector: QueueConnector[IO[Throwable, ?]],
            inbox: IOQueue[BroadcastMessage]): IO[Throwable, Unit] = {

  // Acquire: log the attempt, open the connection, log success and hand the queue back.
  val connect: IO[Throwable, Queue[IO[Throwable, ?]]] =
    for {
      _          <- IO.syncThrowable(logger.info("[queue-start] connecting"))
      connection <- connector.connect
    } yield {
      logger.info("[queue-start] connected")
      connection
    }

  // Release: log, close the queue, log again. A failure of that sequence is handled
  // by logging it, so the resulting action has no error of its own.
  def releaseQueue(queue: Queue[IO[Throwable, ?]]): IO[Void, Unit] = {
    val closeAndLog =
      for {
        _ <- IO.syncThrowable(logger.info("[queue-stop] closing"))
        _ <- queue.close()
      } yield logger.info("[queue-stop] closed")

    closeAndLog.catchAll[Nothing] { cause =>
      IO.now(logger.info("[queue-stop] exception while closing", cause))
    }
  }

  // Use: read a single message (logging first), offer it to the inbox, and repeat.
  def consumeQueue(queue: Queue[IO[Throwable, ?]]): IO[Throwable, Unit] = {
    val nextMessage =
      for {
        _       <- IO.syncThrowable(logger.info("[queue] receiving message"))
        message <- queue.read()
      } yield message

    nextMessage
      .flatMap(message => inbox.offer(Received(message)))
      .forever
  }

  connect.bracket(releaseQueue)(consumeQueue)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment