Skip to content

Instantly share code, notes, and snippets.

@dsebban
Last active August 20, 2018 09:17
Show Gist options
  • Save dsebban/d4ca2d6a557dfc749fc554247ba45385 to your computer and use it in GitHub Desktop.
Save dsebban/d4ca2d6a557dfc749fc554247ba45385 to your computer and use it in GitHub Desktop.
transducer
/** Data model of a stream transducer: a state machine that reads `I` values and emits `O` values. */
object Transducer {

  /**
   * One state of a transducer.
   *
   * @tparam I type of the elements the process reads
   * @tparam O type of the elements the process emits
   */
  sealed trait Process[I, O]

  /** Terminal state: the process emits nothing more and asks for no further input. */
  final case class Halt[I, O]() extends Process[I, O]

  /**
   * Emits one value and then continues with another state.
   *
   * @param h    the value to emit
   * @param tail the state to continue with after `h` has been emitted (held strictly)
   */
  final case class Emit[I, O](h: O, tail: Process[I, O]) extends Process[I, O]

  /**
   * Asks for the next input element.
   *
   * @param recv continuation that receives the next input as an `Option[I]` and returns
   *             the state to continue with
   */
  final case class Await[I, O](recv: Option[I] => Process[I, O]) extends Process[I, O]
}
import Transducer._
/** Constructors and combinators for `Process` values. */
object ProcessOps {

  /**
   * Builds a process that transforms at most one input element.
   *
   * @param f function applied to the element that is read
   * @return a process that awaits one input, emits `f` of it and halts; when it
   *         receives `None` instead of an element it halts without emitting
   */
  def liftOne[I, O](f: I => O): Process[I, O] = Await {
    case None    => Halt()
    case Some(i) => Emit(f(i), Halt())
  }

  /**
   * Restarts `in` every time it halts, so that it keeps handling input until the
   * input itself ends.
   *
   * Note: a process that only emits and never awaits cannot be repeated. `Emit`
   * holds its tail strictly, so unrolling such a process does not terminate.
   *
   * @param in the process to run over and over
   * @return the repeating process, or `in` unchanged when `in` is already `Halt()`
   */
  def repeat[I, O](in: Process[I, O]): Process[I, O] = {
    def go(p: Process[I, O]): Process[I, O] = p match {
      case Emit(h, t) => Emit(h, go(t))
      case Await(recv) =>
        Await {
          // End of input: pass `None` to the wrapped process once and do not restart it.
          case None => recv(None)
          case i    => go(recv(i))
        }
      // The wrapped process finished: start it again from its initial state.
      case Halt() => go(in)
    }

    in match {
      // Restarting an already halted process would make `go` call itself forever,
      // so a halted process is returned as is.
      case Halt() => in
      case _      => go(in)
    }
  }

  /**
   * Builds a process that applies `f` to every input element.
   *
   * @param f function applied to each element
   * @return the repeated form of `liftOne(f)`
   */
  def lift[I, O](f: I => O): Process[I, O] = repeat(liftOne(f))

  /**
   * Builds a process that tests a single input element and then halts.
   *
   * @param f predicate deciding whether the element is emitted
   * @return a process that awaits one input and emits it when `f` holds for it;
   *         in every other case (predicate fails, or `None` is received) it halts
   *         without emitting
   */
  def filter[I](f: I => Boolean): Process[I, I] =
    Await[I, I] {
      case Some(i) if f(i) => Emit(i, Halt())
      case _               => Halt()
    }

  /**
   * Builds a process that emits every input element satisfying `f`.
   *
   * @param f predicate deciding which elements are emitted
   * @return the repeated form of `filter(f)`
   */
  def filterAll[I](f: I => Boolean): Process[I, I] = repeat(filter(f))
}
import ProcessOps._
/**
 * Drives the process `p` with the elements of `s` and collects what it emits.
 *
 * The result is built lazily: the run continues past an emitted value only when
 * the tail of the returned stream is demanded, so infinite or very long inputs
 * can be processed incrementally.
 *
 * @param s input elements, consumed front to back
 * @param p process deciding what to emit for the elements it receives
 * @return the emitted values in emission order
 */
def streamTransducer[I, O](s: Stream[I], p: Process[I, O]): Stream[O] = p match {
  case Halt() => Stream.empty
  // `#::` takes its tail by name, unlike `Stream(h) ++ ...` which evaluates the
  // whole remaining run before the first element can be returned.
  case Emit(h, t) => h #:: streamTransducer(s, t)
  case Await(recv) =>
    s match {
      case x #:: rest => streamTransducer(rest, recv(Some(x)))
      // No element left: tell the process that the input has ended.
      case _ => streamTransducer(s, recv(None))
    }
}
// Example run: keeps the odd elements of 1, 2, 3, so the result holds 1 and 3.
streamTransducer(Stream(1, 2, 3), filterAll[Int](_ % 2 != 0))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment