Skip to content

Instantly share code, notes, and snippets.

Last active May 7, 2024 15:38
Show Gist options
  • Save ChristopherDavenport/8364fa2ca14557fce12baca6cb38d5b8 to your computer and use it in GitHub Desktop.
Save ChristopherDavenport/8364fa2ca14557fce12baca6cb38d5b8 to your computer and use it in GitHub Desktop.
LRUCache Implementation
// import cats._
import cats.effect._
import cats.effect.concurrent.{Ref, Semaphore}
import cats.effect.implicits._
import cats.implicits._
import scala.concurrent.duration._
import scala.collection.immutable.Map
import io.chrisdavenport.mapref.MapRef
// import io.chrisdavenport.mapref.implicits._
import java.util.concurrent.ConcurrentHashMap
final class LRUCache[F[_], K, V] private (
lock: Semaphore[F],
queue: scala.collection.mutable.Queue[K],
val maxSize: Int,
private val mapRef: MapRef[F, K, Option[LRUCache.LRUCacheItem[V]]],
private val purgeExpiredEntriesOpt : Option[Long => F[List[K]]], // Optional Performance Improvement over Default
val defaultExpiration: Option[TimeSpec],
)(implicit sync: Sync[F], C: Clock[F]) extends Cache[F, K, V]{
import LRUCache.LRUCacheItem
private val noneF:F[None.type] = sync.pure(None)
private def noneFA[A]: F[Option[A]] = noneF.widen[Option[A]]
private def purgeExpiredEntriesDefault(now: Long): F[List[K]] = {
mapRef.keys.flatMap(l =>
l.flatTraverse(k =>
mapRef(k).modify(optItem => =>
if (LRUCache.isExpired(now, item))
(None, List(k))
(optItem, List.empty)
).getOrElse((optItem, List.empty))
private val purgeExpiredEntries: Long => F[List[K]] =
* Delete all items that are expired.
* This is one big atomic operation.
def purgeExpired: F[Unit] = {
lockResource >> {
for {
now <- Resource.liftF(C.monotonic(NANOSECONDS))
out <- Resource.make(purgeExpiredEntries(now))(out => sync.delay{queue.subtractAll(out); ()} )
} yield out
}.use(_ => sync.unit)
private val lockResource = Resource.make(lock.acquire)(_ => lock.release)
// Members declared in io.chrisdavenport.mules.Delete
def delete(k: K): F[Unit] = {
lockResource >>
case None =>
(None, sync.unit)
case Some(_) =>
(None, sync.delay{queue.subtractOne(k); ()})
}.use(_ => sync.unit)
// Members declared in io.chrisdavenport.mules.Insert
def insert(k: K, v: V): F[Unit] =
insertWithTimeout(defaultExpiration)(k, v)
def insertWithTimeout(optionTimeout: Option[TimeSpec])(k: K, v: V): F[Unit] = {
lockResource >> {
for {
now <- Resource.liftF(C.monotonic(NANOSECONDS))
timeout = => TimeSpec.unsafeFromNanos(now + ts.nanos))
out <- Resource.make{
case None =>
def insertEmpty: F[Unit] = sync.suspend{
if (queue.size >= maxSize) {
val head = queue.dequeue()
} else {
(LRUCacheItem[V](v, timeout).some, insertEmpty)
case Some(_) => (LRUCacheItem[V](v, timeout).some, sync.delay{queue.subtractOne(k); queue.enqueue(k); ()})
} yield out
}.use(_ => sync.unit)
// Members declared in io.chrisdavenport.mules.Lookup
def lookup(k: K): F[Option[V]] = lockResource.use{_ =>
for {
now <- C.monotonic(NANOSECONDS)
out <-
case s@Some(value) =>
if (LRUCache.isExpired(now, value)){
(None, sync.delay(queue.subtractOne(k)).as(None))
} else {
(s, sync.delay{queue.subtractOne(k); queue.enqueue(k); ()} >> sync.pure(s))
case None =>
(None, noneFA)
a =>
case (_, ExitCase.Completed) => sync.unit
case (action, ExitCase.Canceled) => action.void
case (action, ExitCase.Error(_)) => action.void
} yield out
object LRUCache {
private case class LRUCacheItem[A](
item: A,
itemExpiration: Option[TimeSpec]
private def isExpired[A](checkAgainst: Long, cacheItem: LRUCacheItem[A]): Boolean = {
cacheItem.itemExpiration match{
case Some(e) if e.nanos < checkAgainst => true
case _ => false
* Initiates a background process that checks for expirations every certain amount of time.
* @param memoryCache: The cache to check and expire automatically.
* @param checkOnExpirationsEvery: How often the expiration process should check for expired keys.
* @return an `Resource[F, Unit]` that will keep removing expired entries in the background.
def liftToAuto[F[_]: Concurrent: Timer, K, V](
memoryCache: MemoryCache[F, K, V],
checkOnExpirationsEvery: TimeSpec
): Resource[F, Unit] = {
def runExpiration(cache: MemoryCache[F, K, V]): F[Unit] = {
val check = TimeSpec.toDuration(checkOnExpirationsEvery)
Timer[F].sleep(check) >> cache.purgeExpired >> runExpiration(cache)
* Create a new cache with a default expiration value for newly added cache items.
* Items that are added to the cache without an explicit expiration value (using insert) will be inserted with the default expiration value.
* If the specified default expiration value is None, items inserted by insert will never expire.
def ofSingleImmutableMap[F[_]: Concurrent: Clock, K, V](
maxSize: Int,
defaultExpiration: Option[TimeSpec]
): F[LRUCache[F, K, V]] =
Ref.of[F, Map[K, LRUCacheItem[V]]](Map.empty[K, LRUCacheItem[V]])
.flatMap{ref =>
Semaphore[F](1).map{sem =>
new LRUCache[F, K, V](
{l: Long => SingleRef.purgeExpiredEntries(ref)(l)}.some,
def ofShardedImmutableMap[F[_]: Concurrent : Clock, K, V](
maxSize: Int,
shardCount: Int,
defaultExpiration: Option[TimeSpec]
): F[LRUCache[F, K, V]] =
MapRef.ofShardedImmutableMap[F, K, LRUCacheItem[V]](shardCount).flatMap{mr =>
ofMapRef[F, K, V](
def ofConcurrentHashMap[F[_]: Concurrent: Clock, K, V](
maxSize: Int,
defaultExpiration: Option[TimeSpec],
initialCapacity: Int = 16,
loadFactor: Float = 0.75f,
concurrencyLevel: Int = 16,
): F[LRUCache[F, K, V]] = Sync[F].suspend{
val chm = new ConcurrentHashMap[K, LRUCacheItem[V]](initialCapacity, loadFactor, concurrencyLevel)
ofMapRef[F, K, V](
def ofMapRef[F[_]: Concurrent: Clock, K, V](
mr: MapRef[F, K, Option[LRUCacheItem[V]]],
maxSize: Int,
defaultExpiration: Option[TimeSpec]
): F[LRUCache[F, K, V]] = Semaphore[F](1).map{sem =>
new LRUCache[F, K, V](
private object SingleRef {
def purgeExpiredEntries[F[_], K, V](ref: Ref[F, Map[K, LRUCacheItem[V]]])(now: Long): F[List[K]] = {
m => {
val l = scala.collection.mutable.ListBuffer.empty[K]
m.foreach{ case (k, item) =>
if (isExpired(now, item)) {
val remove = l.result
val finalMap = m -- remove
(finalMap, remove)
import org.specs2.mutable.Specification
import scala.concurrent.duration._
import cats.effect._
// import cats.effect.concurrent._
// import cats.effect.implicits._
import cats.effect.IO
import cats.effect.laws.util.TestContext
import cats.effect.specs2.CatsIO
class LRUCacheSpec extends Specification with CatsIO {
"LRUCache" should {
"keep only requested number of elements" in {
for {
cache <- LRUCache.ofSingleImmutableMap[IO, Int, String](2, None)
_ <- cache.insert(1, "foo")
_ <- cache.insert(2, "bar")
_ <- cache.insert(3, "baz")
out1 <- cache.lookup(1)
out2 <- cache.lookup(2)
out3 <- cache.lookup(3)
} yield {
(out1, out2, out3).must_===((None, Some("bar"), Some("baz")))
"override with newer lookups" in {
for {
cache <- LRUCache.ofSingleImmutableMap[IO, Int, String](2, None)
_ <- cache.insert(1, "foo")
_ <- cache.insert(2, "bar")
_ <- cache.lookup(1)
_ <- cache.insert(3, "baz")
out1 <- cache.lookup(1)
out2 <- cache.lookup(2)
out3 <- cache.lookup(3)
} yield {
(out1, out2, out3).must_===((Some("foo"), None, Some("baz")))
"remove a value after delete" in {
val ctx = TestContext()
implicit val testTimer: Timer[IO] = ctx.timer[IO]
val setup = for {
cache <- LRUCache.ofSingleImmutableMap[IO, String, Int](5, None)(Concurrent[IO], testTimer.clock)
_ <- cache.insert("Foo", 1)
_ <- cache.delete("Foo")
value <- cache.lookup("Foo")
} yield value must_=== None)
"Remove a value in mass delete" in {
val ctx = TestContext()
implicit val testTimer: Timer[IO] = ctx.timer[IO]
val setup = for {
cache <- LRUCache.ofSingleImmutableMap[IO, String, Int](5, Some(TimeSpec.unsafeFromDuration(1.second)))(Concurrent[IO], testTimer.clock)
_ <- cache.insert("Foo", 1)
_ <- Sync[IO].delay(ctx.tick(2.seconds))
_ <- cache.purgeExpired
value <- cache.lookup("Foo")
} yield value must_=== None)
"Lookup after interval fails to get a value" in {
val ctx = TestContext()
implicit val testTimer: Timer[IO] = ctx.timer[IO]
val setup = for {
cache <- LRUCache.ofSingleImmutableMap[IO, String, Int](5, Some(TimeSpec.unsafeFromDuration(1.second)))(Concurrent[IO], testTimer.clock)
_ <- cache.insert("Foo", 1)
_ <- Sync[IO].delay(ctx.tick(2.seconds))
value <- cache.lookup("Foo")
} yield value must_=== None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment