Last active
April 14, 2023 13:04
-
-
Save johnynek/a8b1a70ebbfaac2836068f8ba3087a72 to your computer and use it in GitHub Desktop.
A wrapper on ConcurrentHashMap to use with cats.effect.Ref
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.bykn.refmap | |
import cats.data.State | |
import cats.effect.Sync | |
import cats.effect.concurrent.Ref | |
import java.util.concurrent.ConcurrentHashMap | |
import cats.implicits._ | |
/** | |
* This is a total Map from K to Ref[F, V]. | |
* this allows us to use the Ref api backed by a ConcurrentHashMap | |
* | |
* This uses java universal hashCode and equality on K | |
*/ | |
trait RefMap[F[_], K, V] { | |
def apply(k: K): Ref[F, V] | |
} | |
object RefMap { | |
private class Impl[F[_], K, V](chm: ConcurrentHashMap[K, V], sync: Sync[F]) | |
extends RefMap[F, K, Option[V]] { | |
private implicit def syncF: Sync[F] = sync | |
val fnone0: F[None.type] = sync.pure(None) | |
def fnone[A]: F[Option[A]] = fnone0.widen[Option[A]] | |
class HandleRef(k: K) extends Ref[F, Option[V]] { | |
def access: F[(Option[V], Option[V] => F[Boolean])] = | |
sync.delay { | |
val init = chm.get(k) | |
if (init == null) { | |
val set: Option[V] => F[Boolean] = { opt: Option[V] => | |
opt match { | |
case None => sync.delay(!chm.containsKey(k)) | |
case Some(newV) => | |
sync.delay { | |
// it was initially empty | |
chm.putIfAbsent(k, newV) == null | |
} | |
} | |
} | |
(None, set) | |
} else { | |
val set: Option[V] => F[Boolean] = { opt: Option[V] => | |
opt match { | |
case None => | |
sync.delay(chm.remove(k, init)) | |
case Some(newV) => | |
sync.delay(chm.replace(k, init, newV)) | |
} | |
} | |
(Some(init), set) | |
} | |
} | |
def get: F[Option[V]] = | |
sync.delay { | |
Option(chm.get(k)) | |
} | |
def getAndSet(a: Option[V]): F[Option[V]] = | |
a match { | |
case None => | |
sync.delay(Option(chm.remove(k))) | |
case Some(v) => | |
sync.delay(Option(chm.put(k, v))) | |
} | |
def modify[B](f: Option[V] => (Option[V], B)): F[B] = { | |
lazy val loop: F[B] = tryModify(f).flatMap { | |
case None => loop | |
case Some(b) => sync.pure(b) | |
} | |
loop | |
} | |
def modifyState[B](state: State[Option[V], B]): F[B] = | |
modify(state.run(_).value) | |
def set(a: Option[V]): F[Unit] = | |
a match { | |
case None => sync.delay { chm.remove(k); () } | |
case Some(v) => sync.delay { chm.put(k, v); () } | |
} | |
def tryModify[B](f: Option[V] => (Option[V], B)): F[Option[B]] = | |
// we need the suspend because we do effects inside | |
sync.suspend { | |
val init = chm.get(k) | |
if (init == null) { | |
f(None) match { | |
case (None, b) => | |
// no-op | |
sync.pure(Some(b)) | |
case (Some(newV), b) => | |
if (chm.putIfAbsent(k, newV) == null) sync.pure(Some(b)) | |
else fnone | |
} | |
} else { | |
f(Some(init)) match { | |
case (None, b) => | |
if (chm.remove(k, init)) sync.pure(Some(b)) | |
else fnone | |
case (Some(next), b) => | |
if (chm.replace(k, init, next)) sync.pure(Some(b)) | |
else fnone | |
} | |
} | |
} | |
def tryModifyState[B](state: State[Option[V], B]): F[Option[B]] = | |
tryModify(state.run(_).value) | |
def tryUpdate(f: Option[V] => Option[V]): F[Boolean] = | |
tryModify { opt => | |
(f(opt), ()) | |
}.map(_.isDefined) | |
def update(f: Option[V] => Option[V]): F[Unit] = { | |
lazy val loop: F[Unit] = tryUpdate(f).flatMap { | |
case true => sync.unit | |
case false => loop | |
} | |
loop | |
} | |
} | |
def apply(k: K): Ref[F, Option[V]] = new HandleRef(k) | |
} | |
/** | |
* This allocates mutable memory, so it has to be inside F. The way to use things like this is to | |
* allocate one then `.map` them inside of constructors that need to access them. | |
* | |
* It is usually a mistake to have a `F[RefMap[F, K, V]]` field. You want `RefMap[F, K, V]` field | |
* which means the thing that needs it will also have to be inside of `F[_]`, which is because | |
* it needs access to mutable state so allocating it is also an effect. | |
*/ | |
def fromConcurrentHashMap[F[_]: Sync, K, V]( | |
initialCapacity: Int = 16, | |
loadFactor: Float = 0.75f, | |
concurrencyLevel: Int = 16): F[RefMap[F, K, Option[V]]] = | |
Sync[F].delay( | |
new Impl[F, K, V](new ConcurrentHashMap[K, V](initialCapacity, loadFactor, concurrencyLevel), | |
Sync[F])) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment