Created
December 8, 2015 20:06
-
-
Save pchiusano/2f60860447f2fbfa5979 to your computer and use it in GitHub Desktop.
Design of splitting combinators in FS2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fs2 | |
import util.Monad | |
import Step._ | |
import java.util.concurrent.atomic.AtomicLong | |
object diamond { | |
/** | |
* Pass elements of `s` through both `f` and `g`, then combine the two resulting streams. | |
* Implemented by enqueueing elements as they are seen by `f` onto a `Queue` used by the `g` branch. | |
* USE EXTREME CARE WHEN USING THIS FUNCTION. Deadlocks are possible if `combine` pulls from the `g` | |
* branch synchronously before the queue has been populated by the `f` branch. | |
* | |
* The `combine` function receives an `F[Long]` effect which evaluates to the current size of the | |
* `g`-branch's queue. | |
* | |
* When possible, use one of the safe combinators like `[[observe]]`, which are built using this function, | |
* in preference to using this function directly. | |
*/ | |
def diamond[F[_],A,B,C,D](s: Stream[F,A]) | |
(f: Stream[F,A] => Stream[F,B]) | |
(qs: QueueStrategy[Chunk[A]], g: Stream[F,A] => Stream[F,C]) // in theory could just have a Queue here, but don't want to allow sharing queues between diamonds | |
(combine: (Stream[F,B], F[Long], Stream[F,C]) => Stream[F,D])(implicit F: Async[F]): Stream[F,D] | |
= Stream.suspend { | |
val q = qs.toQueue | |
val queueSize = new AtomicLong(0L) | |
def suspendf[A](a: => A) = F.map(F.pure(())) { _ => a } | |
combine( | |
f( | |
s.repeatPull { h => h.receive { case a #: h => | |
Pull.eval(q.enqueue(Some(a))) >> | |
Pull.suspend { queueSize.incrementAndGet; Pull.pure(()) } >> | |
Pull.output(a).as(h) }} | |
.onComplete { Stream.eval_(q.enqueue(None)) } | |
), | |
suspendf { queueSize.get }, | |
g(toFiniteStream(q) flatMap { c => Stream.suspend { queueSize.decrementAndGet; Stream.chunk(c) }}) | |
) | |
} | |
trait Queue[F[_],A] { // placeholder for Pavel's real implementation | |
def enqueue(a: A): F[Unit] | |
def dequeue: F[A] | |
} | |
/** Convert a `Queue[F,Option[A]]` to a stream by treating `None` as indicating end-of-stream. */ | |
def toFiniteStream[F[_],A](q: Queue[F,Option[A]]): Stream[F,A] = Stream.eval(q.dequeue).flatMap { | |
case None => Stream.empty | |
case Some(a) => Stream.emit(a) ++ toFiniteStream(q) | |
} | |
trait QueueStrategy[A] { | |
// todo needs to be F[_]:AsyncExt | |
private[fs2] def toQueue[F[_]:Async]: Queue[F,Option[A]] | |
} | |
// just meant to be suggestive here | |
def bounded[A](maxSize: Long): QueueStrategy[A] = ??? | |
def unbounded[A]: QueueStrategy[A] = ??? | |
/** Evict the oldest element when the number queued exceeds `maxSize`. */ | |
def evictOldest[A](maxSize: Long): QueueStrategy[A] = ??? | |
/** Evict the newest element when the number queued exceeds `maxSize`. */ | |
def evictNewest[A](maxSize: Long): QueueStrategy[A] = ??? | |
def observe[F[_]:Async,A](s: Stream[F,A])(sink: Sink[F,A]): Stream[F,A] = | |
diamond(s)(identity)(bounded(1), sink) { (a,n,sinkResponses) => | |
(a repeatPull2 sinkResponses) { (h1, hq) => | |
Pull.eval(n) flatMap { numberQueued => | |
if (numberQueued >= 1) hq.receive { case _ #: hq => Pull.pure((h1,hq)) } | |
else h1.receive { case a #: h1 => Pull.output(a).as((h1,hq)) } | |
} | |
} | |
} | |
def observeAsync[F[_]:Async,A](s: Stream[F,A])(sink: Sink[F,A], maxQueued: Long): Stream[F,A] = | |
diamond(s)(identity)(bounded(maxQueued), sink andThen (_.drain)) { | |
(a,_,sinkResponses) => wye.merge(a,sinkResponses) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment