Last active
December 10, 2015 16:39
-
-
Save pchiusano/4fe1fffcf66eb0ca8fb7 to your computer and use it in GitHub Desktop.
Asynchronous semaphores in FS2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fs2.async.mutable | |
import fs2.async.AsyncExt | |
/** A nonblocking semaphore, useful as a concurrency primitive. */ | |
trait Semaphore[F[_]] { | |
/** Returns the number of permits currently available for acquisition. Always nonnegative. */ | |
def available: F[Long] | |
/** | |
* Decrement the number of available permits by `n`, blocking until `n` | |
* are available. Error if `n < 0`. | |
*/ | |
def acquire(n: Long): F[Unit] | |
/** Acquire `n` permits now and return `true`, or return `false` immediately. Error if `n < 0`. */ | |
def tryAcquire(n: Long): F[Boolean] | |
/** | |
* Increment the number of available permits by `n`. | |
* This will have the effect of unblocking `n` acquisitions. | |
*/ | |
def release(n: Long): F[Unit] | |
/** Acquire a single permit. Calls `[[acquire(1)]]`. */ | |
final def acquire1: F[Unit] = acquire(1) | |
/** Release a single permit. Calls `[[release(1)]]`. */ | |
final def release1: F[Unit] = release(1) | |
} | |
object Semaphore { | |
/** Create a new `Semaphore`, initialized with `n` available permits. */ | |
def apply[F[_]](n: Long)(implicit F: AsyncExt[F]): F[Semaphore[F]] = { | |
def ensureNonneg(n: Long) = if (n < 0) throw new IllegalArgumentException("n must be nonnegative, was: " + n) | |
ensureNonneg(n) | |
// semaphore is either empty, and there are number of outstanding acquires (Left) | |
// or it is nonempty, and there are n permits available (Right) | |
type S = Either[Vector[(Long,F[Unit])], Long] | |
F.map(F.refOf[S](Right(n))) { ref => new Semaphore[F] { | |
private def open(gate: F.Ref[Unit]) = F.setPure(gate)(()) | |
def acquire(n: Long) = { ensureNonneg(n) | |
if (n == 0) F.pure(()) | |
else F.bind(F.modify(ref) { | |
case Left(waiting) => F.map(F.ref[Unit]) { gate => Left(waiting :+ (n -> open(gate))) } | |
case Right(m) => | |
if (n <= m) F.pure(Right(m-n)) | |
else F.map(F.ref[Unit]) { gate => Left(Vector((n-m) -> open(gate))) } | |
}) { change => change.now match { | |
case Left(waiting) => waiting.lastOption.map(_._2).getOrElse(F.pure(())) | |
case Right(_) => F.pure(()) | |
}} | |
} | |
def release(n: Long) = { ensureNonneg(n) | |
if (n == 0) F.pure(()) | |
else F.map(F.modify(ref) { | |
case Left(waiting) => | |
def go(waiting: Vector[(Long,F[Unit])], n: Long): F[S] = waiting match { | |
case Vector() => F.pure(Right(n)) | |
case v if v.head._1 <= n => F.bind(v.head._2) { _ => go(v.tail, n-v.head._1) } | |
case v => if (n <= 0) F.pure(Left(waiting)) | |
else F.pure(Left((v.head._1 - n -> v.head._2) +: v.tail)) | |
} | |
go(waiting, n) | |
case Right(m) => F.pure(Right(m+n)) | |
}) { change => () } | |
} | |
def tryAcquire(n: Long) = { ensureNonneg(n) | |
if (n == 0) F.pure(true) | |
else F.map(F.modify(ref) { | |
case Right(m) if m >= n => F.pure(Right(m-n)) | |
case w => F.pure(w) | |
})(_.now.isRight) | |
} | |
def available = F.map(F.get(ref)) { | |
case Left(_) => 0 | |
case Right(n) => n | |
} | |
}}} | |
def empty[F[_]:AsyncExt]: F[Semaphore[F]] = apply(0) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment