Last active
April 22, 2017 04:57
-
-
Save pchlupacek/989a2801036a9441da252726a1b4972d to your computer and use it in GitHub Desktop.
converting fs2 to scalaz.stream and vice versa
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spinoco.scalaz.stream | |
import fs2.Handle | |
import fs2._ | |
import scalaz.concurrent.{Actor, Task} | |
import scalaz.stream.{Cause, Process, wye} | |
import scala.language.higherKinds | |
import scalaz.{-\/, \/, \/-} | |
import scalaz.\/._ | |
object fs2Conversion { | |
/**
 * `Async` instance for `fs2.Task`, built with `fs2.Strategy.sequential`.
 *
 * The type is spelled out so that implicit resolution does not depend on where in the
 * file this value is declared relative to its use sites.
 */
implicit val F: fs2.util.Async[fs2.Task] = fs2.Task.asyncInstance(fs2.Strategy.sequential)
/**
 * Converts an fs2 `Stream` into a scalaz-stream `Process`.
 *
 * Each evaluation of the returned process creates a fresh hand-off actor, starts the
 * fs2 side in the background with `unsafeRunAsync`, and then requests chunks from the
 * actor one at a time. The fs2 side offers a chunk and waits for the actor's answer
 * before pulling the next one.
 *
 * `fs2in` is only run when the actor answers the registration with `true`. If the
 * answer is `false` the publisher completes without evaluating `fs2in` at all.
 *
 * @param fs2in the fs2 stream to expose as a `Process`
 * @return a `Process` emitting the elements of `fs2in`
 */
def fs2ToProcess[A](fs2in: Stream[fs2.Task, A]): Process[Task, A] = Process.suspend {
  import impl._

  val actor = syncActor[AttemptFS2, AttemptScalaz, A]

  // Publisher side: offers every chunk of the source to the actor.
  def stepper: Pipe[fs2.Task, A, Nothing] = {
    // for performance reasons we do this manually instead of chunk & evalMap
    def go: Handle[fs2.Task, A] => fs2.Pull[fs2.Task, Nothing, Unit] = {
      _.receiveOption {
        case None => Pull.done
        case Some((chunk, h)) =>
          Pull.eval(fs2.Task.unforkedAsync[Boolean] { cb =>
            actor ! OfferChunk[AttemptFS2, A](chunk, cb)
          }).flatMap { continue =>
            if (continue) go(h)
            else Pull.done
          }
      }
    }
    _.pull(go)
  }

  // Completes once the actor answers the registration.
  val register: fs2.Task[Boolean] =
    fs2.Task.unforkedAsync[Boolean] { cb => actor ! RegisterPublisher[AttemptFS2](cb) }

  // Honour the registration answer: do not touch `fs2in` when told to stop.
  val publish: fs2.Task[Unit] = register.flatMap { continue =>
    if (continue) fs2in.through(stepper).run
    else fs2.Task.now(())
  }

  publish.unsafeRunAsync {
    case Right(_) => actor ! PublisherDone(None)
    case Left(rsn) => actor ! PublisherDone(Some(rsn))
  }

  // construct Process from evaluating the actor
  def go: Process[Task, A] = {
    Process.eval(Task.async[Option[Chunk[A]]] { cb =>
      actor ! RequestChunk[AttemptScalaz, A](cb)
    }).flatMap {
      case None => Process.halt
      case Some(chunk) => Process.emitAll(chunk.toList) ++ go
    }
  }

  // Report how the process ended to the actor, then halt (or fail with the same reason).
  go.onHalt {
    case Cause.End | Cause.Kill =>
      (Process.eval_(Task.delay(actor ! SubscriberDone(None))) ++ Process.halt).asFinalizer
    case Cause.Error(rsn) =>
      (Process.eval_(Task.delay(actor ! SubscriberDone(Some(rsn)))) ++ Process.fail(rsn)).asFinalizer
  }
}
/**
 * Converts a scalaz-stream `Process` into an fs2 `Stream`.
 *
 * Each evaluation of the returned stream creates a fresh hand-off actor, starts the
 * scalaz side in the background with `runAsync`, and then requests chunks from the
 * actor one at a time. Every element of `in` is offered as a singleton chunk, and the
 * publisher stops as soon as an offer is answered with `false`.
 *
 * `in` is only run when the actor answers the registration with `true`. If the answer
 * is `false` the publisher completes without evaluating `in` at all.
 *
 * @param in the scalaz-stream process to expose as an fs2 `Stream`
 * @return an fs2 `Stream` emitting the elements of `in`
 */
def processToFs2[A](in: Process[Task, A]): Stream[fs2.Task, A] = Stream.suspend {
  import impl._

  val actor = syncActor[AttemptScalaz, AttemptFS2, A]

  // Completes once the actor answers the registration.
  val register: Task[Boolean] =
    Task.async[Boolean] { cb => actor ! RegisterPublisher[AttemptScalaz](cb) }

  // Offers a single element and completes with the actor's continue/stop answer.
  def offer(a: A): Task[Boolean] =
    Task.async[Boolean] { cb => actor ! OfferChunk[AttemptScalaz, A](Chunk.singleton(a), cb) }

  // Honour the registration answer: do not touch `in` when told to stop.
  val publish: Task[Unit] = register.flatMap { continue =>
    if (continue) in.evalMap { a => offer(a) }.takeWhile(identity).run
    else Task.now(())
  }

  publish.runAsync {
    case \/-(_) => actor ! PublisherDone(None)
    case -\/(rsn) => actor ! PublisherDone(Some(rsn))
  }

  // stream from an actor
  def go: fs2.Stream[fs2.Task, A] = {
    Stream.eval(fs2.Task.unforkedAsync[Option[Chunk[A]]] { cb =>
      actor ! RequestChunk[AttemptFS2, A](cb)
    }).flatMap {
      case None => Stream.empty // done
      case Some(chunk) => Stream.chunk(chunk) ++ go
    }
  }

  go
    .onError { rsn =>
      // Report the failure to the actor, then re-raise it.
      Stream.eval_(fs2.Task.delay { actor ! SubscriberDone(Some(rsn)) }) ++ Stream.fail(rsn)
    }
    .onFinalize {
      fs2.Task.delay(actor ! SubscriberDone(None))
    }
}
/**
 * Wraps an `fs2.Task` as a scalaz `Task`.
 *
 * The fs2 task is started with `unsafeRunAsync` when the scalaz task is run, and its
 * `Either` outcome is handed to the scalaz callback as a disjunction.
 */
def fs2Task2Task[A](in: fs2.Task[A]): Task[A] =
  Task.async[A] { complete =>
    in.unsafeRunAsync {
      case Right(value) => complete(\/-(value))
      case Left(err) => complete(-\/(err))
    }
  }
/**
 * Wraps a scalaz `Task` as an `fs2.Task`.
 *
 * The scalaz task is started with `runAsync` when the fs2 task is run, and its
 * outcome is handed to the fs2 callback as an `Either`.
 *
 * @param in the scalaz task to wrap
 * @param S  strategy used by `fs2.Task.async` to run the continuation. It defaults to
 *           `fs2.Strategy.sequential`, which was previously hard-coded. With the
 *           sequential strategy, long chains of converted tasks (for example under
 *           `.repeat`) can overflow the stack; supply a strategy that forks to avoid
 *           this.
 */
def task2fs2Task[A](in: Task[A])(implicit S: fs2.Strategy = fs2.Strategy.sequential): fs2.Task[A] =
  fs2.Task.async[A]({ cb => in.runAsync { r => cb(r.toEither) } })(S)
/** Internal plumbing: callback result encodings, actor messages and the hand-off actor. */
private object impl {

  /** Callback result shape used on the fs2 side. */
  type AttemptFS2[+A] = Either[Throwable, A]

  /** Callback result shape used on the scalaz side. */
  type AttemptScalaz[+A] = Throwable \/ A

  /** Builds success and failure values in the result encoding `F`. */
  sealed trait Attempt[F[+_]] {
    def success[A](a: A): F[A]
    /** Answer telling the publisher to keep producing. */
    val continue: F[Boolean] = success(true)
    /** Answer telling the publisher to stop producing. */
    val stop: F[Boolean] = success(false)
    def failure(rsn: Throwable): F[Nothing]
    /** `failure(rsn)` when a reason is present, otherwise `stop`. */
    def fromResult(result: Option[Throwable]): F[Boolean] = result.map(failure).getOrElse(stop)
  }

  object Attempt {
    implicit val fs2Instance: Attempt[AttemptFS2] = new Attempt[AttemptFS2] {
      def success[A](a: A): AttemptFS2[A] = Right(a)
      def failure(rsn: Throwable): AttemptFS2[Nothing] = Left(rsn)
    }

    implicit val processInstance: Attempt[AttemptScalaz] = new Attempt[AttemptScalaz] {
      def success[A](a: A): AttemptScalaz[A] = \/-(a)
      def failure(rsn: Throwable): AttemptScalaz[Nothing] = -\/(rsn)
    }
  }

  /** Publisher announces itself; `cb` is answered once it may start (or must stop). */
  final case class RegisterPublisher[F[+_]](cb: F[Boolean] => Unit) extends M[F, Nothing, Nothing]

  /** Subscriber asks for the next chunk; `None` signals a normal end. */
  final case class RequestChunk[F[+_], A](cb: F[Option[Chunk[A]]] => Unit) extends M[Nothing, F, A]

  /** Publisher hands over a chunk; `cb` is answered with continue/stop or a failure. */
  final case class OfferChunk[F[+_], A](chunk: Chunk[A], cb: F[Boolean] => Unit) extends M[F, Nothing, A]

  /** Subscriber terminated, with an optional failure reason. */
  final case class SubscriberDone(result: Option[Throwable]) extends M[Nothing, Nothing, Nothing]

  /** Publisher terminated, with an optional failure reason. */
  final case class PublisherDone(result: Option[Throwable]) extends M[Nothing, Nothing, Nothing]

  /** Messages understood by the actor created by `syncActor`. */
  sealed trait M[+FIn[+_], +FOut[+_], +A]

  /**
   * Creates the actor that hands chunks from one publisher to one subscriber.
   *
   * The actor keeps at most one parked publisher callback and one parked subscriber
   * callback. A chunk is handed over only when a subscriber request is pending, and
   * the publisher's callback is then held until the next request arrives.
   *
   * @param FIn  result encoding of the publisher's callbacks
   * @param FOut result encoding of the subscriber's callbacks
   */
  def syncActor[FIn[+_], FOut[+_], A](implicit FIn: Attempt[FIn], FOut: Attempt[FOut]): Actor[M[FIn, FOut, A]] = {
    // State below is only read and written inside the actor's message handler.
    // Publisher callback waiting for the subscriber's next request.
    var upReady: Option[FIn[Boolean] => Unit] = None
    // Subscriber callback waiting for the publisher's next chunk.
    var requestReady: Option[FOut[Option[Chunk[A]]] => Unit] = None
    // Set once either side has terminated; the inner Option carries the failure, if any.
    var done: Option[Option[Throwable]] = None

    Actor.actor[M[FIn, FOut, A]]({
      case register: RegisterPublisher[FIn @unchecked] =>
        done match {
          // Already terminated: answer right away instead of parking the callback forever.
          case Some(result) => register.cb(FIn.fromResult(result))
          case None =>
            if (requestReady.nonEmpty) register.cb(FIn.continue)
            else upReady = Some(register.cb)
        }

      case request: RequestChunk[FOut @unchecked, A @unchecked] =>
        done match {
          case None => requestReady = Some(request.cb); upReady.foreach { cb => cb(FIn.continue) }; upReady = None
          case Some(None) => request.cb(FOut.success(None))
          case Some(Some(rsn)) => request.cb(FOut.failure(rsn))
        }

      case offer: OfferChunk[FIn @unchecked, A @unchecked] =>
        done match {
          case None => requestReady match {
            case Some(rcb) => rcb(FOut.success(Some(offer.chunk))); upReady = Some(offer.cb); requestReady = None
            case None =>
              // The publisher offered a chunk although no request was pending.
              val rsn = new IllegalStateException(s"Downstream not ready, while requesting data : ${offer.chunk}")
              done = Some(Some(rsn))
              offer.cb(FIn.failure(rsn))
          }
          case Some(result) => offer.cb(FIn.fromResult(result))
        }

      case SubscriberDone(result) =>
        upReady.foreach { cb => cb(FIn.fromResult(result)) }
        upReady = None
        // The first termination wins.
        done = done orElse Some(result)

      case PublisherDone(result) =>
        requestReady.foreach { cb => result match {
          case None => cb(FOut.success(None))
          case Some(rsn) => cb(FOut.failure(rsn))
        }}
        requestReady = None
        // The first termination wins.
        done = done orElse Some(result)
    })(scalaz.concurrent.Strategy.Sequential)
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I get a StackOverflowError when task2fs2Task is used many times in a row (e.g. with .repeat).