// Atomic, sequential, effectful operations on a shared state, built on Cats Effect.
import cats.effect.std.Queue
import cats.effect.{Async, Resource}
import cats.implicits._
object EffectfulRef {

  /** Creates an [[EffectfulRef]] whose state starts at `initialState`.
    *
    * State transitions are submitted to an unbounded queue and applied one at a
    * time by a single background fiber, so effectful updates never interleave.
    *
    * On release, the resource first waits for a final `get` to pass through the
    * queue (i.e. for every previously submitted operation to be applied) and only
    * then cancels the background fiber.
    *
    * @param initialState the state before any operation has been applied
    * @tparam F the effect type
    * @tparam S the state type
    * @return a resource managing the background worker and exposing the reference
    */
  def apply[F[_]: Async, S](
    initialState: S,
  ): Resource[F, EffectfulRef[F, S]] = {

    // Applies queued transitions forever, threading the state through the
    // recursion instead of mutating a captured `var`.
    def worker(queue: Queue[F, S => F[S]])(current: S): F[Unit] =
      queue.take
        .flatMap { transition =>
          // A failing transition must not terminate the worker: otherwise every
          // later operation (including the finalizer's `get`) would wait forever.
          // Keep the previous state and move on to the next queued transition.
          transition(current).handleError(_ => current)
        }
        .flatMap(worker(queue))

    for {
      queue <- Resource.eval(Queue.unbounded[F, S => F[S]])
      ref <- Resource.pure[F, EffectfulRef[F, S]](new EffectfulRef(queue))
      _ <- Async[F].background(worker(queue)(initialState))
      // Resources are released in reverse order of acquisition, so this runs
      // before the worker is cancelled: drain the queue before cancelling.
      _ <- Resource.onFinalize(ref.get.void)
    } yield ref
  }
}
/** A reference to a state of type `A` whose updates may themselves be effects.
  *
  * Every operation is expressed through [[modify]], which enqueues a state
  * transition on `queue`; the consumer of that queue is expected to apply the
  * transitions one at a time, which is what makes the operations sequential.
  *
  * @param queue the queue of pending state transitions
  * @tparam F the effect type
  * @tparam A the state type
  */
class EffectfulRef[F[_], A] private (
  queue: Queue[F, A => F[A]],
)(implicit F: Async[F]) {

  /** Runs `fa` in sequence with the other operations and stores its result. */
  def set(fa: F[A]): F[Unit] = modify(_ => fa.map(a2 => (a2, ())))

  /** Replaces the state with `a`. */
  def setPure(a: A): F[Unit] = modify(_ => F.pure((a, ())))

  /** Returns the state as seen once all previously enqueued operations have run. */
  def get: F[A] = modify(a => F.pure((a, a)))

  /** Replaces the state with the result of the effectful function `f`. */
  def update(f: A => F[A]): F[Unit] = modify { a =>
    f(a).map(a2 => (a2, ()))
  }

  /** Replaces the state with the result of the pure function `f`. */
  def updatePure(f: A => A): F[Unit] = modify { a =>
    F.pure((f(a), ()))
  }

  /** Applies the effectful function `f` and returns the state from before the update. */
  def getAndUpdate(f: A => F[A]): F[A] = modify { a =>
    f(a).map(a2 => (a2, a))
  }

  /** Applies the pure function `f` and returns the state from before the update. */
  def getAndUpdatePure(f: A => A): F[A] = modify { a =>
    F.pure((f(a), a))
  }

  /** Applies the effectful function `f` and returns the updated state. */
  def updateAndGet(f: A => F[A]): F[A] = modify { a =>
    f(a).map(a2 => (a2, a2))
  }

  /** Applies the pure function `f` and returns the updated state. */
  def updatePureAndGet(f: A => A): F[A] = modify { a =>
    val a2 = f(a)
    F.pure((a2, a2))
  }

  /** Enqueues a state transition and completes with its result once it has run.
    *
    * If `f` fails, whether by raising an error in `F` or by throwing while
    * building the effect, the returned effect fails with that error and the
    * state is left unchanged. The enqueued task itself never fails, so one bad
    * operation cannot stop the queue's consumer from processing later ones.
    *
    * @param f computes the next state together with the value to return
    * @tparam B the type of the value handed back to the caller
    * @return the `B` produced by `f`, available after the transition was applied
    */
  def modify[B](f: A => F[(A, B)]): F[B] = F.async[B] { callback =>
    queue
      .offer { s =>
        // `defer` also captures exceptions thrown synchronously by `f`.
        F.defer(f(s)).attempt.flatMap {
          case Right((s2, b)) => F.delay(callback(Right(b))).as(s2)
          case Left(e)        => F.delay(callback(Left(e))).as(s)
        }
      }
      .as(Option.empty[F[Unit]])
  }
}
