Skip to content

Instantly share code, notes, and snippets.

@apskii
Created October 13, 2014 20:16
Show Gist options
  • Save apskii/0d515a538fb802cb4cbe to your computer and use it in GitHub Desktop.
Save apskii/0d515a538fb802cb4cbe to your computer and use it in GitHub Desktop.
import scala.collection.immutable
/** A stream processor: a description of how to turn a stream of `A` values
  * into a stream of `B` values.
  *
  * A processor is either a [[Get]], which waits for the next input element,
  * or a [[Put]], which emits one output element and then continues.
  *
  * @tparam A type of the elements consumed (contravariant)
  * @tparam B type of the elements produced (covariant)
  */
sealed trait SP[-A, +B] {

  /** Interprets this processor against `input`.
    *
    * The result is built lazily: the stream that follows an emitted element
    * is only computed when it is demanded. Processing ends as soon as a
    * [[Get]] is reached while `input` is exhausted.
    *
    * @param input elements to feed to the processor
    * @return the elements emitted by the processor, in order
    */
  def run(input: Stream[A]): Stream[B] = this match {
    case Get(next) =>
      if (input.isEmpty) Stream.empty
      else {
        // Read head and tail up front, before invoking the continuation.
        val first = input.head
        val remaining = input.tail
        next(first).run(remaining)
      }
    case Put(out, rest) =>
      // The second argument of Stream.cons is by-name, so the rest of the
      // output is not computed until it is requested.
      Stream.cons(out, rest.run(input))
  }
}

/** Waits for one input element and uses it to choose how to continue.
  *
  * @param builder maps the consumed element to the processor that handles
  *                the remaining input
  */
case class Get[A, B](builder: A => SP[A, B]) extends SP[A, B]

/** Emits one output element without consuming any input.
  *
  * @param value the element to emit
  * @param sp    the processor to continue with after `value` has been emitted
  */
case class Put[A, B](value: B, sp: SP[A, B]) extends SP[A, B]
/** scalaz `Category` and `Arrow` instances for [[SP]]. */
object SP {

  /** Identity and sequential composition of stream processors. */
  trait Category extends scalaz.Category[SP] {

    /** The processor that re-emits every element it receives, unchanged. */
    def id[A]: SP[A, A] = Get(elem => Put(elem, id))

    /** Connects two processors so that everything `g` emits is fed to `f`.
      *
      * @param f the downstream processor (consumes `B`, produces `C`)
      * @param g the upstream processor (consumes `A`, produces `B`)
      * @return a processor that consumes `A` and produces `C`
      */
    def compose[A, B, C](f: SP[B, C], g: SP[A, B]): SP[A, C] = f match {
      // Downstream already has output ready: emit it before touching upstream.
      case Put(out, fRest) => Put(out, compose(fRest, g))
      case Get(consume) => g match {
        // Both sides are waiting: read one element of the outer input for `g`.
        case Get(read) => Get(x => compose(f, read(x)))
        // Upstream has an element ready: hand it straight to downstream.
        case Put(mid, gRest) => compose(consume(mid), gRest)
      }
    }
  }

  /** Arrow operations for stream processors, built on top of [[Category]]. */
  trait Arrow extends scalaz.Arrow[SP] with Category {

    /** Lifts a plain function into a processor that applies it to every input element. */
    def arr[A, B](f: (A) => B): SP[A, B] =
      Get(in => Put(f(in), arr(f)))

    /** Runs `f` on the first components of incoming pairs and pairs its
      * outputs with the second components, in arrival order.
      *
      * Second components that arrive while `f` is still reading are buffered
      * in a FIFO queue. When `f` emits while the buffer is empty, one more
      * input pair is read: its second component accompanies the output and
      * its first component is discarded.
      */
    def first[A, B, C](f: SP[A, B]): SP[(A, C), (B, C)] =
      delay(f, immutable.Queue.empty)

    /** Worker for [[first]].
      *
      * @param sp the remaining part of the processor being lifted
      * @param q  second components read so far that have not yet been paired
      *           with an output of `sp`
      */
    private def delay[A, B, C](sp: SP[A, B], q: immutable.Queue[C]): SP[(A, C), (B, C)] = sp match {
      case Get(consume) =>
        Get { case (a, c) => delay(consume(a), q.enqueue(c)) }
      case Put(out, next) if q.nonEmpty =>
        // Pair the output with the oldest buffered second component.
        val (c, pending) = q.dequeue
        Put((out, c), delay(next, pending))
      case Put(out, next) =>
        // Nothing buffered: read one more pair just to obtain a second component.
        Get { case (_, c) => Put((out, c), delay(next, q)) }
    }
  }

  /** The implicit instance picked up by scalaz syntax for [[SP]]. */
  implicit object arrow extends Arrow
}
/** Demo of the [[SP]] stream-processor arrow.
  *
  * The entry point is an explicit `main` instead of `extends App`. `App` is
  * implemented with `DelayedInit`, under which the object's `val`s (`add1`,
  * `mul2`, `neg`, `pipeline`) are not initialized until `main` has run, so
  * code that reads them earlier could observe `null`.
  */
object StreamProcessors {
  import scala.language.implicitConversions
  import scalaz.std.function.function1Instance
  import scalaz.syntax.arrow._
  import SP.arrow._

  /** Lets a function on a pair be passed where a two-argument function is expected. */
  implicit def biUntupled[A, B, C](f: ((A, B)) => C): (A, B) => C = Function.untupled(f)

  /** Consumes the input two elements at a time and emits `f` of each pair.
    *
    * A trailing element without a partner produces no output.
    */
  def pairwise[A, B, C](f: (A, A) => (B, C)): SP[A, (B, C)] =
    Get(x => Get(y => Put(f(x, y), pairwise(f))))

  /** Returns the sum and the product of `x` and `y`. */
  def addMul(x: Int, y: Int): (Int, Int) = (x + y, x * y)

  /** Emits every input element twice. */
  def dup[X]: SP[X, X] = Get(x => Put(x, Put(x, dup)))

  /** Applies `f` to every pair of adjacent input elements.
    *
    * The first element is passed through once and every later element is
    * duplicated, so that [[pairwise]] sees (e1, e2), (e2, e3), and so on.
    */
  def biwindow[A, B, C](f: (A, A) => (B, C)): SP[A, (B, C)] =
    (Get(x => Put(x, dup)): SP[A, A]) >>> pairwise(f)

  val add1: Int => Int = _ + 1
  val mul2: Int => Int = _ * 2
  val neg: Int => Int = x => -x

  // `mul2 *** add1` is a function on pairs; `biUntupled` adapts it to the
  // two-argument shape that `biwindow` expects.
  val pipeline = biwindow(mul2 *** add1) >>> first(arr(neg))

  /** Runs the three demos and prints their results. */
  def main(args: Array[String]): Unit = {
    val input = Stream(1, 2, 3, 4, 5, 6)

    println(pairwise(addMul).run(input).force)
    //= Stream((3,2), (7,12), (11,30))

    println(biwindow(addMul).run(input).force)
    //= Stream((3,2), (5,6), (7,12), (9,20), (11,30))

    println(pipeline.run(input).force)
    //= Stream((-2,3), (-4,4), (-6,5), (-8,6), (-10,7))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment