Created
January 7, 2022 15:59
-
-
Save ChristopherDavenport/f98b6a412696fc4f5328d7da8c1738f4 to your computer and use it in GitHub Desktop.
RWDeferred
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._ | |
import cats.effect.{Sync, Async} | |
import cats.effect.kernel.MonadCancel | |
import cats.~> | |
// Like Deferred, except that no attempt is made to support multiple readers or writers.
// The expectation is that the value is written ONLY once,
// and that a single fiber eventually waits for it via the registered callback.
// Any other usage pattern is unsupported and results in failure.
/**
 * A write-once cell in the spirit of `Deferred`, stripped down to a single value slot and a
 * single callback slot.
 *
 * Both slots are `@volatile` fields that use `null` to mean "not set yet". Only one callback is
 * ever remembered: each evaluation of `get` overwrites whatever callback was stored before.
 *
 * @param value the stored result, or `null` while the cell is still empty
 * @param cb    the callback registered by the most recent `get`, or `null` if `get` has not run
 * @tparam F effect type; an `Async[F]` instance is required
 * @tparam A type of the stored value
 */
final class RWDeferred[F[_]: Async, A] private (
  @volatile private[this] var value: A, // A | Null
  @volatile private[this] var cb: Either[Throwable, A] => Unit // (Either[Throwable, A] => Unit) | Null
) {

  /** Continuation body handed to `Async[F].cont` each time `get` is evaluated. */
  private[this] class GateCont extends cats.effect.Cont[F, A, A] {
    def apply[G[_]](implicit G: MonadCancel[G, Throwable]): (Either[Throwable, A] => Unit, G[A], F ~> G) => G[A] =
      (resume, suspended, _) => {
        // Order matters: the callback is stored BEFORE `value` is read, mirroring `complete`,
        // which stores `value` before it reads the callback.
        // NOTE(review): presumably this pairing is what keeps a concurrent completer and
        // reader from missing each other - confirm before reordering anything here.
        cb = resume
        val current = value
        if (current != null) G.pure(current) // already completed: no need to suspend
        else suspended
      }
  }

  private[this] val gate = new GateCont

  /**
   * Stores `a` if the cell is still empty and notifies the registered callback, if there is one.
   *
   * @param a the value to store
   * @return `true` when this call stored the value, `false` when a value was already present
   */
  def complete(a: A): F[Boolean] = Sync[F].delay {
    if (value != null) false
    else {
      // Store the value first, then look for a callback (see the note in `GateCont`).
      value = a
      val registered = cb
      if (registered != null) registered(Right(a))
      true
    }
  }

  /** Yields the stored value, suspending on the continuation's gate when none is present yet. */
  def get: F[A] = Async[F].cont(gate)

  /** Reads the value slot without waiting: `Some(value)` if set, `None` while it is `null`. */
  def tryGet: F[Option[A]] = Sync[F].delay {
    Option(value)
  }
}
/** Companion holding the only public way to build an [[RWDeferred]]. */
object RWDeferred {

  /**
   * Allocates a new, empty cell inside `F`.
   *
   * The instance is constructed with `null` in both its value slot and its callback slot.
   *
   * @tparam F effect type; an `Async[F]` instance is required
   * @tparam A type of the value the cell will eventually hold
   * @return an effect that creates a fresh `RWDeferred[F, A]` when run
   */
  def apply[F[_]: Async, A]: F[RWDeferred[F, A]] =
    Sync[F].delay {
      // `null` cast to `A` is the "no value yet" marker the class checks for.
      val empty = null.asInstanceOf[A]
      new RWDeferred[F, A](empty, null)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment