Created
July 17, 2022 16:12
-
-
Save Swoorup/06d766dd70ca0d6f3fff736a61be29b3 to your computer and use it in GitHub Desktop.
Cache for redis streams with offsets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package lib.cache | |
import cats.effect.std.{UUIDGen, Supervisor} | |
import cats.effect.syntax.all.* | |
import cats.effect.{Async, Clock, Fiber, Outcome, Ref, Resource} | |
import cats.syntax.all.* | |
import fs2.Stream | |
import io.chrisdavenport.mapref.MapRef | |
import io.chrisdavenport.rediculous.RedisCommands | |
import io.chrisdavenport.rediculous.RedisConnection | |
import java.time.Instant | |
import scala.concurrent.duration.{Duration, FiniteDuration} | |
import lib.Agitation | |
import lib.collection.ext.* | |
import lib.stream.ext.* | |
import lib.stream.{Consumer, KillSwitch, Msg, MsgId, Producer} | |
import lib.stream.redis.ProducerConfig | |
import lib.stream.redis.RedisStreamCodec | |
import lib.stream.redis.ConsumerConfig | |
import lib.stream.MsgOffset | |
/** Bookkeeping record for one cached stream.
  *
  * @param streamKey      name of the Redis stream the producer fiber publishes into
  * @param creationTime   wall-clock instant at which the entry was created
  * @param lastAccessTime wall-clock instant of the most recent lookup; initially equal to `creationTime`
  * @param fiberResultRef `None` while the producer fiber is still running; afterwards holds its outcome
  *                       (`Left` with the failure, `Right(())` on normal completion)
  * @param fiber          handle of the background fiber that drains the source stream into Redis
  * @param agitation      expiration timer for this entry
  */
private final case class CachedItem[F[_]](
  streamKey: String,
  creationTime: Instant,
  lastAccessTime: Instant,
  fiberResultRef: Ref[F, Option[Either[Throwable, Unit]]],
  fiber: Fiber[F, Throwable, Unit],
  agitation: Agitation[F]
)
/** A cache of message streams addressed by key.
  *
  * @tparam F effect type
  * @tparam K key identifying a cached stream
  * @tparam A payload type carried by each message
  */
trait StreamCache[F[_], K, A]:
  /** Returns the stream of messages cached under `key`.
    *
    * @param key    identifies which cached stream to read
    * @param offset optional message id that selects where reading starts; the exact
    *               semantics of `None` are defined by the implementation
    */
  def get(key: K, offset: Option[MsgId]): Stream[F, Msg[A]]
object StreamCache:
  /** Creates a [[StreamCache]] that mirrors each source stream into a temporary Redis stream.
    *
    * On the first `get` for a key, `create(key)` is started on a background fiber that publishes
    * its chunks into a Redis stream named `temp-cache-<randomId>-<key>`. Every `get` restarts the
    * entry's expiration timer. When the timer settles, the entry is evicted from the in-memory
    * map, the producer is stopped and the Redis stream key is deleted, so a later `get` for the
    * same key starts a fresh producer.
    *
    * The argument checks use `assert` and run eagerly, when this method is called.
    *
    * @param redis             the Redis connection used for publishing, consuming and key deletion
    * @param create            builds the source stream for a given key
    * @param maxCapacity       the maximum number of elements to persist in the Redis stream; must be > 0
    * @param readBlockDuration handed to the Redis consumer configuration (presumably the blocking-read
    *                          timeout -- confirm against `ConsumerConfig.Stream`); must be > 0 and
    *                          less than `defaultExpiration`
    * @param defaultExpiration idle time after which a cached stream is torn down; restarted on every `get`
    * @return a resource that yields the cache; background fibers are started on a `Supervisor`
    *         acquired as part of this resource
    */
  def ofRedisStream[F[_]: Async, K, A: RedisStreamCodec](
    redis: RedisConnection[F],
    create: K => Stream[F, A],
    maxCapacity: Int,
    readBlockDuration: FiniteDuration,
    defaultExpiration: FiniteDuration
  ): Resource[F, StreamCache[F, K, A]] =
    assert(maxCapacity > 0, "maxCapacity must be greater than 0")
    assert(readBlockDuration > Duration.Zero, "readBlockDuration must be greater than 0")
    assert(readBlockDuration < defaultExpiration, "readBlockDuration must be less than defaultExpiration")

    for
      randomId   <- UUIDGen.randomString.toResource
      supervisor <- Supervisor[F]
      mapref <- MapRef
        .inShardedImmutableMap[F, F, K, CachedItem[F]](Runtime.getRuntime.availableProcessors())
        .toResource
    yield
      // Starts the producer fiber and the expiry watcher for `key` and returns the bookkeeping record.
      def createCachedItem(key: K) =
        // NOTE(review): keys whose `toString` differ only by case map to the same Redis stream key
        // while still being distinct map entries -- confirm this cannot happen for the K in use.
        val streamKey = s"temp-cache-${randomId}-${key.toString.toLowerCase}"
        for
          producerConf <- Async[F].delay(ProducerConfig.Stream(streamKey, maxCapacity.toLong.some): ProducerConfig.Stream)
          producer     <- Async[F].delay(Producer.redis(redis, producerConf))
          creationTime <- Clock[F].realTimeInstant
          agitation    <- Agitation.timed[F](defaultExpiration)
          killswitch   <- KillSwitch[F]
          resultRef    <- Ref.of[F, Option[Either[Throwable, Unit]]](None)
          // Drain the source into Redis in the background and record how the drain ended.
          fiber <- create(key).chunks
            .evalMap(producer.publish(_))
            .killWith(killswitch)
            .compile
            .drain
            .attempt
            .flatMap(outcome => resultRef.set(outcome.some))
            .supervise(supervisor)
          // Expiry watcher. The map entry is evicted first so that new `get` calls create a fresh
          // item instead of attaching to a stream that is about to be deleted; only then is the
          // producer stopped and the Redis key removed. Previously the entry was never removed,
          // so expired items stayed in the cache forever.
          _ <- agitation.settled
            .flatMap(_ => mapref(key).set(None) *> killswitch.switch *> RedisCommands.del(streamKey).run(redis))
            .supervise(supervisor)
        yield CachedItem(
          streamKey = streamKey,
          creationTime = creationTime,
          lastAccessTime = creationTime,
          fiberResultRef = resultRef,
          fiber = fiber,
          agitation = agitation
        )

      new StreamCache[F, K, A]:
        def get(key: K, offset: Option[MsgId]): Stream[F, Msg[A]] =
          Stream
            .eval(
              for
                cached <- mapref.getOrAdd(key, createCachedItem(key))
                now    <- Clock[F].realTimeInstant
                // Touch the entry only if it is still present: an unconditional `set` could
                // re-insert an item that expired in the meantime or clobber its replacement.
                _ <- mapref(key).update(_.map(item => item.copy(lastAccessTime = now)))
                _ <- cached.agitation.agitate(defaultExpiration)
                // Surface a producer failure to the caller instead of reading a dead stream.
                _ <- cached.fiberResultRef.get.flatMap(_.traverse(_.liftTo[F]))
              yield cached
            )
            .flatMap(cached =>
              // With an offset, read from after that message id; otherwise start from the latest entry.
              val from = offset.map(MsgOffset.LastAck(_)).getOrElse(MsgOffset.Latest)
              val consumerConf: ConsumerConfig.Stream =
                ConsumerConfig.Stream(Seq(cached.streamKey -> from), Some(readBlockDuration))
              Consumer.redis
                .stream[F, A](redis, consumerConf)
                .receiveM
            )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment