-
-
Save Daenyth/bcb0b4b8978dc9fa00735f374f5f2d92 to your computer and use it in GitHub Desktop.
| import cats.effect.{Concurrent, Timer} | |
| import fs2.{Chunk, Pipe, Stream} | |
| import scala.concurrent.duration.FiniteDuration | |
package object fs2utils {

  /**
    * Grouping logic used to split a stream into sub-streams identified by a unique key.
    * Be aware that when working with an unbounded number of keys K, if streams never
    * terminate, there can potentially be unbounded memory usage.
    *
    * The emitted streams terminate when the input terminates or when the stream
    * returned by the pipe itself terminates. Termination is graceful and all input elements are emitted.
    *
    * @param selector Function to retrieve grouping key from type A
    * @tparam A Elements in the stream
    * @tparam K A key used to group elements into substreams
    * @return Streams grouped by a unique key identifier `K`.
    */
  def groupByUnbounded[F[_], A, K](selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] =
    fanOut(KeyedEnqueue.unbounded[F, K, A])(selector)

  /**
    * Like `groupByUnbounded`, but back-pressures the input once `maxItems` elements have been
    * routed into substreams and not yet pulled out of them.
    *
    * @param maxItems Maximum number of in-flight elements across all substreams; must be positive.
    *                 A non-positive value fails the resulting stream with an `IllegalArgumentException`.
    *                 Without that check, zero permits would block the very first element forever.
    * @param selector Function to retrieve grouping key from type A
    */
  def groupBy[F[_], A, K](maxItems: Long)(selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] =
    if (maxItems > 0L) fanOut(KeyedEnqueue.itemBounded[F, K, A](maxItems))(selector)
    else
      _ =>
        Stream.raiseError[F](
          new IllegalArgumentException(s"groupBy: maxItems must be positive, but was $maxItems")
        )

  /**
    * Like `groupBy` but each substream is concurrently merged, emitting chunks when the substream has
    * `maxChunkSize` pending or when the substream has waited `maxChunkTimeout` without emitting elements,
    * similar to the standard `groupWithin` combinator.
    *
    * @param maxTotalItems Backpressure when this many items are "in flight" concurrently (must be positive,
    *                      as for `groupBy`)
    * @param maxChunkSize Output chunks satisfy: 0 < emittedChunk.size <= maxChunkSize
    * @param maxChunkTimeout Emit chunks smaller than `maxChunkSize` if `maxChunkTimeout` time has elapsed without
    *                        emitting any chunks for a given key `K` and we have elements that match that selector waiting
    * @param selector Output elements satisfy: (key, chunk) => chunk.forall(a => selector(a) == key)
    */
  def groupWithinBy[F[_], A, K](
      maxTotalItems: Long,
      maxChunkSize: Int,
      maxChunkTimeout: FiniteDuration
  )(
      selector: A => K
  )(implicit F: Concurrent[F], timer: Timer[F]): Pipe[F, A, (K, Chunk[A])] =
    _.through(groupBy(maxTotalItems)(selector)).map {
      case (key, stream) =>
        stream
          .groupWithin(maxChunkSize, maxChunkTimeout)
          .map(chunk => key -> chunk)
    }.parJoinUnbounded

  /** Shared wiring for the groupBy variants: acquire a `KeyedEnqueue` for the lifetime of the
    * output stream and fan the input out through it.
    */
  private def fanOut[F[_]: Concurrent, A, K](
      keyed: cats.effect.Resource[F, KeyedEnqueue[F, K, A]]
  )(selector: A => K): Pipe[F, A, (K, Stream[F, A])] =
    in => Stream.resource(keyed).flatMap(ke => in.through(KeyedEnqueue.pipe(ke)(selector)))
}
| import cats.Monad | |
| import cats.effect.concurrent.{Ref, Semaphore} | |
| import cats.effect.{Concurrent, Resource} | |
| import cats.implicits._ | |
| import fs2.{Pipe, Stream} | |
| import fs2.concurrent.{NoneTerminatedQueue, Queue} | |
/** Enqueue side of a "keyed fan-out".
  *
  * Items tagged with a key `K` are routed into per-key substreams. A heterogeneous input can
  * therefore be split into homogeneous, per-key streams, for example to batch items that share a key.
  *
  * Loosely comparable to [[fs2.concurrent.Enqueue]], but keyed.
  */
private[fs2utils] trait KeyedEnqueue[F[_], K, A] {

  /** Gracefully terminates every substream that has been emitted so far. */
  def shutdownAll: F[Unit]

  /** Routes one item to the substream for `key`, creating that substream if necessary.
    *
    * @return `None` when the item went to a substream that is already live.
    *         `Some(key -> substream)` when a new queue was created for this item. This may happen more
    *         than once for the same key, for example if an implementation retires old or quiet
    *         substreams. The returned stream may eventually terminate, but it won't be cancelled by this.
    */
  def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]]
}
private[fs2utils] object KeyedEnqueue {

  /** A `KeyedEnqueue` with no bound on buffered items.
    *
    * Releasing the resource calls `shutdownAll`, terminating every substream emitted so far.
    */
  def unbounded[F[_]: Concurrent, K, A]: Resource[F, KeyedEnqueue[F, K, A]] = {
    val acquire: F[KeyedEnqueue[F, K, A]] =
      Ref[F]
        .of(Map.empty[K, NoneTerminatedQueue[F, A]])
        .map(registry => new UnboundedKeyedEnqueue[F, K, A](registry): KeyedEnqueue[F, K, A])
    Resource.make(acquire)(_.shutdownAll)
  }

  /** Routes every input element through `ke` and emits each newly created `(key, substream)` pair.
    *
    * When the input ends, all substreams are shut down.
    */
  def pipe[F[_]: Concurrent, K, A](
      ke: KeyedEnqueue[F, K, A]
  )(selector: A => K): Pipe[F, A, (K, Stream[F, A])] = { in =>
    val freshSubstreams = in.evalMap(a => ke.enqueue1(selector(a), a)).unNone
    // The shutdown *must* be appended with `++` so that "input termination = output termination" holds.
    // With `onFinalize`, the finalizer could be rescoped to the output of this stream later. It would then
    // never trigger, because it would be waiting for itself to terminate before it terminates itself.
    freshSubstreams ++ Stream.eval_(ke.shutdownAll)
  }

  /** Like `unbounded`, but at most `maxItems` elements may sit in substreams before `enqueue1` blocks. */
  def itemBounded[F[_]: Concurrent, K, A](
      maxItems: Long
  ): Resource[F, KeyedEnqueue[F, K, A]] =
    unbounded[F, K, A].flatMap { inner =>
      Resource
        .liftF(Semaphore[F](maxItems))
        .map(permits => new ItemBoundedKeyedEnqueue(inner, permits))
    }
}
/** [[KeyedEnqueue]] backed by one unbounded, `None`-terminated queue per key, all held in a `Ref`.
  *
  * @param queues Live per-key queues. A key's queue is registered the first time an item with that key
  *               is enqueued and is never removed afterwards.
  */
private class UnboundedKeyedEnqueue[F[_], K, A](
    queues: Ref[F, Map[K, NoneTerminatedQueue[F, A]]]
)(implicit F: Concurrent[F])
    extends KeyedEnqueue[F, K, A] {

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    withKey(key)(_.enqueue1(item.some))

  /** Terminates every registered substream by enqueueing `None` into its queue.
    *
    * NOTE(review): a queue registered after this reads `queues` is not terminated by this call.
    */
  override val shutdownAll: F[Unit] =
    queues.get.flatMap(_.values.toList.traverse_(_.enqueue1(None)))

  /** Runs `use` on the queue for `key`, registering a new queue first if none exists.
    *
    * @return `Some(key -> substream)` only for the call that registered the queue, `None` otherwise.
    */
  private[this] def withKey(key: K)(
      use: NoneTerminatedQueue[F, A] => F[Unit]
  ): F[Option[(K, Stream[F, A])]] =
    queues.get.flatMap { snapshot =>
      snapshot.get(key) match {
        // Fast path: the queue already exists, so nothing is allocated.
        case Some(existing) => use(existing).as(Option.empty[(K, Stream[F, A])])
        case None =>
          registerIfAbsent(key).flatMap {
            case (q, registeredByUs) =>
              val emitted: Option[(K, Stream[F, A])] =
                if (registeredByUs) Some(key -> q.dequeue) else None
              use(q).as(emitted)
          }
      }
    }

  /** Atomically looks up `key` or installs a freshly allocated queue for it.
    *
    * The check and the insert happen in a single `modify`, so two concurrent callers can never both
    * install a queue for the same key. A `get` followed by `update` would let the second caller
    * overwrite the first, orphaning an already-emitted substream that `shutdownAll` would then
    * never terminate. The losing caller's pre-allocated queue is simply dropped.
    *
    * @return the queue now registered for `key`, and whether this call installed it
    */
  private[this] def registerIfAbsent(key: K): F[(NoneTerminatedQueue[F, A], Boolean)] =
    Queue.noneTerminated[F, A].flatMap { fresh =>
      queues.modify { current =>
        current.get(key) match {
          case Some(winner) => (current, (winner, false))
          case None         => (current.updated(key, fresh), (fresh, true))
        }
      }
    }
}
/** Decorates another [[KeyedEnqueue]] with a global cap on in-flight items.
  *
  * Each enqueued item takes one permit from `limit`. Permits are handed back as chunks are pulled
  * out of the substreams that this decorator emits.
  */
private class ItemBoundedKeyedEnqueue[F[_]: Monad, K, A](
    ke: KeyedEnqueue[F, K, A],
    limit: Semaphore[F]
) extends KeyedEnqueue[F, K, A] {

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    limit.acquire.flatMap { _ =>
      ke.enqueue1(key, item).map { created =>
        // Each substream is emitted once and then reused, so wrapping it here once is enough.
        created.map { case (k, substream) => k -> releasing(substream) }
      }
    }

  override val shutdownAll: F[Unit] = ke.shutdownAll

  /** Re-emits `substream`, returning one permit per element as each chunk is pulled. */
  private[this] def releasing(substream: Stream[F, A]): Stream[F, A] =
    substream.chunks
      .evalTap(chunk => limit.releaseN(chunk.size.toLong))
      .flatMap(Stream.chunk)
}
You don't need the full power of a Semaphore, but you do want to swap that `get` + `update` for a `modify`.
Thanks!
To me it looks like modify is not possible here as updated Map depends on effectful Queue.noneTerminated[F, A]:
queuesMap.get(key) match {
case Some(queue) =>
(queuesMap, use(queue).as(None))
case None =>
val action = for {
newQ <- Queue.noneTerminated[F, A]
updadted = queuesMap + (key -> newQ) // We cannot return it
result <- use(newQ).as((key -> newQ.dequeue).some)
} yield result
(???, action)
}
Would be happy to be wrong.
Yeah, it's not immediately obvious how to avoid that; it may involve refactoring. Possibly using something like the keyed Ref from Davenverse.
I feel like I reasoned out that the race condition was in practice not a problem, but I need to re-examine with fresh eyes. I wrote this quite a while ago.
To me it looks like modify is not possible here as updated Map depends on effectful Queue.noneTerminated[F, A]
You create the Queue in advance (which has negligible overhead), just like you do when you need a Deferred. In any case, I'm adding a similar combinator to the library directly.
@SystemFw Hi, has such combinator already been added to fs2? Thanks :)
In any case I'm adding a similar combinator to the library directly
Have you done it? @SystemFw
No, I started on it at the time, and then got side-tracked in creating what has become Channel. I also felt unsatisfied by how large the possible api surface could be, but I will admit that's probably the perfect being the enemy of the good.
The method `withKey` does not seem thread-safe to me. You are doing a `get` on a `Ref` and afterwards, if the ref does not have the desired key in the map, you are doing a `queues.update`. I think this method should be wrapped in a `cats.effect.concurrent.Semaphore`:
semaphore.withPermit {
  // the content of withKey
}