Skip to content

Instantly share code, notes, and snippets.

@Swoorup
Created July 17, 2022 16:12
Show Gist options
  • Save Swoorup/06d766dd70ca0d6f3fff736a61be29b3 to your computer and use it in GitHub Desktop.
Save Swoorup/06d766dd70ca0d6f3fff736a61be29b3 to your computer and use it in GitHub Desktop.
Cache for redis streams with offsets
package lib.cache
import cats.effect.std.{UUIDGen, Supervisor}
import cats.effect.syntax.all.*
import cats.effect.{Async, Clock, Fiber, Outcome, Ref, Resource}
import cats.syntax.all.*
import fs2.Stream
import io.chrisdavenport.mapref.MapRef
import io.chrisdavenport.rediculous.RedisCommands
import io.chrisdavenport.rediculous.RedisConnection
import java.time.Instant
import scala.concurrent.duration.{Duration, FiniteDuration}
import lib.Agitation
import lib.collection.ext.*
import lib.stream.ext.*
import lib.stream.{Consumer, KillSwitch, Msg, MsgId, Producer}
import lib.stream.redis.ProducerConfig
import lib.stream.redis.RedisStreamCodec
import lib.stream.redis.ConsumerConfig
import lib.stream.MsgOffset
/** Bookkeeping record for a single cached stream.
  *
  * @param streamKey      name of the Redis stream key the cached elements are published to
  * @param creationTime   instant at which the entry was created
  * @param lastAccessTime instant of the most recent lookup recorded for this entry
  * @param fiberResultRef holds `None` until the background publishing fiber has finished, then the
  *                       `Either` describing how it finished
  * @param fiber          handle of the background fiber that feeds the stream
  * @param agitation      expiry timer associated with this entry
  */
private final case class CachedItem[F[_]](
    streamKey: String,
    creationTime: Instant,
    lastAccessTime: Instant,
    fiberResultRef: Ref[F, Option[Either[Throwable, Unit]]],
    fiber: Fiber[F, Throwable, Unit],
    agitation: Agitation[F]
)
/** A cache of streams addressed by key.
  *
  * @tparam F effect type
  * @tparam K key identifying a cached stream
  * @tparam A payload type of the messages
  */
trait StreamCache[F[_], K, A]:

  /** Returns the cached stream for `key` as a stream of messages.
    *
    * @param key    identifies which cached stream to read
    * @param offset optional message id to read relative to; how `None` is interpreted is up to the
    *               implementation
    */
  def get(key: K, offset: Option[MsgId]): Stream[F, Msg[A]]
object StreamCache:

  /** Creates a [[StreamCache]] whose entries are backed by temporary Redis streams.
    *
    * For every key requested for the first time, `create(key)` is run on a supervised background
    * fiber and its chunks are published to a Redis stream named
    * `temp-cache-<random id>-<lower-cased key.toString>`. Each `get` re-arms the entry's expiry
    * timer. Once the timer settles, the producer is stopped, the Redis key is deleted and the
    * entry is evicted from the cache, so that a later `get` builds a fresh entry.
    *
    * @param redis the Redis connection to use.
    * @param create the function to create a new stream for a given key.
    * @param maxCapacity the maximum number of elements to persist in the stream (forwarded to the
    *                    producer configuration); must be greater than 0.
    * @param readBlockDuration block duration handed to the Redis consumer; must be greater than
    *                          zero and less than `defaultExpiration`.
    * @param defaultExpiration the default expiration time for the stream.
    * @return a resource holding the cache; the supervisor that owns the background fibers is part
    *         of this resource.
    * @throws java.lang.AssertionError if one of the argument constraints above is violated.
    */
  def ofRedisStream[F[_]: Async, K, A: RedisStreamCodec](
      redis: RedisConnection[F],
      create: K => Stream[F, A],
      maxCapacity: Int,
      readBlockDuration: FiniteDuration,
      defaultExpiration: FiniteDuration
  ): Resource[F, StreamCache[F, K, A]] =
    assert(maxCapacity > 0, "maxCapacity must be greater than 0")
    assert(readBlockDuration > Duration.Zero, "readBlockDuration must be greater than 0")
    assert(readBlockDuration < defaultExpiration, "readBlockDuration must be less than defaultExpiration")
    for
      // Random component of the stream key names generated by this cache instance.
      randomId   <- UUIDGen.randomString.toResource
      supervisor <- Supervisor[F]
      mapref <- MapRef
        .inShardedImmutableMap[F, F, K, CachedItem[F]](Runtime.getRuntime.availableProcessors())
        .toResource
    yield
      /** Starts the producer for `key` and returns the cache entry describing it. */
      def createCachedItem(key: K) =
        val streamKey = s"temp-cache-${randomId}-${key.toString.toLowerCase}"
        for
          producerConf <- Async[F].delay(ProducerConfig.Stream(streamKey, maxCapacity.toLong.some): ProducerConfig.Stream)
          producer     <- Async[F].delay(Producer.redis(redis, producerConf))
          creationTime <- Clock[F].realTimeInstant
          agitation    <- Agitation.timed[F](defaultExpiration)
          killswitch   <- KillSwitch[F]
          resultRef    <- Ref.of(None: Option[Either[Throwable, Unit]])
          // Background producer: publishes every chunk of the source stream and records how the
          // run ended so that readers can observe a failure.
          fiber <- create(key).chunks
            .evalMap(producer.publish(_))
            .killWith(killswitch)
            .compile
            .drain
            .attempt
            .flatMap(r => resultRef.set(r.some))
            .supervise(supervisor)
          // Expiry handler: stop the producer, drop the Redis key and, whether or not that
          // succeeded, evict the entry. Without the eviction every later `get` would keep
          // receiving this dead entry and read from a deleted stream. The eviction runs last so
          // a replacement entry (which reuses the same stream key) cannot be created before the
          // old key has been deleted.
          _ <- agitation.settled
            .flatMap(_ =>
              (killswitch.switch *> RedisCommands.del(streamKey).run(redis))
                .guarantee(mapref(key).set(None))
            )
            .supervise(supervisor)
        yield CachedItem(
          streamKey = streamKey,
          creationTime = creationTime,
          lastAccessTime = creationTime,
          fiberResultRef = resultRef,
          fiber = fiber,
          agitation = agitation
        )

      new StreamCache[F, K, A]:
        /** Looks up (or creates) the entry for `key`, re-arms its expiry and consumes its Redis
          * stream. `offset = None` reads from `MsgOffset.Latest`; `Some(id)` reads from
          * `MsgOffset.LastAck(id)`. A failure recorded by the producer is raised to the caller.
          */
        def get(key: K, offset: Option[MsgId]): Stream[F, Msg[A]] =
          Stream
            .eval(
              for
                cached <- mapref.getOrAdd(key, createCachedItem(key))
                now    <- Clock[F].realTimeInstant
                // Only touch an entry that is still present: an unconditional `set` could put an
                // entry back after the expiry handler has just evicted it.
                _ <- mapref(key).update(_.map(_.copy(lastAccessTime = now)))
                _ <- cached.agitation.agitate(defaultExpiration)
                // If the producer has already finished with an error, surface it here.
                _ <- cached.fiberResultRef.get.flatMap(_.traverse(_.liftTo[F]))
              yield cached
            )
            .flatMap(cached =>
              val from = offset.map(MsgOffset.LastAck(_)).getOrElse(MsgOffset.Latest)
              val consumerConf: ConsumerConfig.Stream =
                ConsumerConfig.Stream(Seq(cached.streamKey -> from), Some(readBlockDuration))
              Consumer.redis
                .stream[F, A](redis, consumerConf)
                .receiveM
            )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment