Created
June 8, 2023 11:51
-
-
Save arturaz/a8da51748eb59c744c72f10aa3257ee2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extension [F[_], A](resource: Resource[F, A]) {
  /**
   * Wraps this resource so that concurrent users share a single allocation.
   *
   * The first acquisition allocates the underlying resource; every further acquisition made while it is
   * still held only increments a user counter and hands out the same value. Releasing decrements the
   * counter, and the underlying finalizer runs when the counter drops to zero. Because the state is
   * cleared at that point, a later acquisition allocates the underlying resource again.
   *
   * @return an effect that creates the shared bookkeeping state and yields the sharing [[Resource]].
   */
  def memoizeShared(using F: Async[F], syncAsASync: SyncAsASync[F, F]): F[Resource[F, A]] = {
    // Bookkeeping for a live allocation: how many users hold it, the value itself and its finalizer.
    final case class Shared(users: Int, value: A, finalizer: F[Unit])

    SynchronizedRef[F, F].of(Option.empty[Shared]).map { sharedRef =>
      // Joins the existing allocation if there is one, otherwise allocates the underlying resource.
      val acquire: F[A] = sharedRef.modify[A] {
        case Some(shared) =>
          F.pure((Some(shared.copy(users = shared.users + 1)), shared.value))
        case None =>
          resource.allocated.map { case (value, finalizer) =>
            (Some(Shared(1, value, finalizer)), value)
          }
      }.map(_._2)

      // Leaves the allocation; the last user to leave runs the underlying finalizer.
      val release: F[Unit] = sharedRef.modify[Unit] {
        case Some(shared) =>
          val remaining = shared.users - 1
          if (remaining == 0) shared.finalizer.map(_ => (Option.empty[Shared], ()))
          else F.pure((Some(shared.copy(users = remaining)), ()))
        case None =>
          // Releasing without a prior acquisition should never happen.
          F.raiseError[(Option[Shared], Unit)](
            new IllegalStateException("Resource was released but it was never allocated.")
          )
      }.map(_._2)

      Resource.make(acquire)(_ => release)
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package app.data | |
import cats.effect.kernel.Resource | |
import cats.effect.std.Semaphore | |
import cats.effect.{Concurrent, IO, Ref, Sync, SyncIO} | |
import cats.implicits.{toFlatMapOps, toFunctorOps} | |
import cats.{Applicative, Functor, Monad, ~>} | |
/**
 * [[Ref]] of [[A]] which relies on locking instead of compare-and-swap to provide concurrent access data safety.
 *
 * [[ASyncF]] is the type which is used by the lock-use-unlock operations. It is asynchronous as we might have to wait
 * until the lock can be obtained.
 *
 * [[SyncF]] is the type which is used by the operations that do not obtain the lock first.
 */
trait SynchronizedRef[ASyncF[_], SyncF[_], A]
  extends SynchronizedRefSync[SyncF, A] with SynchronizedRefASync[ASyncF, A]
{ self =>
  /**
   * Projects the [[SynchronizedRef]] in two ways, turning it from [[A]] to [[B]].
   *
   * The returned reference delegates every operation to this one, so both views share the same
   * underlying value.
   *
   * @param mapper       extracts the [[B]] view from the current [[A]].
   * @param contraMapper given the current [[A]] and an updated [[B]], produces the [[A]] to write back.
   * @param syncFunctor  used to map the result of [[getImmediate]].
   */
  def bimap[B](mapper: A => B)(
    contraMapper: (A, B) => A
  )(implicit syncFunctor: Functor[SyncF]): SynchronizedRef[ASyncF, SyncF, B] = new SynchronizedRef[ASyncF, SyncF, B] {
    // Explicit result types: Scala 3 requires them on implicit definitions, and they keep the
    // overrides from silently changing type if the right-hand sides are ever edited.
    override protected implicit def asyncConcurrent: Concurrent[ASyncF] = self.asyncConcurrent

    override lazy val resource: Resource[ASyncF, B] = self.resource.map(mapper)

    override def getImmediate: SyncF[B] = self.getImmediate.map(mapper)

    override def modify[Result](f: B => ASyncF[(B, Result)]): ASyncF[(B, Result)] =
      self
        // The whole `(B, Result)` tuple travels as the result of the underlying modification so that it
        // can be handed back unchanged once the new `A` has been written.
        .modify(a => f(mapper(a)).map { case tpl @ (b, _) => (contraMapper(a, b), tpl) })
        .map { case (_, tpl) => tpl }

    override def modifyMaybeEffect[Result](
      f: B => ASyncF[Option[ASyncF[(B, Result)]]]
    ): ASyncF[Option[(B, Result)]] =
      self
        .modifyMaybeEffect { a =>
          f(mapper(a)).map(opt => opt.map(asyncF => asyncF.map { case tpl @ (b, _) =>
            (contraMapper(a, b), tpl)
          }))
        }
        .map(opt => opt.map { case (_, tpl) => tpl })
  }
}
/**
 * The lock-free half of [[SynchronizedRef]]: operations here never acquire the lock.
 *
 * @tparam SyncF effect type in which the lock-free operations are expressed.
 * @tparam A     type of the stored value.
 */
trait SynchronizedRefSync[SyncF[_], A] {
  /** Reads the current value immediately, without acquiring the lock first. */
  def getImmediate: SyncF[A]
}
/**
 * The locking half of [[SynchronizedRef]]: every operation here acquires the lock.
 *
 * Implementations provide [[resource]], [[modify]] and [[modifyMaybeEffect]]; the remaining operations
 * are derived from those.
 */
trait SynchronizedRefASync[ASyncF[_], A] {
  protected implicit def asyncConcurrent: Concurrent[ASyncF]

  /**
   * [[Resource]] that acquires a lock and fetches the data upon [[Resource.use]] and releases the lock once it is
   * finished.
   */
  lazy val resource: Resource[ASyncF, A]

  /** Acquires a lock, fetches the data, runs the effect, releases the lock. */
  def use[B](f: A => ASyncF[B]): ASyncF[B] = resource.use(f)

  /** Acquires a lock, fetches the data, releases the lock. */
  def get: ASyncF[A] = use(current => asyncConcurrent.pure(current))

  /**
   * Acquires a lock, fetches the data, invokes the provided function, executes the effect, writes back the data and
   * releases the lock.
   */
  def modify[Result](f: A => ASyncF[(A, Result)]): ASyncF[(A, Result)]

  /** Same as [[modify]] for callers that only care about the updated value. */
  def modify_(f: A => ASyncF[A]): ASyncF[A] =
    modify[Unit](current => f(current).map(updated => (updated, ())))
      .map { case (updated, _) => updated }

  /** Like [[modifyMaybe]], but to produce the [[Option]] we need to run an effect first. */
  def modifyMaybeEffect[Result](f: A => ASyncF[Option[ASyncF[(A, Result)]]]): ASyncF[Option[(A, Result)]]

  /** Same as [[modifyMaybeEffect]] for callers that only care about the updated value. */
  def modifyMaybeEffect_(f: A => ASyncF[Option[ASyncF[A]]]): ASyncF[Option[A]] =
    modifyMaybeEffect[Unit] { current =>
      f(current).map(maybeUpdate => maybeUpdate.map(update => update.map(updated => (updated, ()))))
    }.map(outcome => outcome.map { case (updated, _) => updated })

  /**
   * Like [[modify]], but can terminate early without setting back the value.
   *
   * Returns an optional value if the update completed.
   */
  def modifyMaybe[Result](f: A => Option[ASyncF[(A, Result)]]): ASyncF[Option[(A, Result)]] =
    modifyMaybeEffect[Result](current => asyncConcurrent.pure(f(current)))

  /** Same as [[modifyMaybe]] for callers that only care about the updated value. */
  def modifyMaybe_(f: A => Option[ASyncF[A]]): ASyncF[Option[A]] =
    modifyMaybe[Unit] { current =>
      f(current).map(update => update.map(updated => (updated, ())))
    }.map(outcome => outcome.map { case (updated, _) => updated })
}
object SynchronizedRef {
  /**
   * Builds a [[SynchronizedRef]] value.
   *
   * This builder uses the
   * [[https://typelevel.org/cats/guidelines.html#partially-applied-type Partially-Applied Type]]
   * technique.
   *
   * {{{
   * SynchronizedRef[IO, SyncIO].of(10) <-> SynchronizedRef.of[IO, SyncIO, Int](10)
   * }}}
   *
   * @see [[of]]
   */
  def apply[ASyncF[_], SyncF[_]](
    implicit concurrent: Concurrent[ASyncF], sync: Sync[SyncF]
  ): ApplyBuilders[ASyncF, SyncF] =
    new ApplyBuilders(concurrent, sync)

  /**
   * Converts synchronous type [[SyncF]] to its asynchronous counterpart [[ASyncF]]. This is essentially [[~>]]
   * but we give it a separate trait so that implicit resolution would pick it up automatically.
   */
  trait SyncAsASync[SyncF[_], ASyncF[_]] extends (SyncF ~> ASyncF)

  object SyncAsASync {
    /** Identity conversion for when [[IO]] is used as both the synchronous and the asynchronous type. */
    given SyncAsASync[IO, IO] = new SyncAsASync[IO, IO] {
      override def apply[A](f: IO[A]): IO[A] = f
    }

    /** Lifts [[SyncIO]] into [[IO]]. */
    given catsIO: SyncAsASync[SyncIO, IO] = new SyncAsASync[SyncIO, IO] {
      override def apply[A](f: SyncIO[A]): IO[A] = f.to[IO]
    }
  }

  /**
   * Creates a new instance of [[SynchronizedRef]].
   *
   * The value is kept in a [[Ref]] over [[SyncF]] and guarded by a single-permit [[Semaphore]] over
   * [[ASyncF]]; every locking operation holds that permit for its whole duration.
   *
   * @param initial     the value the reference starts with.
   * @param syncAsASync lifts the [[SyncF]] operations of the underlying [[Ref]] into [[ASyncF]].
   */
  def of[ASyncF[_] : Concurrent, SyncF[_] : Sync, A](
    initial: A
  )(implicit syncAsASync: SyncAsASync[SyncF, ASyncF]): ASyncF[SynchronizedRef[ASyncF, SyncF, A]] = {
    // Capture the evidence explicitly, outside of the anonymous class, so that the `asyncConcurrent`
    // override below can never end up resolving to itself through implicit search.
    val concurrentEvidence: Concurrent[ASyncF] = implicitly[Concurrent[ASyncF]]

    for {
      ref <- syncAsASync(Ref[SyncF].of(initial))
      semaphore <- Semaphore[ASyncF](1)
    } yield new SynchronizedRef[ASyncF, SyncF, A] {
      override protected def asyncConcurrent: Concurrent[ASyncF] = concurrentEvidence

      //region SynchronizedRefSync

      override def getImmediate: SyncF[A] = ref.get

      //endregion

      //region SynchronizedRefASync

      override lazy val resource: Resource[ASyncF, A] =
        semaphore.permit.evalMap(_ => syncAsASync(ref.get))

      override def modify[Result](f: A => ASyncF[(A, Result)]): ASyncF[(A, Result)] =
        semaphore.permit.use { _ =>
          syncAsASync(ref.get)
            .flatMap(f)
            // Write the new value back before the permit is released.
            .flatTap { case (updated, _) => syncAsASync(ref.set(updated)) }
        }

      override def modifyMaybeEffect[Result](
        f: A => ASyncF[Option[ASyncF[(A, Result)]]]
      ): ASyncF[Option[(A, Result)]] =
        semaphore.permit.use { _ =>
          syncAsASync(ref.get).flatMap { current =>
            f(current).flatMap {
              // Early termination: nothing is written back.
              case None => Monad[ASyncF].pure(Option.empty[(A, Result)])
              case Some(effect) => effect.flatMap { case result @ (updated, _) =>
                val written: SyncF[Option[(A, Result)]] = ref.set(updated).map(_ => Some(result))
                syncAsASync(written)
              }
            }
          }
        }

      //endregion
    }
  }

  /** Second stage of the partially-applied builder returned by [[apply]]; only [[A]] is left to infer. */
  final class ApplyBuilders[ASyncF[_], SyncF[_]](
    concurrent: Concurrent[ASyncF], sync: Sync[SyncF]
  ) {
    /** Creates a new instance of [[SynchronizedRef]]. */
    def of[A](a: A)(implicit syncAsASync: SyncAsASync[SyncF, ASyncF]): ASyncF[SynchronizedRef[ASyncF, SyncF, A]] =
      SynchronizedRef.of(a)(concurrent, sync, syncAsASync)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment