/*
 * Akka Streams Async Dedup
 *
 * Gist longshorej/7b6cd36097885664b8180749c04bfd5d, created July 12, 2023 19:40.
 * Save it to your computer and use it in GitHub Desktop.
 *
 * Note: this file may contain bidirectional Unicode text that can be interpreted or compiled
 * differently than it appears below. To review it, open the file in an editor that reveals
 * hidden Unicode characters, and learn more about bidirectional Unicode characters.
 */
package dedup

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

import akka.NotUsed
import akka.stream.scaladsl.Flow
import scalacache.CacheConfig
import scalacache.modes.sync._
/** Utilities for stream deduplication. */
object DedupFlows {

  /** Wrapper class, indicating whether the value produced was the result of a duplicate upstream element.
   *
   * @param value     the outcome of the (possibly shared) dispatched future
   * @param duplicate `true` when the future was taken from the cache, i.e. an earlier element with
   *                  the same key had already dispatched it; `false` when it was freshly dispatched
   */
  final case class Result[A](value: A, duplicate: Boolean)

  /** Provides a [[Flow]] that dispatches futures for incoming elements. If the incoming
   * element was already seen (according to its extracted key) and its future is still cached,
   * the previously dispatched future is emitted instead of calling `fn` again.
   *
   * Emitted values are wrapped in a [[Result]] class, allowing downstream to be aware of
   * whether or not the future's result was already seen. Outcomes are materialized as [[Try]]:
   * a failed future, or a non-fatal exception thrown synchronously by `fn`, is emitted as a
   * `Failure` instead of failing the stream. Failed futures are cached exactly like successful
   * ones, so later duplicates of that key receive the same failure.
   *
   * @param parallelism maximum number of futures awaited concurrently (passed to `mapAsync`)
   * @param ttl         time-to-live passed to the cache settings (presumably per-entry expiry)
   * @param capacity    capacity passed to the cache settings
   * @param extractKey  derives the deduplication key from an element
   * @param fn          dispatches the asynchronous work for an element whose key is not cached
   * @return a flow emitting one [[Result]] per input element, in input order
   */
  def async[A, B](parallelism: Int, ttl: FiniteDuration, capacity: Int, extractKey: A => String)(
      fn: A => Future[B]): Flow[A, Result[Try[B]], NotUsed] =
    Flow
      .fromMaterializer { (mat, _) =>
        implicit val cacheConfig: CacheConfig = CacheConfig.defaultCacheConfig
        implicit val ec: ExecutionContext = mat.executionContext

        Flow[A]
          .statefulMapConcat { () =>
            // Created inside the statefulMapConcat factory, so each materialization gets its own cache.
            val cache = CacheFactory.createCaffeineCache[Future[B]](CacheSettings(name = "noop", ttl, capacity))

            (a: A) => {
              val key = extractKey(a)
              // Reuse the cached future when present; otherwise dispatch and remember it.
              val (eventualValue, duplicate) = cache.get(key) match {
                case Some(cached) =>
                  (cached, true)
                case None =>
                  val dispatched = dispatch(fn, a)
                  cache.put(key)(dispatched)
                  (dispatched, false)
              }
              // transformWith(Future.successful) lifts the outcome (success or failure) into a Try,
              // so failures flow downstream as values rather than failing mapAsync.
              Iterator.single(eventualValue.transformWith(Future.successful).map(Result(_, duplicate)))
            }
          }
          .mapAsync(parallelism)(identity)
      }
      .mapMaterializedValue(_ => NotUsed)

  /** Invokes `fn` on `a`, converting a non-fatal exception thrown synchronously into a failed
   * future so it is reported through the [[Try]] in [[Result]] instead of failing the stream.
   */
  private def dispatch[A, B](fn: A => Future[B], a: A): Future[B] =
    try fn(a)
    catch { case NonFatal(e) => Future.failed(e) }
}
/*
 * Sign up for free to join this conversation on GitHub.
 * Already have an account? Sign in to comment.
 */