Last active
November 17, 2021 11:25
-
-
Save otto-dev/acb7f4262d9e85b2c49b4671e498afe6 to your computer and use it in GitHub Desktop.
Atomic, sequential, effectful operations on a state using Cats Effect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package xyz.neurotrade.tax | |
import cats.effect.std.Queue | |
import cats.effect.{Async, Resource} | |
import cats.implicits._ | |
object EffectfulRef {

  /** Creates an [[EffectfulRef]] seeded with `initialState`.
    *
    * A single background fiber takes update functions from an unbounded queue
    * and applies them one after another, so updates never interleave even
    * though each of them is effectful.
    *
    * The returned resource, when released, first performs one `get` round trip
    * (which completes only after everything enqueued before it has been
    * applied) and then cancels the background fiber.
    *
    * @param initialState the value held before any update has been applied
    * @tparam F effect type
    * @tparam S type of the state
    */
  def apply[F[_]: Async, S](
      initialState: S,
  ): Resource[F, EffectfulRef[F, S]] = {
    val F = Async[F]
    for {
      queue <- Resource.eval(Queue.unbounded[F, S => F[S]])
      ref <- Resource.pure[F, EffectfulRef[F, S]](new EffectfulRef(queue))
      _ <- F.background {
        // The current state is threaded through tailRecM instead of a captured var.
        F.tailRecM[S, Unit](initialState) { current =>
          queue.take
            .flatMap { step =>
              // A step that fails (or throws while being built) must not
              // terminate the worker: otherwise every later operation,
              // including the draining `get` in the finalizer, would never
              // complete. The previous state is kept in that case.
              F.handleError(F.defer(step(current)))(_ => current)
            }
            .map(next => next.asLeft[Unit])
        }
      }
      _ <- Resource.onFinalize(ref.get.void) // dequeue before cancelling
    } yield ref
  }
}
/** A state cell whose modifications are effectful and applied sequentially.
  *
  * Every operation is expressed through [[modify]], which enqueues an update
  * function on `queue`. Instances are created through the companion's `apply`,
  * which also starts the fiber that takes functions from the queue and runs
  * them one at a time.
  *
  * @param queue queue of pending state transitions
  */
class EffectfulRef[F[_], A] private (
    queue: Queue[F, A => F[A]],
)(implicit F: Async[F]) {

  /** Replaces the state with the result of `fa`. */
  def set(fa: F[A]): F[Unit] = modify(_ => fa.map(a2 => (a2, ())))

  /** Replaces the state with `a`. */
  def setPure(a: A): F[Unit] = modify(_ => F.pure((a, ())))

  /** Returns the state as it is once all previously enqueued operations have run. */
  def get: F[A] = modify(a => F.pure((a, a)))

  /** Replaces the state with the result of the effectful function `f`. */
  def update(f: A => F[A]): F[Unit] = modify { a =>
    f(a).map(a2 => (a2, ()))
  }

  /** Replaces the state with `f(state)`. */
  def updatePure(f: A => A): F[Unit] = modify { a =>
    F.pure((f(a), ()))
  }

  /** Applies the effectful `f` and returns the state as it was before the update. */
  def getAndUpdate(f: A => F[A]): F[A] = modify { a =>
    f(a).map(a2 => (a2, a))
  }

  /** Applies `f` and returns the state as it was before the update. */
  def getAndUpdatePure(f: A => A): F[A] = modify { a =>
    F.pure((f(a), a))
  }

  /** Applies the effectful `f` and returns the new state. */
  def updateAndGet(f: A => F[A]): F[A] = modify { a =>
    f(a).map(a2 => (a2, a2))
  }

  /** Applies `f` and returns the new state. */
  def updatePureAndGet(f: A => A): F[A] = modify { a =>
    val a2 = f(a)
    F.pure((a2, a2))
  }

  /** Enqueues a state transition and completes with its result once it has run.
    *
    * If `f` fails, or throws while building its effect, the returned effect
    * fails with that error and the state is left unchanged. The error is
    * reported only to this caller; it is not raised into the fiber that
    * processes the queue, so later operations are still served.
    *
    * @param f computes the new state together with a value to return
    * @return the `B` produced by `f`
    */
  def modify[B](f: A => F[(A, B)]): F[B] = F.async { callback =>
    queue.offer { s =>
      // `defer` captures exceptions thrown by `f(s)` itself, so the callback
      // is always completed exactly once.
      F.attempt(F.defer(f(s))).flatMap {
        case Right((s2, b)) => F.delay(callback(Right(b))).as(s2)
        case Left(e)        => F.delay(callback(Left(e))).as(s)
      }
    } *> F.pure(None)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment