// Easy concurrency primitives
// (GitHub gist arnolddevos/67cd0144efae36b6c60667eaa664b458, last active February 24, 2021)
/** Creates a gate whose `take` waits for the next `offer`.
  *
  * The gate keeps a counter that starts at 0 and is incremented by one on
  * every `offer`. `take` first reads the current counter, then blocks until
  * the counter has grown past that reading, and finally yields the new
  * counter value.
  *
  * This is similar to, but not the same as, the conventional meaning of
  * "barrier".
  *
  * @return a gate accepting `Unit` offers and yielding the offer count
  */
def barrier(): Gate[Unit, Long] = new Gate[Unit, Long] {
  // Number of offers made so far.
  private val state = new Transactor(0L)

  val take =
    for {
      // Snapshot the counter without changing it.
      seen <- state.transact(count => Observed(succeed(count)))
      // Stay blocked until at least one offer has happened since the snapshot.
      next <- state.transact { count =>
        if (count > seen) Observed(succeed(count))
        else Blocked
      }
    } yield next

  // The offered value carries no information; only the event is counted.
  def offer(u: Unit) =
    state.transact(count => Updated(count + 1, unit))
}
/** A synchronisation point with an input side (`offer`) and an output side (`take`).
  *
  * @tparam A type of values accepted by `offer` (contravariant)
  * @tparam B type of values produced by `take` (covariant)
  */
trait Gate[-A, +B] {

  /** Effect that hands `s` to the gate. Declared as unable to fail (`Nothing` error type). */
  def offer(s: A): IO[Nothing, Unit]

  /** Effect that obtains a value from the gate. Declared as unable to fail (`Nothing` error type). */
  def take: IO[Nothing, B]
}
/** Creates a gate that captures the first value passed to `offer`.
  *
  * `take` blocks until a value has been captured and then yields it; the
  * captured value is never removed, so every later `take` sees the same value.
  * Offers made after the first one leave the captured value untouched.
  *
  * Not to be confused with a countdown latch.
  *
  * @tparam T type of the captured value
  */
def latch[T](): Gate[T, T] = new Gate[T, T] {
  // None until the first offer arrives, Some(firstValue) afterwards.
  private val state = new Transactor(Option.empty[T])

  val take =
    state.transact {
      case Some(captured) => Observed(succeed(captured))
      case None           => Blocked
    }

  def offer(t: T) =
    state.transact {
      // Already latched: keep the first value and just complete.
      case Some(_) => Observed(unit)
      case None    => Updated(Some(t), unit)
    }
}
/** Creates a FIFO gate with fixed capacity and back pressure.
  *
  * `take` blocks while the queue is empty; otherwise it removes and yields the
  * oldest element. `offer` blocks while the queue already holds `quota`
  * elements; otherwise it appends the element.
  *
  * @param quota maximum number of elements held at once; with a value of
  *              zero or less every `offer` blocks
  * @tparam T element type
  */
def queue[T](quota: Int): Gate[T, T] = new Gate[T, T] {
  private val state = new Transactor(Queue.empty[T])

  val take =
    state.transact { pending =>
      // dequeueOption splits off the front element in a single step.
      pending.dequeueOption match {
        case Some((front, rest)) => Updated(rest, succeed(front))
        case None                => Blocked
      }
    }

  def offer(t: T) =
    state.transact { pending =>
      if (pending.length < quota) Updated(pending.enqueue(t), unit)
      else Blocked
    }
}
/** Creates a conventional counting semaphore.
  *
  * `take` is the P (wait) operation: it blocks while the count is zero or
  * less, otherwise it decrements the count by one and yields the count as it
  * was before the decrement.
  *
  * `offer` is the V (signal) operation: it adds `i` to the count. The amount
  * is not validated, so a negative `i` lowers the count.
  *
  * @param v0 initial count
  */
def semaphore(v0: Long): Gate[Long, Long] = new Gate[Long, Long] {
  private val state = new Transactor(v0)

  // P or wait
  val take =
    state.transact { permits =>
      if (permits > 0) Updated(permits - 1, succeed(permits))
      else Blocked
    }

  // V or signal
  def offer(i: Long) =
    state.transact(permits => Updated(permits + i, unit))
}
/** A transaction is modelled as a pure function on state. It may return a new
  * state together with a result effect, a result effect alone, or the value
  * `Blocked`.
  *
  * Blocked transactions are retained in the transactor until they can produce
  * an effect.
  *
  * The transactor provides
  * `transact[E, A](tx: Transaction[State, IO[E, A]]): IO[E, A]`;
  * the effect it returns embodies both the state change and the result effect.
  *
  * @tparam S state type
  * @tparam T result type (an effect when used with the transactor)
  */
type Transaction[S, +T] = S => Status[S, T]

/** Outcome of applying a [[Transaction]] to the current state. */
enum Status[+S, +T] {

  /** Replace the state with `state` and produce `effect`. */
  case Updated(state: S, effect: T)

  /** Leave the state as it is and produce `effect`. */
  case Observed(effect: T)

  /** The transaction cannot produce an effect for the state it was given. */
  case Blocked
}
/** Holder of a state of type `S` against which transactions are run.
  *
  * @param init initial state
  * @tparam S state type
  */
final class Transactor[S](init: S) {

  /** Turns a transaction into an effect of the same error and result types.
    *
    * The body is a placeholder: `???` throws `NotImplementedError` when
    * evaluated, so this snippet only fixes the signature.
    *
    * @param tx function from the current state to a [[Status]] carrying the effect to run
    */
  def transact[E, A](tx: Transaction[S, IO[E, A]]): IO[E, A] = ???

  // NOTE(review): the original comment here read only "// see" — the reference
  // it pointed to (presumably the full implementation) is missing.
}
// (End of gist; the GitHub comment-thread footer that followed is not part of the source.)