Skip to content

Instantly share code, notes, and snippets.

@samspills
Last active July 29, 2024 19:15
Show Gist options
  • Save samspills/b0005f661b139f89817710f46b1794a8 to your computer and use it in GitHub Desktop.
Save samspills/b0005f661b139f89817710f46b1794a8 to your computer and use it in GitHub Desktop.
a simple semaphore that can support FIFO, LIFO, and maybe eventually priority
//> using scala "2.13.14"
//> using dep "org.typelevel::cats-effect:3.5.4"
//> using dep "org.typelevel::cats-core:2.12.0"
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import cats.effect.kernel.syntax.all._
import cats.effect.kernel.GenConcurrent
import cats.effect.kernel.Deferred
import cats.effect.kernel.Resource
import scala.collection.immutable.{Queue => ScalaQueue}
import cats.effect.kernel.Poll
import scala.reflect.ClassTag
import cats.effect.kernel.Ref
import cats.kernel.Eq
trait Fairness
case object Lifo extends Fairness
case object Fifo extends Fairness
case object Priority extends Fairness
abstract class Blerfaphore[F[_]] {
def acquire: F[Unit]
def release: F[Unit]
def permit: Resource[F, Unit]
def withPermit[A](fa: F[A]): F[A]
}
object Blerfaphore {
trait BlueueTC[F[_], A] {
def cleanup(fa: F[A], elem: A): F[A]
def offer(fa: F[A], elem: A): F[A]
def take(fa: F[A]): (F[A], A)
def nonEmpty(fa: F[A]): Boolean
}
implicit def listBlueue[A <: AnyRef]: BlueueTC[List,A]= new BlueueTC[List, A] {
def cleanup(fa: List[A], elem: A) = fa.filterNot(_ eq elem)
def offer(fa: List[A], elem: A) = elem :: fa
def take(fa: List[A]) = (fa.tail, fa.head)
def nonEmpty(fa: List[A]) = fa.nonEmpty
}
implicit def queueBlueue[A <: AnyRef]: BlueueTC[ScalaQueue,A]= new BlueueTC[ScalaQueue, A] {
def cleanup(fa: ScalaQueue[A], elem: A) = fa.filterNot(_ eq elem)
def offer(fa: ScalaQueue[A], elem: A) = fa :+ elem
def take(fa: ScalaQueue[A]) = (fa.tail, fa.head)
def nonEmpty(fa: ScalaQueue[A]) = fa.nonEmpty
}
private case class StateTC[F[_], A](waiting: F[A], permits: Long)(implicit btc: BlueueTC[F, A])
def apply[F[_]](fairness: Fairness, n: Long)(implicit F: GenConcurrent[F, _]): F[Blerfaphore[F]] = {
require(n >= 0, s"n must be nonnegative, was: $n")
fairness match {
case Fifo => F.ref(StateTC(ScalaQueue[Deferred[F, Unit]](), n)).map(blerfaphore(_))
case Lifo => F.ref(StateTC(List.empty[Deferred[F, Unit]], n)).map(blerfaphore(_))
case Priority => ???
}
}
private def blerfaphore[F[_], G[_]](state: Ref[F, StateTC[G, Deferred[F, Unit]]])(implicit F: GenConcurrent[F, _], B: BlueueTC[G, Deferred[F, Unit]]): Blerfaphore[F] = {
new Blerfaphore[F] {
def acquire: F[Unit] =
F.uncancelable { poll =>
F.deferred[Unit].flatMap { wait =>
val cleanup = state.update {
case s @ StateTC(waiting, permits) =>
if (B.nonEmpty(waiting))
StateTC(B.cleanup(waiting, wait), permits)
else s
}
state.modify {
case StateTC(waiting, permits) =>
if (permits == 0) {
StateTC(B.offer(waiting, wait), permits) -> poll(wait.get).onCancel(cleanup) }
else
StateTC(waiting, permits - 1) -> ().pure[F]
}.flatten
}
}
def release: F[Unit] =
state.flatModify {
case StateTC(waiting, permits) =>
if (B.nonEmpty(waiting)) {
val (rest, next) = B.take(waiting)
StateTC(rest, permits) -> next.complete(()).void }
else
StateTC(waiting, permits + 1) -> ().pure[F]
}
def permit: Resource[F, Unit] = Resource.makeFull ( (poll: Poll[F]) => poll(acquire) ) { _ => release }
def withPermit[A](fa: F[A]): F[A] = F.uncancelable { poll => poll(acquire) >> poll(fa).guarantee(release) }
}
}
}
object BlerfaphoreExperiment extends IOApp.Simple {
import scala.concurrent.duration._
def action(blerf: Blerfaphore[IO], id: Int): IO[Unit] =
blerf.withPermit( IO.println(s"action $id running") *> IO.sleep(1.seconds))
val run: IO[Unit] = for {
blerf <- Blerfaphore[IO](Fifo, 1)
_ <- action(blerf, 1).start <* IO.sleep(10.milli)
_ <- action(blerf, 2).start <* IO.sleep(10.milli)
_ <- action(blerf, 3).start <* IO.sleep(10.milli)
_ <- action(blerf, 4).start <* IO.sleep(10.milli)
_ <- action(blerf, 5).start <* IO.sleep(10.milli)
_ <- action(blerf, 6).start <* IO.sleep(10.milli)
_ <- action(blerf, 7).start <* IO.sleep(10.milli)
_ <- action(blerf, 8).start <* IO.sleep(10.milli)
_ <- IO.sleep(10.seconds)
_ <- IO.println("done I guess?")
} yield ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment