Last active
February 24, 2021 01:16
-
-
Save arnolddevos/67cd0144efae36b6c60667eaa664b458 to your computer and use it in GitHub Desktop.
Easy concurrency primitives
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Creates a gate whose `take` waits for the next `offer()`.
  *
  * The state is a counter that every `offer` increments. `take` first reads
  * the current counter value and then waits until the counter exceeds that
  * snapshot, yielding the newer counter value.
  *
  * (Similar to, but not the conventional meaning of, a barrier.)
  */
def barrier() = new Gate[Unit, Long] {
  // Counter of offers made so far. Uppercase `L` suffix: the lowercase form
  // (`0l`) is easily misread as `01` and is deprecated in Scala 2.13.
  private val state = new Transactor(0L)

  val take =
    for {
      // Snapshot the counter without modifying it.
      v0 <- state.transact(v => Observed(succeed(v)))
      // Remain blocked until the counter has advanced past the snapshot.
      v1 <- state.transact { v =>
        if (v > v0) Observed(succeed(v))
        else Blocked
      }
    } yield v1

  // Each offer advances the counter by one; the Unit argument carries no data.
  def offer(u: Unit) =
    state.transact { v =>
      Updated(v + 1, unit)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A synchronization point with an input side (`offer`) and an output side (`take`).
  *
  * @tparam A type of the values accepted by `offer` (contravariant)
  * @tparam B type of the values produced by `take` (covariant)
  */
trait Gate[-A, +B] {

  /** An effect that produces a `B`.
    * NOTE(review): the `Nothing` type argument is presumably the error channel,
    * i.e. the effect cannot fail - confirm against the definition of `IO`.
    */
  def take: IO[Nothing, B]

  /** An effect that submits `s` to the gate and produces `Unit`. */
  def offer(s: A): IO[Nothing, Unit]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Creates a gate that captures the first value passed to `offer()`.
  *
  * `take` blocks until a value is available and then yields it without
  * clearing it. Offers made after the first one leave the stored value as is.
  *
  * (Not to be confused with a countdown latch.)
  */
def latch[T]() = new Gate[T, T] {
  // `None` until the first offer, `Some(value)` from then on.
  private val state = new Transactor(None: Option[T])

  val take =
    state.transact {
      case Some(a) => Observed(succeed(a))
      case None    => Blocked
    }

  def offer(t: T) =
    state.transact {
      // Only the first offer is stored.
      case None    => Updated(Some(t), unit)
      // Already set: complete without changing the state.
      case Some(_) => Observed(unit)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Creates a queue with fixed capacity and back pressure.
  *
  * `take` removes and yields the element at the front, blocking while the
  * queue is empty. `offer` appends an element, blocking while the queue
  * already holds `quota` (or more) elements.
  *
  * @param quota number of elements the queue may hold before `offer` blocks
  */
def queue[T](quota: Int) = new Gate[T, T] {
  private val state = new Transactor(Queue.empty[T])

  val take =
    state.transact { pending =>
      pending.dequeueOption match {
        case Some((front, rest)) => Updated(rest, succeed(front))
        case None                => Blocked
      }
    }

  def offer(t: T) =
    state.transact { pending =>
      if (pending.length >= quota) Blocked
      else Updated(pending.enqueue(t), unit)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Creates a conventional counting semaphore.
  *
  * @param v0 initial value of the counter
  */
def semaphore(v0: Long) = new Gate[Long, Long] {
  private val state = new Transactor(v0)

  // P or wait: blocks while the counter is not positive; otherwise decrements
  // it and yields the value the counter had before the decrement.
  val take =
    state.transact { permits =>
      if (permits <= 0) Blocked
      else Updated(permits - 1, succeed(permits))
    }

  // V or signal: adds `i` to the counter.
  def offer(i: Long) =
    state.transact(permits => Updated(permits + i, unit))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A transaction is modeled as a pure function on state. It may return a new state
// together with a result effect, a result effect alone, or the value Blocked.
// Blocked transactions are retained in the transactor until they can produce an effect.
// The transactor provides `transact[E, A](tx: Transaction[State, IO[E, A]]): IO[E, A]`.
// That effect embodies both the state change and the result effect.

/** A transaction: a function from the current state `S` to a [[Status]]. */
type Transaction[S, +T] = S => Status[S, T]

/** Outcome of applying a [[Transaction]] to a state. */
enum Status[+S, +T] {

  /** The transaction yields a new `state` together with a result `effect`. */
  case Updated(state: S, effect: T)

  /** The transaction yields a result `effect` and carries no new state. */
  case Observed(effect: T)

  /** The transaction yields neither a new state nor an effect. */
  case Blocked
}
/** Holds a state of type `S` and exposes `transact` for applying transactions to it.
  *
  * @param init the initial state
  */
final class Transactor[S](init: S) {

  // The implementation is elided here (`???`); for the real one see
  // https://github.com/arnolddevos/epsilonio/blob/fb3875b8ef52e73e9a5d10cbe9b5f43624f431d4/src/main/scala/minio/Synchronization.scala#L17
  /** Turns the transaction `tx` into an effect of type `IO[E, A]`. */
  def transact[E, A](tx: Transaction[S, IO[E, A]]): IO[E, A] = ???
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment