Skip to content

Instantly share code, notes, and snippets.

@longshorej
Created July 12, 2023 19:40
Show Gist options
  • Save longshorej/7b6cd36097885664b8180749c04bfd5d to your computer and use it in GitHub Desktop.
Akka Streams Async Dedup
package dedup
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scalacache.CacheConfig
import scalacache.modes.sync._
/** Utilities for stream deduplication */
object DedupFlows {

  /** Wrapper class, indicating whether the value produced was the result of a duplicate upstream element.
    *
    * @param value     the produced value
    * @param duplicate `true` when the value came from a future that was already cached for an
    *                  earlier element with the same key, `false` when it was freshly dispatched
    */
  final case class Result[A](value: A, duplicate: Boolean)

  /** Provides a [[Flow]] that dispatches futures for incoming elements. If the incoming
    * element was already seen (according to its extracted key), the previously dispatched future
    * is emitted instead of calling `fn` again.
    *
    * Emitted values are wrapped in a [[Result]] class, allowing downstream to be aware of
    * whether or not the future's result was already seen. Failures are delivered as `Failure`
    * inside the `Try` and never fail the stream. This holds both when the future fails and when
    * `fn` throws synchronously. A failed future is cached like any other, so duplicates seen
    * within `ttl` observe the same failure.
    *
    * @param parallelism maximum number of in-flight futures awaited at once (passed to `mapAsync`)
    * @param ttl         how long a dispatched future stays in the dedup cache
    * @param capacity    maximum number of keys kept in the dedup cache
    * @param extractKey  derives the dedup key from an element
    * @param fn          dispatches the asynchronous work for a not-yet-seen element
    * @return a flow emitting, in upstream order, one [[Result]] per incoming element
    */
  def async[A, B](parallelism: Int, ttl: FiniteDuration, capacity: Int, extractKey: A => String)(
      fn: A => Future[B]): Flow[A, Result[Try[B]], NotUsed] =
    Flow
      .fromMaterializer { (mat, _) =>
        implicit val cacheConfig: CacheConfig = CacheConfig.defaultCacheConfig
        implicit val ec: ExecutionContext = mat.executionContext

        Flow[A]
          .statefulMapConcat { () =>
            // Created inside the factory so each materialization gets its own cache.
            val cache = CacheFactory.createCaffeineCache[Future[B]](CacheSettings(name = "noop", ttl, capacity))

            { a =>
              val key = extractKey(a)
              val eventualResult = cache.get(key) match {
                case Some(eventualValue) =>
                  wrap(eventualValue, duplicate = true)
                case None =>
                  val eventualValue = dispatch(fn, a)
                  cache.put(key)(eventualValue)
                  wrap(eventualValue, duplicate = false)
              }
              Iterator.single(eventualResult)
            }
          }
          .mapAsync(parallelism)(identity)
      }
      .mapMaterializedValue(_ => NotUsed)

  /** Invokes `fn`, turning a synchronous non-fatal throw into a failed future so it is reported
    * downstream as a `Failure` instead of failing the stream stage.
    */
  private def dispatch[A, B](fn: A => Future[B], a: A): Future[B] =
    try fn(a)
    catch { case scala.util.control.NonFatal(e) => Future.failed(e) }

  /** Lifts the outcome of `eventualValue` into a successful future of [[Result]], tagged with `duplicate`. */
  private def wrap[B](eventualValue: Future[B], duplicate: Boolean)(
      implicit ec: ExecutionContext): Future[Result[Try[B]]] =
    eventualValue.transform(outcome => scala.util.Success(Result(outcome, duplicate)))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment