Gist by @ChristopherDavenport · Last active May 7, 2024
LRUCache Implementation
// import cats._
import cats.effect._
import cats.effect.concurrent.{Ref, Semaphore}
import cats.effect.implicits._
import cats.implicits._
import scala.concurrent.duration._
import scala.collection.immutable.Map
import io.chrisdavenport.mules._ // assumed source of the Cache trait and TimeSpec used below
import io.chrisdavenport.mapref.MapRef
// import io.chrisdavenport.mapref.implicits._
import java.util.concurrent.ConcurrentHashMap
final class LRUCache[F[_], K, V] private (
  lock: Semaphore[F],
  queue: scala.collection.mutable.Queue[K],
  val maxSize: Int,
  private val mapRef: MapRef[F, K, Option[LRUCache.LRUCacheItem[V]]],
  private val purgeExpiredEntriesOpt: Option[Long => F[List[K]]], // optional performance improvement over the default
  val defaultExpiration: Option[TimeSpec],
)(implicit sync: Sync[F], C: Clock[F]) extends Cache[F, K, V] {
  import LRUCache.LRUCacheItem

  private val noneF: F[None.type] = sync.pure(None)
  private def noneFA[A]: F[Option[A]] = noneF.widen[Option[A]]
  private def purgeExpiredEntriesDefault(now: Long): F[List[K]] = {
    mapRef.keys.flatMap(l =>
      l.flatTraverse(k =>
        mapRef(k).modify(optItem =>
          optItem.map(item =>
            if (LRUCache.isExpired(now, item))
              (None, List(k))
            else
              (optItem, List.empty)
          ).getOrElse((optItem, List.empty))
        )
      )
    )
  }

  private val purgeExpiredEntries: Long => F[List[K]] =
    purgeExpiredEntriesOpt.getOrElse(purgeExpiredEntriesDefault)
  /**
   * Delete all items that are expired.
   *
   * This is one big atomic operation.
   **/
  def purgeExpired: F[Unit] = {
    lockResource >> {
      for {
        now <- Resource.liftF(C.monotonic(NANOSECONDS))
        out <- Resource.make(purgeExpiredEntries(now))(out => sync.delay{ queue.subtractAll(out); () })
      } yield out
    }
  }.use(_ => sync.unit)
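  // Note: callers can run purgeExpired manually, or use `LRUCache.liftToAuto` in the companion
  // object below to run it on a fixed schedule in the background.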
  private val lockResource = Resource.make(lock.acquire)(_ => lock.release)

  // Members declared in io.chrisdavenport.mules.Delete
  def delete(k: K): F[Unit] = {
    lockResource >>
      Resource.make(
        mapRef(k).modify{
          case None =>
            (None, sync.unit)
          case Some(_) =>
            (None, sync.delay{ queue.subtractOne(k); () })
        }
      )(identity)
  }.use(_ => sync.unit)
  // Members declared in io.chrisdavenport.mules.Insert
  def insert(k: K, v: V): F[Unit] =
    insertWithTimeout(defaultExpiration)(k, v)

  def insertWithTimeout(optionTimeout: Option[TimeSpec])(k: K, v: V): F[Unit] = {
    lockResource >> {
      for {
        now <- Resource.liftF(C.monotonic(NANOSECONDS))
        timeout = optionTimeout.map(ts => TimeSpec.unsafeFromNanos(now + ts.nanos))
        out <- Resource.make{
          mapRef(k).modify{
            case None =>
              def insertEmpty: F[Unit] = sync.suspend{
                if (queue.size >= maxSize) {
                  queue.enqueue(k)
                  val head = queue.dequeue()
                  mapRef(head).set(None)
                } else {
                  queue.enqueue(k)
                  sync.unit
                }
              }
              (LRUCacheItem[V](v, timeout).some, insertEmpty)
            case Some(_) =>
              (LRUCacheItem[V](v, timeout).some, sync.delay{ queue.subtractOne(k); queue.enqueue(k); () })
          }
        }(identity)
      } yield out
    }
  }.use(_ => sync.unit)
  // Members declared in io.chrisdavenport.mules.Lookup
  def lookup(k: K): F[Option[V]] = lockResource.use{ _ =>
    for {
      now <- C.monotonic(NANOSECONDS)
      out <-
        mapRef(k).modify[F[Option[LRUCacheItem[V]]]]{
          case s@Some(value) =>
            if (LRUCache.isExpired(now, value)){
              (None, sync.delay(queue.subtractOne(k)).as(None))
            } else {
              (s, sync.delay{ queue.subtractOne(k); queue.enqueue(k); () } >> sync.pure(s))
            }
          case None =>
            (None, noneFA)
        }.bracketCase{
          a => a.map(_.map(_.item))
        }{
          case (_, ExitCase.Completed) => sync.unit
          case (action, ExitCase.Canceled) => action.void
          case (action, ExitCase.Error(_)) => action.void
        }
    } yield out
  }
}
object LRUCache {
  private case class LRUCacheItem[A](
    item: A,
    itemExpiration: Option[TimeSpec]
  )

  private def isExpired[A](checkAgainst: Long, cacheItem: LRUCacheItem[A]): Boolean = {
    cacheItem.itemExpiration match {
      case Some(e) if e.nanos < checkAgainst => true
      case _ => false
    }
  }
  /**
   * Initiates a background process that checks for expirations at a fixed interval.
   *
   * @param cache: The cache to check and expire automatically.
   * @param checkOnExpirationsEvery: How often the expiration process should check for expired keys.
   *
   * @return a `Resource[F, Unit]` that will keep removing expired entries in the background.
   **/
  def liftToAuto[F[_]: Concurrent: Timer, K, V](
    cache: LRUCache[F, K, V],
    checkOnExpirationsEvery: TimeSpec
  ): Resource[F, Unit] = {
    def runExpiration(cache: LRUCache[F, K, V]): F[Unit] = {
      val check = TimeSpec.toDuration(checkOnExpirationsEvery)
      Timer[F].sleep(check) >> cache.purgeExpired >> runExpiration(cache)
    }
    Resource.make(runExpiration(cache).start)(_.cancel).void
  }
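  // Usage sketch (hedged; `IO`, the 100-entry cap, and the intervals are illustrative):
  //   val cached: Resource[IO, LRUCache[IO, String, Int]] =
  //     Resource.liftF(LRUCache.ofSingleImmutableMap[IO, String, Int](100, Some(TimeSpec.unsafeFromDuration(1.minute))))
  //       .flatTap(cache => LRUCache.liftToAuto(cache, TimeSpec.unsafeFromDuration(30.seconds)))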
  /**
   * Create a new cache with a default expiration value for newly added cache items.
   *
   * Items that are added to the cache without an explicit expiration value (using `insert`)
   * will be inserted with the default expiration value.
   *
   * If the specified default expiration value is `None`, items inserted by `insert` will never expire.
   **/
  def ofSingleImmutableMap[F[_]: Concurrent: Clock, K, V](
    maxSize: Int,
    defaultExpiration: Option[TimeSpec]
  ): F[LRUCache[F, K, V]] =
    Ref.of[F, Map[K, LRUCacheItem[V]]](Map.empty[K, LRUCacheItem[V]])
      .flatMap{ ref =>
        Semaphore[F](1).map{ sem =>
          new LRUCache[F, K, V](
            sem,
            scala.collection.mutable.Queue.empty[K],
            maxSize,
            MapRef.fromSingleImmutableMapRef(ref),
            { l: Long => SingleRef.purgeExpiredEntries(ref)(l) }.some,
            defaultExpiration,
          )
        }
      }
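  // Example (illustrative values): at most 100 entries, each expiring 5 minutes after insertion
  // unless insertWithTimeout supplies a different timeout.
  //   LRUCache.ofSingleImmutableMap[IO, String, Int](100, Some(TimeSpec.unsafeFromDuration(5.minutes)))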
  def ofShardedImmutableMap[F[_]: Concurrent: Clock, K, V](
    maxSize: Int,
    shardCount: Int,
    defaultExpiration: Option[TimeSpec]
  ): F[LRUCache[F, K, V]] =
    MapRef.ofShardedImmutableMap[F, K, LRUCacheItem[V]](shardCount).flatMap{ mr =>
      ofMapRef[F, K, V](
        mr,
        maxSize,
        defaultExpiration,
      )
    }

  def ofConcurrentHashMap[F[_]: Concurrent: Clock, K, V](
    maxSize: Int,
    defaultExpiration: Option[TimeSpec],
    initialCapacity: Int = 16,
    loadFactor: Float = 0.75f,
    concurrencyLevel: Int = 16,
  ): F[LRUCache[F, K, V]] = Sync[F].suspend{
    val chm = new ConcurrentHashMap[K, LRUCacheItem[V]](initialCapacity, loadFactor, concurrencyLevel)
    ofMapRef[F, K, V](
      MapRef.fromConcurrentHashMap(chm),
      maxSize,
      defaultExpiration,
    )
  }
  def ofMapRef[F[_]: Concurrent: Clock, K, V](
    mr: MapRef[F, K, Option[LRUCacheItem[V]]],
    maxSize: Int,
    defaultExpiration: Option[TimeSpec]
  ): F[LRUCache[F, K, V]] = Semaphore[F](1).map{ sem =>
    new LRUCache[F, K, V](
      sem,
      scala.collection.mutable.Queue.empty[K],
      maxSize,
      mr,
      None,
      defaultExpiration,
    )
  }

  private object SingleRef {
    def purgeExpiredEntries[F[_], K, V](ref: Ref[F, Map[K, LRUCacheItem[V]]])(now: Long): F[List[K]] = {
      ref.modify( m => {
        val l = scala.collection.mutable.ListBuffer.empty[K]
        m.foreach{ case (k, item) =>
          if (isExpired(now, item)) {
            l += k
          }
        }
        val remove = l.result()
        val finalMap = m -- remove
        (finalMap, remove)
      })
    }
  }
}
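A minimal usage sketch (not part of the original gist; the `LRUCacheExample` object, key/value types, and printed output are illustrative): it builds a two-entry cache and shows that looking a key up refreshes its recency, so the untouched key is evicted first.

object LRUCacheExample extends IOApp {
  // IOApp provides implicit ContextShift and Timer, from which Concurrent[IO] and Clock[IO] resolve.
  def run(args: List[String]): IO[ExitCode] =
    for {
      cache <- LRUCache.ofSingleImmutableMap[IO, Int, String](maxSize = 2, defaultExpiration = None)
      _     <- cache.insert(1, "foo")
      _     <- cache.insert(2, "bar")
      _     <- cache.lookup(1)        // touch 1, so 2 becomes the least recently used key
      _     <- cache.insert(3, "baz") // exceeds maxSize, evicting 2
      one   <- cache.lookup(1)        // Some("foo")
      two   <- cache.lookup(2)        // None
      _     <- IO(println((one, two)))
    } yield ExitCode.Success
}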
import org.specs2.mutable.Specification
import scala.concurrent.duration._
import cats.effect._
// import cats.effect.concurrent._
// import cats.effect.implicits._
import cats.effect.IO
import cats.effect.laws.util.TestContext
import cats.effect.specs2.CatsIO
import io.chrisdavenport.mules.TimeSpec // assumed source of TimeSpec used in the tests below
class LRUCacheSpec extends Specification with CatsIO {
  "LRUCache" should {
    "keep only requested number of elements" in {
      for {
        cache <- LRUCache.ofSingleImmutableMap[IO, Int, String](2, None)
        _ <- cache.insert(1, "foo")
        _ <- cache.insert(2, "bar")
        _ <- cache.insert(3, "baz")
        out1 <- cache.lookup(1)
        out2 <- cache.lookup(2)
        out3 <- cache.lookup(3)
      } yield {
        (out1, out2, out3).must_===((None, Some("bar"), Some("baz")))
      }
    }

    "override with newer lookups" in {
      for {
        cache <- LRUCache.ofSingleImmutableMap[IO, Int, String](2, None)
        _ <- cache.insert(1, "foo")
        _ <- cache.insert(2, "bar")
        _ <- cache.lookup(1)
        _ <- cache.insert(3, "baz")
        out1 <- cache.lookup(1)
        out2 <- cache.lookup(2)
        out3 <- cache.lookup(3)
      } yield {
        (out1, out2, out3).must_===((Some("foo"), None, Some("baz")))
      }
    }

    "remove a value after delete" in {
      val ctx = TestContext()
      implicit val testTimer: Timer[IO] = ctx.timer[IO]
      val setup = for {
        cache <- LRUCache.ofSingleImmutableMap[IO, String, Int](5, None)(Concurrent[IO], testTimer.clock)
        _ <- cache.insert("Foo", 1)
        _ <- cache.delete("Foo")
        value <- cache.lookup("Foo")
      } yield value
      setup.map(_ must_=== None)
    }

    "Remove a value in mass delete" in {
      val ctx = TestContext()
      implicit val testTimer: Timer[IO] = ctx.timer[IO]
      val setup = for {
        cache <- LRUCache.ofSingleImmutableMap[IO, String, Int](5, Some(TimeSpec.unsafeFromDuration(1.second)))(Concurrent[IO], testTimer.clock)
        _ <- cache.insert("Foo", 1)
        _ <- Sync[IO].delay(ctx.tick(2.seconds))
        _ <- cache.purgeExpired
        value <- cache.lookup("Foo")
      } yield value
      setup.map(_ must_=== None)
    }

    "Lookup after interval fails to get a value" in {
      val ctx = TestContext()
      implicit val testTimer: Timer[IO] = ctx.timer[IO]
      val setup = for {
        cache <- LRUCache.ofSingleImmutableMap[IO, String, Int](5, Some(TimeSpec.unsafeFromDuration(1.second)))(Concurrent[IO], testTimer.clock)
        _ <- cache.insert("Foo", 1)
        _ <- Sync[IO].delay(ctx.tick(2.seconds))
        value <- cache.lookup("Foo")
      } yield value
      setup.map(_ must_=== None)
    }
  }
}