Last active
          July 29, 2024 19:15 
        
      - 
      
 - 
        
Save samspills/b0005f661b139f89817710f46b1794a8 to your computer and use it in GitHub Desktop.  
    a simple semaphore that can support FIFO, LIFO, and maybe eventually priority
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | //> using scala "2.13.14" | |
| //> using dep "org.typelevel::cats-effect:3.5.4" | |
| //> using dep "org.typelevel::cats-core:2.12.0" | |
| import cats.effect.{IO, IOApp} | |
| import cats.syntax.all._ | |
| import cats.effect.kernel.syntax.all._ | |
| import cats.effect.kernel.GenConcurrent | |
| import cats.effect.kernel.Deferred | |
| import cats.effect.kernel.Resource | |
| import scala.collection.immutable.{Queue => ScalaQueue} | |
| import cats.effect.kernel.Poll | |
| import scala.reflect.ClassTag | |
| import cats.effect.kernel.Ref | |
| import cats.kernel.Eq | |
| trait Fairness | |
| case object Lifo extends Fairness | |
| case object Fifo extends Fairness | |
| case object Priority extends Fairness | |
| abstract class Blerfaphore[F[_]] { | |
| def acquire: F[Unit] | |
| def release: F[Unit] | |
| def permit: Resource[F, Unit] | |
| def withPermit[A](fa: F[A]): F[A] | |
| } | |
| object Blerfaphore { | |
| trait BlueueTC[F[_], A] { | |
| def cleanup(fa: F[A], elem: A): F[A] | |
| def offer(fa: F[A], elem: A): F[A] | |
| def take(fa: F[A]): (F[A], A) | |
| def nonEmpty(fa: F[A]): Boolean | |
| } | |
| implicit def listBlueue[A <: AnyRef]: BlueueTC[List,A]= new BlueueTC[List, A] { | |
| def cleanup(fa: List[A], elem: A) = fa.filterNot(_ eq elem) | |
| def offer(fa: List[A], elem: A) = elem :: fa | |
| def take(fa: List[A]) = (fa.tail, fa.head) | |
| def nonEmpty(fa: List[A]) = fa.nonEmpty | |
| } | |
| implicit def queueBlueue[A <: AnyRef]: BlueueTC[ScalaQueue,A]= new BlueueTC[ScalaQueue, A] { | |
| def cleanup(fa: ScalaQueue[A], elem: A) = fa.filterNot(_ eq elem) | |
| def offer(fa: ScalaQueue[A], elem: A) = fa :+ elem | |
| def take(fa: ScalaQueue[A]) = (fa.tail, fa.head) | |
| def nonEmpty(fa: ScalaQueue[A]) = fa.nonEmpty | |
| } | |
| private case class StateTC[F[_], A](waiting: F[A], permits: Long)(implicit btc: BlueueTC[F, A]) | |
| def apply[F[_]](fairness: Fairness, n: Long)(implicit F: GenConcurrent[F, _]): F[Blerfaphore[F]] = { | |
| require(n >= 0, s"n must be nonnegative, was: $n") | |
| fairness match { | |
| case Fifo => F.ref(StateTC(ScalaQueue[Deferred[F, Unit]](), n)).map(blerfaphore(_)) | |
| case Lifo => F.ref(StateTC(List.empty[Deferred[F, Unit]], n)).map(blerfaphore(_)) | |
| case Priority => ??? | |
| } | |
| } | |
| private def blerfaphore[F[_], G[_]](state: Ref[F, StateTC[G, Deferred[F, Unit]]])(implicit F: GenConcurrent[F, _], B: BlueueTC[G, Deferred[F, Unit]]): Blerfaphore[F] = { | |
| new Blerfaphore[F] { | |
| def acquire: F[Unit] = | |
| F.uncancelable { poll => | |
| F.deferred[Unit].flatMap { wait => | |
| val cleanup = state.update { | |
| case s @ StateTC(waiting, permits) => | |
| if (B.nonEmpty(waiting)) | |
| StateTC(B.cleanup(waiting, wait), permits) | |
| else s | |
| } | |
| state.modify { | |
| case StateTC(waiting, permits) => | |
| if (permits == 0) { | |
| StateTC(B.offer(waiting, wait), permits) -> poll(wait.get).onCancel(cleanup) } | |
| else | |
| StateTC(waiting, permits - 1) -> ().pure[F] | |
| }.flatten | |
| } | |
| } | |
| def release: F[Unit] = | |
| state.flatModify { | |
| case StateTC(waiting, permits) => | |
| if (B.nonEmpty(waiting)) { | |
| val (rest, next) = B.take(waiting) | |
| StateTC(rest, permits) -> next.complete(()).void } | |
| else | |
| StateTC(waiting, permits + 1) -> ().pure[F] | |
| } | |
| def permit: Resource[F, Unit] = Resource.makeFull ( (poll: Poll[F]) => poll(acquire) ) { _ => release } | |
| def withPermit[A](fa: F[A]): F[A] = F.uncancelable { poll => poll(acquire) >> poll(fa).guarantee(release) } | |
| } | |
| } | |
| } | |
| object BlerfaphoreExperiment extends IOApp.Simple { | |
| import scala.concurrent.duration._ | |
| def action(blerf: Blerfaphore[IO], id: Int): IO[Unit] = | |
| blerf.withPermit( IO.println(s"action $id running") *> IO.sleep(1.seconds)) | |
| val run: IO[Unit] = for { | |
| blerf <- Blerfaphore[IO](Fifo, 1) | |
| _ <- action(blerf, 1).start <* IO.sleep(10.milli) | |
| _ <- action(blerf, 2).start <* IO.sleep(10.milli) | |
| _ <- action(blerf, 3).start <* IO.sleep(10.milli) | |
| _ <- action(blerf, 4).start <* IO.sleep(10.milli) | |
| _ <- action(blerf, 5).start <* IO.sleep(10.milli) | |
| _ <- action(blerf, 6).start <* IO.sleep(10.milli) | |
| _ <- action(blerf, 7).start <* IO.sleep(10.milli) | |
| _ <- action(blerf, 8).start <* IO.sleep(10.milli) | |
| _ <- IO.sleep(10.seconds) | |
| _ <- IO.println("done I guess?") | |
| } yield () | |
| } | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment