fs2 groupBy / KeyedEnqueue
import cats.effect.{Concurrent, Timer}
import fs2.{Chunk, Pipe, Stream}
import scala.concurrent.duration.FiniteDuration

package object fs2utils {

  /**
   * Grouping logic used to split a stream into sub-streams identified by a unique key.
   * Be aware that when working with an unbounded number of keys K, if the substreams never
   * terminate, memory usage can grow without bound.
   *
   * The emitted streams terminate when the input terminates or when the stream
   * returned by the pipe itself terminates. Termination is graceful and all input
   * elements are emitted.
   *
   * @param selector Function to retrieve the grouping key from an element of type A
   * @tparam A Elements in the stream
   * @tparam K A key used to group elements into substreams
   * @return Streams grouped by a unique key identifier `K`.
   */
  def groupByUnbounded[F[_], A, K](selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] = { in =>
    Stream
      .resource(KeyedEnqueue.unbounded[F, K, A])
      .flatMap { ke =>
        in.through(KeyedEnqueue.pipe(ke)(selector))
      }
  }

  /** Like `groupByUnbounded`, but backpressures the input stream when `maxItems` elements are in flight. */
  def groupBy[F[_], A, K](maxItems: Long)(selector: A => K)(
      implicit F: Concurrent[F]
  ): Pipe[F, A, (K, Stream[F, A])] = { in =>
    Stream
      .resource(KeyedEnqueue.itemBounded[F, K, A](maxItems))
      .flatMap { ke =>
        in.through(KeyedEnqueue.pipe(ke)(selector))
      }
  }

  /**
   * Like `groupBy`, but each substream is concurrently merged, emitting a chunk when the substream has
   * `maxChunkSize` elements pending or when it has waited `maxChunkTimeout` without emitting elements,
   * similar to the standard `groupWithin` combinator.
   *
   * @param maxTotalItems Backpressure when this many items are "in flight" concurrently
   * @param maxChunkSize Output chunks satisfy: 0 < emittedChunk.size <= maxChunkSize
   * @param maxChunkTimeout Emit chunks smaller than `maxChunkSize` if `maxChunkTimeout` has elapsed without
   *                        emitting any chunks for a given key `K` while elements matching that key are waiting
   * @param selector Output elements satisfy: (key, chunk) => chunk.forall(a => selector(a) == key)
   */
  def groupWithinBy[F[_], A, K](
      maxTotalItems: Long,
      maxChunkSize: Int,
      maxChunkTimeout: FiniteDuration
  )(
      selector: A => K
  )(implicit F: Concurrent[F], timer: Timer[F]): Pipe[F, A, (K, Chunk[A])] =
    _.through(groupBy(maxTotalItems)(selector)).map {
      case (key, stream) =>
        stream
          .groupWithin(maxChunkSize, maxChunkTimeout)
          .map(chunk => key -> chunk)
    }.parJoinUnbounded
}
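For illustration, a minimal usage sketch of `groupWithinBy`, assuming fs2 2.x / cats-effect 2.x to match the imports above; the `UserEvent` type, its `userId` field, and the `process` function are hypothetical stand-ins for an application's own input type and batch handler.

import cats.effect.{Concurrent, IO, Timer}
import fs2.{Chunk, Stream}
import scala.concurrent.duration._
import fs2utils._

object GroupWithinByExample {
  // Hypothetical input type: events keyed by the user that produced them.
  final case class UserEvent(userId: Long, payload: String)

  // Hypothetical sink that persists one batch of events for a single user.
  def process(userId: Long, batch: Chunk[UserEvent]): IO[Unit] = IO.unit

  // Batch incoming events per user: at most 100 events per emitted chunk,
  // flushing a partial chunk after 5 seconds of per-user silence, and never
  // holding more than 10000 events in flight overall.
  def batched(events: Stream[IO, UserEvent])(
      implicit F: Concurrent[IO],
      timer: Timer[IO]
  ): Stream[IO, Unit] =
    events
      .through(
        groupWithinBy[IO, UserEvent, Long](
          maxTotalItems = 10000L,
          maxChunkSize = 100,
          maxChunkTimeout = 5.seconds
        )(_.userId)
      )
      .evalMap { case (userId, chunk) => process(userId, chunk) }
}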
package fs2utils

import cats.Monad
import cats.effect.concurrent.{Ref, Semaphore}
import cats.effect.{Concurrent, Resource}
import cats.implicits._
import fs2.{Pipe, Stream}
import fs2.concurrent.{NoneTerminatedQueue, Queue}

/** Represents the ability to enqueue keyed items into a stream of queues that emits homogeneous keyed streams.
 *
 * This allows construction of a "keyed fan-out" behavior for a stream, which may be used for
 * homogeneously batching items that arrive via a heterogeneous input.
 *
 * Somewhat analogous to [[fs2.concurrent.Enqueue]]
 */
private[fs2utils] trait KeyedEnqueue[F[_], K, A] {

  /** Enqueue a single item for a given key, possibly creating and returning a new substream for that key.
   *
   * @return <ul><li>None if the item was published to an already-live substream</li>
   *         <li>Some if a new queue was created for this element. This can happen multiple times for the same
   *         key (for example, if the implementation automatically terminates old/quiet substreams).</li></ul>
   *         The returned stream may eventually terminate, but it won't be cancelled by this.
   */
  def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]]

  /** Gracefully terminate all sub-streams we have emitted so far */
  def shutdownAll: F[Unit]
}

private[fs2utils] object KeyedEnqueue {

  def unbounded[F[_]: Concurrent, K, A]: Resource[F, KeyedEnqueue[F, K, A]] =
    Resource.liftF(Ref[F].of(Map.empty[K, NoneTerminatedQueue[F, A]])).flatMap { st =>
      Resource.make(
        (new UnboundedKeyedEnqueue[F, K, A](st): KeyedEnqueue[F, K, A]).pure[F]
      )(_.shutdownAll)
    }

  def pipe[F[_]: Concurrent, K, A](
      ke: KeyedEnqueue[F, K, A]
  )(selector: A => K): Pipe[F, A, (K, Stream[F, A])] = { in =>
    // Note this *must* be `++` specifically to allow for "input termination = output termination" behavior.
    // Using `onFinalize` would allow the finalizer to be rescoped to the output of this stream later, which
    // results in it not triggering because it's waiting for itself to terminate before it terminates itself.
    in.evalMap(a => ke.enqueue1(selector(a), a)).unNone ++
      Stream.eval_(ke.shutdownAll)
  }

  def itemBounded[F[_]: Concurrent, K, A](
      maxItems: Long
  ): Resource[F, KeyedEnqueue[F, K, A]] =
    for {
      ke <- unbounded[F, K, A]
      limit <- Resource.liftF(Semaphore[F](maxItems))
    } yield new ItemBoundedKeyedEnqueue(ke, limit)
}

private class UnboundedKeyedEnqueue[F[_], K, A](
    queues: Ref[F, Map[K, NoneTerminatedQueue[F, A]]]
)(implicit F: Concurrent[F])
    extends KeyedEnqueue[F, K, A] {

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    withKey(key)(_.enqueue1(item.some))

  override val shutdownAll: F[Unit] =
    queues.get.flatMap(_.values.toList.traverse_(_.enqueue1(None)))

  private[this] def withKey(key: K)(
      use: NoneTerminatedQueue[F, A] => F[Unit]
  ): F[Option[(K, Stream[F, A])]] =
    queues.get.flatMap { qm =>
      qm.get(key)
        .fold {
          for { // No queue for this key - create a new one
            newQ <- Queue.noneTerminated[F, A]
            _ <- queues.update(x => x + (key -> newQ))
            _ <- use(newQ)
          } yield (key -> newQ.dequeue).some
        }(q => use(q).as(None))
    }
}

private class ItemBoundedKeyedEnqueue[F[_]: Monad, K, A](
    ke: KeyedEnqueue[F, K, A],
    limit: Semaphore[F]
) extends KeyedEnqueue[F, K, A] {

  override def enqueue1(key: K, item: A): F[Option[(K, Stream[F, A])]] =
    limit.acquire >> ke
      .enqueue1(key, item)
      .map(_.map {
        case (key, stream) =>
          // We only need to attach the "release" behavior to a given stream once,
          // because each stream is emitted once and then reused.
          key -> stream.chunks
            .evalTap(c => limit.releaseN(c.size.toLong))
            .flatMap(Stream.chunk)
      })

  override val shutdownAll: F[Unit] = ke.shutdownAll
}
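Similarly, a minimal sketch of the substream-emitting `groupBy`: each keyed substream gets its own pipeline and the results are merged with `parJoinUnbounded`, mirroring what `groupWithinBy` does internally. `LogLine`, `host`, and `writeTo` are hypothetical names.

import cats.effect.{Concurrent, IO}
import fs2.{Pipe, Stream}
import fs2utils._

object GroupByExample {
  // Hypothetical input type: log lines keyed by the host that emitted them.
  final case class LogLine(host: String, text: String)

  // Hypothetical per-host sink; a real one might write to a file or socket.
  def writeTo(host: String): Pipe[IO, LogLine, Unit] = _.map(_ => ())

  // Fan the input out into one substream per host, backpressuring the source
  // once 1000 lines are in flight, and run each per-host pipeline concurrently.
  def perHost(lines: Stream[IO, LogLine])(
      implicit F: Concurrent[IO]
  ): Stream[IO, Unit] =
    lines
      .through(groupBy[IO, LogLine, String](1000L)(_.host))
      .map { case (host, substream) => substream.through(writeTo(host)) }
      .parJoinUnbounded
}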
Have you done it? @SystemFw

No, I started on it at the time, and then got side-tracked creating what has become Channel. I also felt unsatisfied by how large the possible API surface could be, but I will admit that's probably the perfect being the enemy of the good.