Created
February 25, 2020 22:12
-
-
Save filosganga/738210cf0ddb4d30724184ed9f15ca7b to your computer and use it in GitHub Desktop.
Several effectful Queue implementations using cats-effect
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.data._ | |
import cats.implicits._ | |
import cats.effect._ | |
import cats.effect.concurrent._ | |
/** An effectful queue of `A` values whose operations are described in `F`.
  *
  * The implementations provided by the companion object make `dequeue`
  * wait (semantically, inside `F`) until an element is available.
  *
  * @tparam F the effect type in which the operations are expressed
  * @tparam A the element type
  */
trait Queue[F[_], A] {

  /** An action that yields an element taken from the queue. */
  def dequeue: F[A]

  /** An action that offers `a` to the queue. */
  def enqueue(a: A): F[Unit]
}
object Queue { | |
/** Builds a queue that buffers at most one element.
  *
  * The state distinguishes three situations so that readers and writers can
  * never be confused with each other:
  *
  *  - `Vacant`: nothing stored and nobody waiting;
  *  - `Awaited`: one or more `dequeue` calls are waiting for a value;
  *  - `Occupied`: a value is stored, plus the writers waiting for the slot.
  *
  * `enqueue` completes immediately when the slot is vacant or a reader is
  * waiting; otherwise it waits until its value has been moved into the slot.
  * `dequeue` completes immediately when a value is stored; otherwise it waits
  * for the next `enqueue`. Waiting readers and writers are served in arrival
  * order.
  *
  * A single shared `Option[Deferred]` is not enough here: two consecutive
  * `enqueue`s would complete the same `Deferred` twice, and two consecutive
  * `dequeue`s would wait on a `Deferred` that is never completed.
  *
  * @tparam F effect type; needs `Concurrent` to create `Ref` and `Deferred`
  * @tparam A element type
  * @return an action that allocates a fresh, empty one-slot queue
  */
def one[F[_]: Concurrent, A]: F[Queue[F, A]] = {
  sealed trait Slot
  case object Vacant extends Slot
  case class Awaited(readers: NonEmptyChain[Deferred[F, A]]) extends Slot
  case class Occupied(value: A, writers: Chain[(A, Deferred[F, Unit])]) extends Slot

  Ref.of[F, Slot](Vacant).map { slot =>
    new Queue[F, A] {
      def enqueue(a: A): F[Unit] = {
        // `accepted` is only used when the slot is already occupied: it is
        // completed by the `dequeue` that moves `a` into the slot.
        Deferred[F, Unit].flatMap { accepted =>
          slot.modify {
            case Vacant =>
              Occupied(a, Chain.empty) -> ().pure[F]
            case Awaited(readers) =>
              // Hand the value directly to the longest-waiting reader.
              val next = NonEmptyChain.fromChain(readers.tail).fold[Slot](Vacant)(Awaited(_))
              next -> readers.head.complete(a)
            case Occupied(value, writers) =>
              Occupied(value, writers.append(a -> accepted)) -> accepted.get
          }.flatten
        }
      }

      val dequeue: F[A] = {
        Deferred[F, A].flatMap { promise =>
          slot.modify {
            case Vacant =>
              Awaited(NonEmptyChain.one(promise)) -> promise.get
            case Awaited(readers) =>
              Awaited(readers :+ promise) -> promise.get
            case Occupied(value, writers) =>
              writers.uncons match {
                case Some(((nextValue, wakeWriter), rest)) =>
                  // Promote the first blocked writer's value into the slot
                  // and let that writer proceed.
                  Occupied(nextValue, rest) -> wakeWriter.complete(()).as(value)
                case None =>
                  Vacant -> value.pure[F]
              }
          }.flatten
        }
      }
    }
  }
}
/** Builds a queue with no capacity limit.
  *
  * The state holds two chains: the `Deferred`s of `dequeue` calls that found
  * no buffered element, and the elements that found no waiting `dequeue`.
  * `enqueue` never waits: it either completes the oldest waiting `Deferred`
  * or appends the element to the buffer. `dequeue` either takes the oldest
  * buffered element or registers a fresh `Deferred` and waits on it.
  *
  * @tparam F effect type; needs `Concurrent` to create `Ref` and `Deferred`
  * @tparam A element type
  * @return an action that allocates a fresh, empty queue
  */
def unbounded[F[_]: Concurrent, A]: F[Queue[F, A]] = {
  case class Backlog(waiters: Chain[Deferred[F, A]], buffered: Chain[A])

  val initial = Backlog(Chain.empty, Chain.empty)

  Ref.of[F, Backlog](initial).map { state =>
    new Queue[F, A] {
      def enqueue(a: A): F[Unit] = {
        // The state transition picks the follow-up action, which is run
        // after the atomic update has been committed.
        val followUp: F[F[Unit]] = state.modify { current =>
          current.waiters.uncons match {
            case None =>
              (current.copy(buffered = current.buffered.append(a)), ().pure[F])
            case Some((oldestWaiter, others)) =>
              (current.copy(waiters = others), oldestWaiter.complete(a))
          }
        }
        followUp.flatMap(identity)
      }

      val dequeue: F[A] =
        for {
          slot <- Deferred[F, A]
          followUp <- state.modify { current =>
            current.buffered.uncons match {
              case None =>
                (current.copy(waiters = current.waiters.append(slot)), slot.get)
              case Some((oldest, remaining)) =>
                (current.copy(buffered = remaining), oldest.pure[F])
            }
          }
          item <- followUp
        } yield item
    }
  }
}
/** Builds a queue that buffers at most `bound` elements.
  *
  * A semaphore with `bound` permits tracks the free buffer slots. `enqueue`
  * acquires a permit before touching the state, so it waits while the buffer
  * is full. The permit is released when the element leaves the queue: either
  * by the `dequeue` that removes it from the buffer, or immediately when the
  * element is handed straight to a waiting reader.
  *
  * Wrapping the state update in `withPermit` is not sufficient, because the
  * permit is then returned as soon as the update finishes and the buffer can
  * grow without limit.
  *
  * `bound` should be positive: with zero permits `enqueue` never completes.
  *
  * Note: a cancellation that lands between acquiring the permit and updating
  * the state leaves that permit unreleased.
  *
  * @param bound maximum number of buffered elements
  * @tparam F effect type; needs `Concurrent` for `Ref`, `Deferred`, `Semaphore`
  * @tparam A element type
  * @return an action that allocates a fresh, empty bounded queue
  */
def bounded[F[_]: Concurrent, A](bound: Int): F[Queue[F, A]] = {
  sealed trait Status
  case object Idle extends Status
  case class Empty(pendingReads: NonEmptyChain[Deferred[F, A]]) extends Status
  case class NonEmpty(values: NonEmptyChain[A]) extends Status

  Ref.of[F, Status](Idle).flatMap { status =>
    Semaphore[F](bound).map { semaphore =>
      new Queue[F, A] {
        def enqueue(a: A): F[Unit] = {
          semaphore.acquire.flatMap { _ =>
            status.modify {
              // The element is buffered: keep the permit until it is dequeued.
              case Idle =>
                NonEmpty(NonEmptyChain.one(a)) -> ().pure[F]
              case NonEmpty(values) =>
                NonEmpty(values :+ a) -> ().pure[F]
              // The element goes straight to a reader and never occupies a
              // buffer slot, so the permit is returned right away.
              case Empty(pendingReads) =>
                val nextState =
                  NonEmptyChain.fromChain(pendingReads.tail).fold[Status](Idle)(Empty(_))
                nextState -> (pendingReads.head.complete(a) *> semaphore.release)
            }.flatten
          }
        }

        val dequeue: F[A] = {
          Deferred[F, A].flatMap { promise =>
            status.modify {
              case Idle =>
                Empty(NonEmptyChain.one(promise)) -> promise.get
              case Empty(pendingReads) =>
                Empty(pendingReads :+ promise) -> promise.get
              case NonEmpty(values) =>
                val nextStatus =
                  NonEmptyChain.fromChain(values.tail).fold[Status](Idle)(NonEmpty(_))
                // Removing a buffered element frees one slot for writers.
                nextStatus -> semaphore.release.as(values.head)
            }.flatten
          }
        }
      }
    }
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment