Last active
November 2, 2016 13:59
-
-
Save djspiewak/6b5cd3fb78b054046755 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sealed trait AcceptT[+F[+_], -I, +O] | |
object AcceptT { | |
final case class Effect[+F[+_], +O](f: F[O]) extends AcceptT[F, Any, O] | |
final case class Accept[+F[+_], I]() extends AcceptT[F, I, I] | |
} | |
type Transducer[+F[+_], -I, +O] = Process[({ type λ[+α] = AcceptT[F, I, α] })#λ, O] | |
// type Process1[I, O] = Transducer[Identity, I, O] | |
// type Channel = Obsolete | |
// type Sink[F[+_], A] = Transducer[F, A, Unit] | |
object transducer { | |
def receiveF[F[+_], A, I, O](effect: F[A])(f: A => Transducer[F, I, O]): Transducer[F, I, O] = | |
await[({ type λ[+α] = AcceptT[F, I, α] })#λ, A, O](AcceptT.Effect(effect))(f) | |
def receive1[F[+_], I, O](f: I => Transducer[F, I, O]): Transducer[F, I, O] = | |
await[({ type λ[+α] = AcceptT[F, I, α] })#λ, I, O](AcceptT.Accept[F, I]())(f) | |
} | |
implicit class TransducerSyntax[F[+_], A](val self: Process[F, A]) extends AnyVal { | |
import Cause.{EarlyCause, End, Kill} | |
def transduce[B](trans: Transducer[F, A, B])(implicit F: Applicative[F], C: Catchable[F]): Process[F, B] = trans.suspendStep flatMap { | |
// fail suck ugliness to get around limitations in GADT pattern matching | |
case Halt(rsn) => self.kill onHalt { _ => Halt(rsn) } | |
case step: Step[({ type λ[+α] = AcceptT[F, A, α] })#λ, B] => { | |
val cont1 = step.next | |
step.head match { | |
case awt: Await[({ type λ[+α] = AcceptT[F, A, α] })#λ, a, B] => { | |
val req = awt.req | |
val rcv = awt.rcv | |
req match { | |
case AcceptT.Effect(eff) => { | |
val fp = eff.attempt map { _ leftMap { Error(_) } } map { res => | |
self transduce (rcv(res).run +: cont1) // TODO errors? I don't have access to Util.Try, because... blergh | |
} | |
eval(fp) flatMap { a => a } | |
} | |
case AcceptT.Accept() => self.step match { | |
case Step(awt @ Await(_, _), cont) => awt extend { p => new TransducerSyntax[F, A](p +: cont) transduce step.toProcess } | |
case Step(Emit(Seq()), cont) => cont.continue transduce step.toProcess | |
case Step(Emit(is), cont) => { | |
val nextT = rcv(\/-(is.head.asInstanceOf[a])).run +: cont1 // this is sound because Accept proves it so | |
val nextS = emitAll(is.tail) +: cont | |
new TransducerSyntax[F, A](nextS) transduce nextT | |
} | |
case hlt @ Halt(End) => new TransducerSyntax[F, A](hlt) transduce (step.toProcess disconnect Kill swallowKill) | |
case hlt @ Halt(rsn: Cause.EarlyCause) => new TransducerSyntax[F, A](hlt) transduce (step.toProcess disconnect rsn) | |
} | |
} | |
} | |
case emt @ Emit(os) => { | |
emt onHalt { | |
case End => self transduce cont1.continue | |
case early => self transduce (Halt(early) +: cont1) causedBy early | |
} | |
} | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object TransducerSpecs extends Specification with ScalaCheck { | |
import Process._ | |
import StreamUtils._ | |
"effectful stream transducers" should { | |
def id[I]: Transducer[Nothing, I, I] = | |
transducer.receive1[Nothing, I, I](emit).repeat | |
"perform a simple identity transformation" in prop { xs: List[List[Int]] => | |
val p = emitAll(xs map emitAll).toSource.join | |
(p transduce id).runLog.run mustEqual xs.flatten | |
} | |
"perform an arbitrary transformation" in prop { (xs: List[List[Int]], f: Int => Int) => | |
val t = transducer receive1 { i: Int => emit(f(i)) } repeat | |
val p = emitAll(xs map emitAll).toSource.join | |
(p transduce t).runLog.run mustEqual (xs.flatten map f) | |
} | |
"control the state machine based on an effect" in { | |
def subT1(i: Int) = emit(i * 2) | |
def subT2(i: Int) = emit(i) | |
val t = transducer receive1 { i: Int => | |
transducer.receiveF(Task now true) { b => | |
if (b) subT1(i) else subT2(i) | |
} | |
} repeat | |
(Process(1, 2, 3) transduce t).runLog.run mustEqual Vector(2, 4, 6) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment