-
-
Save Daenyth/bcb0b4b8978dc9fa00735f374f5f2d92 to your computer and use it in GitHub Desktop.
import cats.effect.{Concurrent, Timer} | |
import fs2.{Chunk, Pipe, Stream} | |
import scala.concurrent.duration.FiniteDuration | |
package object fs2utils {

  /** Fans a stream out into one sub-stream per distinct key returned by `selector`.
    *
    * Every key ever seen keeps a live queue until the input finishes. With an unbounded key space
    * and sub-streams that never complete, memory use can therefore grow without limit.
    *
    * Emitted sub-streams end either when the input ends or when the stream produced by this pipe
    * ends. Shutdown is graceful: every input element is delivered before its sub-stream closes.
    *
    * @param selector extracts the grouping key from an element
    * @tparam A element type of the input stream
    * @tparam K key type used to route elements into sub-streams
    * @return one `(key, subStream)` pair per distinct key `K`
    */
  def groupByUnbounded[F[_], A, K](selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] = { input =>
    val enqueuer = Stream.resource(KeyedEnqueue.unbounded[F, K, A])
    enqueuer.flatMap(ke => input.through(KeyedEnqueue.pipe(ke)(selector)))
  }

  /** Same as `groupByUnbounded`, except that the input is back-pressured once `maxItems` elements
    * have been routed but not yet pulled by their sub-streams.
    */
  def groupBy[F[_], A, K](maxItems: Long)(selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] = { input =>
    val enqueuer = Stream.resource(KeyedEnqueue.itemBounded[F, K, A](maxItems))
    enqueuer.flatMap(ke => input.through(KeyedEnqueue.pipe(ke)(selector)))
  }

  /** Like `groupBy`, but runs every sub-stream concurrently and batches it with `groupWithin`.
    *
    * For a given key, a chunk is emitted when `maxChunkSize` elements are pending, or when
    * `maxChunkTimeout` has elapsed since that key last emitted and at least one element is waiting.
    *
    * @param maxTotalItems back-pressure once this many elements are in flight across all keys
    * @param maxChunkSize every emitted chunk satisfies 0 < chunk.size <= maxChunkSize
    * @param maxChunkTimeout flush a smaller chunk for a key after this much time without emitting
    *                        one for that key, provided elements for it are waiting
    * @param selector every output `(key, chunk)` satisfies chunk.forall(a => selector(a) == key)
    */
  def groupWithinBy[F[_], A, K](
      maxTotalItems: Long,
      maxChunkSize: Int,
      maxChunkTimeout: FiniteDuration
  )(
      selector: A => K
  )(implicit F: Concurrent[F], timer: Timer[F]): Pipe[F, A, (K, Chunk[A])] = { input =>
    val grouped = input.through(groupBy(maxTotalItems)(selector))
    val batched = grouped.map {
      case (key, substream) =>
        substream
          .groupWithin(maxChunkSize, maxChunkTimeout)
          .map(chunk => (key, chunk))
    }
    batched.parJoinUnbounded
  }
}
import cats.Monad | |
import cats.effect.concurrent.{Ref, Semaphore} | |
import cats.effect.{Concurrent, Resource} | |
import cats.implicits._ | |
import fs2.{Pipe, Stream} | |
import fs2.concurrent.{NoneTerminatedQueue, Queue} | |
/** Capability to push keyed items into a family of queues, one queue per key, where each queue
  * is exposed as its own stream carrying only that key's items.
  *
  * This provides a "keyed fan-out" for a single stream. One use is batching items homogeneously
  * by key when they arrive through a heterogeneous input.
  *
  * Somewhat analogous to [[fs2.concurrent.Enqueue]].
  */
private[fs2utils] trait KeyedEnqueue[F[_], K, A] {

  /** Pushes `item` onto the queue for `key`, creating that queue first if it does not exist.
    *
    * @return <ul><li>`None` when the item went to a sub-stream that had already been returned.</li>
    *         <li>`Some((key, stream))` when a new queue was created for this item. This can
    *         happen more than once for the same key, for example if an implementation retires
    *         old or quiet sub-streams.</li></ul>
    *         A returned stream may terminate at some point, but this call never cancels it.
    */
  def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]]

  /** Gracefully ends every sub-stream returned so far. */
  def shutdownAll: F[Unit]
}
private[fs2utils] object KeyedEnqueue {

  /** A [[KeyedEnqueue]] in which each key gets its own unbounded, `None`-terminated queue.
    *
    * Releasing the resource runs `shutdownAll`, ending every sub-stream handed out.
    */
  def unbounded[F[_]: Concurrent, K, A]: Resource[F, KeyedEnqueue[F, K, A]] = {
    val emptyRegistry = Ref[F].of(Map.empty[K, NoneTerminatedQueue[F, A]])
    Resource.liftF(emptyRegistry).flatMap { registry =>
      val enqueuer: KeyedEnqueue[F, K, A] = new UnboundedKeyedEnqueue[F, K, A](registry)
      Resource.make(enqueuer.pure[F])(_.shutdownAll)
    }
  }

  /** Routes each input element into `ke` by `selector`.
    *
    * The pipe emits only the sub-streams that are newly created. When the input ends, it calls
    * `ke.shutdownAll` so that those sub-streams end too.
    */
  def pipe[F[_]: Concurrent, K, A](
      ke: KeyedEnqueue[F, K, A]
  )(selector: A => K): Pipe[F, A, (K, Stream[F, A])] = { in =>
    val newSubstreams = in.evalMap(a => ke.enqueue1(selector(a), a)).unNone
    // Shutdown must be sequenced with `++`, which gives "input ends => outputs end".
    // With `onFinalize`, the finalizer could later be rescoped to the output of this stream.
    // It would then never fire: it would be waiting for the output to end, while the output
    // waits for the finalizer to end it.
    newSubstreams ++ Stream.eval_(ke.shutdownAll)
  }

  /** An unbounded [[KeyedEnqueue]] guarded by a semaphore, so that at most `maxItems` elements
    * are enqueued but not yet pulled downstream.
    */
  def itemBounded[F[_]: Concurrent, K, A](
      maxItems: Long
  ): Resource[F, KeyedEnqueue[F, K, A]] =
    unbounded[F, K, A].flatMap { inner =>
      Resource.liftF(Semaphore[F](maxItems)).map { permits =>
        new ItemBoundedKeyedEnqueue(inner, permits): KeyedEnqueue[F, K, A]
      }
    }
}
/** [[KeyedEnqueue]] backed by one unbounded, `None`-terminated queue per key.
  *
  * A queue is created the first time its key is seen. It stays registered in `queues` for the
  * lifetime of this instance, so memory grows with the number of distinct keys.
  *
  * @param queues registry holding the live queue for every key seen so far
  */
private class UnboundedKeyedEnqueue[F[_], K, A](
    queues: Ref[F, Map[K, NoneTerminatedQueue[F, A]]]
)(implicit F: Concurrent[F])
    extends KeyedEnqueue[F, K, A] {

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    withKey(key)(_.enqueue1(item.some))

  /** Sends end-of-stream (`None`) to every queue currently registered. */
  override val shutdownAll: F[Unit] =
    queues.get.flatMap(_.values.toList.traverse_(_.enqueue1(None)))

  /** Runs `use` against the queue registered for `key`, registering a new queue if there is none.
    *
    * The new queue is registered with an atomic `Ref.modify`, so concurrent callers for the same
    * new key agree on a single queue. The previous separate `get` + `update` let a second caller
    * overwrite the first caller's queue. That overwritten queue's stream had already been
    * returned, but it was no longer in the registry. It would therefore never receive the `None`
    * sent by `shutdownAll`, and its sub-stream would never terminate.
    *
    * @return `Some((key, stream))` if this call registered the queue, otherwise `None`
    */
  private[this] def withKey(key: K)(
      use: NoneTerminatedQueue[F, A] => F[Unit]
  ): F[Option[(K, Stream[F, A])]] =
    queues.get.flatMap { snapshot =>
      snapshot.get(key) match {
        // Fast path: the key already has a queue, so no allocation is needed.
        case Some(existing) =>
          use(existing).as(none[(K, Stream[F, A])])
        case None =>
          // Allocate a queue speculatively, then publish it atomically. If another fiber
          // registered one in the meantime, ours was never exposed: drop it and use theirs.
          Queue.noneTerminated[F, A].flatMap { fresh =>
            queues.modify { current =>
              current.get(key) match {
                case Some(winner) =>
                  (current, use(winner).as(none[(K, Stream[F, A])]))
                case None =>
                  (current + (key -> fresh), use(fresh).as((key -> fresh.dequeue).some))
              }
            }.flatten
          }
      }
    }
}
/** Wraps another [[KeyedEnqueue]] so that every enqueued item must first acquire one permit from
  * `limit`. The permits are returned as downstream pulls chunks out of the emitted sub-streams.
  *
  * @param ke    the underlying enqueue that holds the items
  * @param limit semaphore bounding how many items can be in flight at once
  */
private class ItemBoundedKeyedEnqueue[F[_]: Monad, K, A](
    ke: KeyedEnqueue[F, K, A],
    limit: Semaphore[F]
) extends KeyedEnqueue[F, K, A] {

  /** Returns one permit per element of each chunk as `substream` emits that chunk. */
  private[this] def releasingPermits(substream: Stream[F, A]): Stream[F, A] =
    substream.chunks
      .evalTap(chunk => limit.releaseN(chunk.size.toLong))
      .flatMap(Stream.chunk)

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    for {
      _       <- limit.acquire
      created <- ke.enqueue1(key, item)
      // Each sub-stream is emitted once and then reused, so the release logic is attached
      // once per sub-stream, at the moment it is created.
    } yield created.map { case (k, substream) => k -> releasingPermits(substream) }

  override val shutdownAll: F[Unit] = ke.shutdownAll
}
Thanks!
To me it looks like `modify` is not possible here, as the updated `Map` depends on the effectful `Queue.noneTerminated[F, A]`:
queuesMap.get(key) match {
case Some(queue) =>
(queuesMap, use(queue).as(None))
case None =>
val action = for {
newQ <- Queue.noneTerminated[F, A]
updadted = queuesMap + (key -> newQ) // We cannot return it
result <- use(newQ).as((key -> newQ.dequeue).some)
} yield result
(???, action)
}
Would be happy to be wrong.
Yeah, it's not immediately obvious how to avoid that; it may involve refactoring. Possibly using something like the keyed `Ref` from Davenverse.
I feel like I reasoned out that the race condition was in practice not a problem, but I need to re-examine with fresh eyes. I wrote this quite a while ago.
> To me it looks like `modify` is not possible here, as the updated `Map` depends on the effectful `Queue.noneTerminated[F, A]`

You create the `Queue` in advance (which has negligible overhead), just as you do when you need a `Deferred`. In any case, I'm adding a similar combinator to the library directly.
@SystemFw Hi, has such a combinator already been added to fs2? Thanks :)
In any case I'm adding a similar combinator to the library directly
Have you done it? @SystemFw
No, I started on it at the time, and then got side-tracked creating what has become `Channel`. I also felt unsatisfied by how large the possible API surface could be, but I will admit that's probably the perfect being the enemy of the good.
You don't need the full power of a `Semaphore`, but you do want to replace that `get` + `update` with a `modify`.