@jackfirth
Last active August 26, 2022 21:47
An Effekt implementation of transducers and stream pipelines.
// This is my attempt at translating the stream pipeline and transducer library I wrote for Racket
// into Effekt, using algebraic effects to model the actions of consuming and emitting values in a
// stream pipeline. The Racket library is part of my Rebellion package; its docs are at
// https://docs.racket-lang.org/rebellion/Streaming_Computations.html. The basics are that a
// stream pipeline combines an initial source (a lazy stream, sequence, or other similar object) and
// a terminal sink, called a *reducer*, along with any number of intermediate transformations called
// *transducers*.
type Option[A] { Present(value: A); Absent() }
effect Iterate[A] { def emit(element: A): Unit }
effect Reduce[A] { def consume(): Option[A] }
// Ideally I'd make a Transduce effect that's an alias for combining Reduce and Iterate,
// but effect Transduce[A, B] = { Reduce[A], Iterate[B] } doesn't seem to work. Defining
// a new Transduce effect with its own handlers works, but requires awkward conversion
// handlers when using iterators and reducers within transducers (such as with flatMapping):
// effect Transduce[A, B] {
//   def consume(): Option[A]
//   def emit(element: B): Unit
// }
// I'd also really like to make Iterator, Transducer, and Reducer types or type aliases
// that would be shorthand for functions with the corresponding effects. They'd look like this:
//
// type alias Iterator[A] = () => Unit / Iterate[A]
// type alias Transducer[A, B] = () => Unit / { Reduce[A], Iterate[B] }
// type alias Reducer[A, B] = () => B / Reduce[A]
//
// An Iterator[A] and a Reducer[A, B] together give you a way to produce a single B, and a
// Transducer[A, B] can be used to turn either an Iterator[A] into an Iterator[B] or a
// Reducer[B, X] into a Reducer[A, X].
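def reduce[A, B]
  { iterator: () => Unit / Iterate[A] }
  { reducer: () => B / Reduce[A] }
  : B = {
  // upstream() yields the iterator's next element, or Absent() once the iterator has finished.
  var upstream = fun() { Absent() }
  upstream = fun() {
    try {
      iterator()
      Absent()
    } with Iterate[A] {
      // On emit, save the continuation as the new upstream and hand the element
      // back to the caller without resuming the iterator yet.
      def emit(a) = {
        upstream = fun() { resume(()) }
        Present(a)
      }
    }
  }
  try {
    reducer()
  } with Reduce[A] {
    // Each consume pulls one element from the iterator on demand.
    def consume() = {
      resume(upstream())
    }
  }
}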
// This is the amazing part. The above reduce function for fusing an iterator and a reducer can be
// *reused* to implement transducer fusion, as well as fusion of transducers with iterators and reducers.
// This works because a transducer is from one perspective just an iterator with an extra Reduce effect, and
// from another perspective just a reducer with an extra Iterate effect. Effekt's contextual effect
// polymorphism means literally the only thing I need to do to reuse reduce() for these purposes is eta-expand
// the transducers so that the type inference works correctly.
// Fuses a transducer with an upstream iterator and runs the fused iterator.
def inTransduced[A, B]
  { iterator: () => Unit / Iterate[A] }
  { transducer: () => Unit / { Reduce[A], Iterate[B] } }
  : Unit / Iterate[B] =
  reduce[A, Unit] { iterator } { () => transducer() }
// Fuses a transducer with a downstream reducer and runs the fused reducer.
def intoTransduced[A, B, C]
  { transducer: () => Unit / { Reduce[A], Iterate[B] } }
  { reducer: () => C / Reduce[B] }
  : C / Reduce[A] =
  reduce[B, C] { () => transducer() } { reducer }
// Fuses two transducers together in series and runs the fused transducer.
def fusing[A, B, C]
  { upstreamTransducer: () => Unit / { Reduce[A], Iterate[B] } }
  { downstreamTransducer: () => Unit / { Reduce[B], Iterate[C] } }
  : Unit / { Reduce[A], Iterate[C] } =
  reduce[B, Unit] { () => upstreamTransducer() } { () => downstreamTransducer() }
// Like reduce, but with a transducer in the middle.
def transduce[A, B, C]
  { iterator: () => Unit / Iterate[A] }
  { transducer: () => Unit / { Reduce[A], Iterate[B] } }
  { reducer: () => C / Reduce[B] }
  : C =
  reduce[A, C] { iterator } { () => intoTransduced[A, B, C] { transducer } { reducer } }
// This is just a little utility to make some things easier to write. It reads nicer than
// while (true) to me. The block returns whether to keep going, so that a transducer can stop
// once its upstream is exhausted instead of consuming forever.
def repeatedly { f: () => Boolean } : Unit =
  if (f()) { repeatedly { f } } else { () }
def inUnfold[A](seed: A) { f: A => A } : Unit / Iterate[A] = {
  do emit(seed)
  inUnfold(f(seed)) { f }
}
def intoFold[A, S](initState: S) { f: (S, A) => S } : S / Reduce[A] =
  do consume[A]() match {
    case Present(a) => intoFold(f(initState, a)) { f }
    case Absent() => initState
  }
// Each transducer below stops (returns false) as soon as its upstream reports Absent().
def flatMapping[A, B] { f: A => Unit / Iterate[B] } : Unit / { Reduce[A], Iterate[B] } =
  repeatedly {
    do consume[A]() match {
      case Absent() => false
      case Present(a) =>
        f(a)
        true
    }
  }
def mapping[A, B] { f: A => B } : Unit / { Reduce[A], Iterate[B] } =
  repeatedly {
    do consume[A]() match {
      case Absent() => false
      case Present(a) =>
        do emit(f(a))
        true
    }
  }
// Also stops at the first element that fails the predicate, so it can cut off an
// infinite upstream such as inNaturals.
def takingWhile[A] { p: A => Boolean } : Unit / { Reduce[A], Iterate[A] } =
  repeatedly {
    do consume[A]() match {
      case Absent() => false
      case Present(a) =>
        if (p(a)) {
          do emit(a)
          true
        } else {
          false
        }
    }
  }
def inNaturals(start: Int): Unit / Iterate[Int] = inUnfold(start) { x => x + 1 }
def inRange(start: Int, end: Int): Unit / Iterate[Int] =
  inTransduced[Int, Int] { inNaturals(start) } { takingWhile { x => x < end } }
def intoSum(): Int / Reduce[Int] = intoFold(0) { (s, a) => s + a }
def intoProduct(): Int / Reduce[Int] = intoFold(1) { (s, a) => s * a }
def main() =
  transduce[Int, Int, Int]
    { inRange(0, 5) }
    { mapping { (x: Int) => x * x } }
    { intoSum() }
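// A hedged sketch of how fusing might be used: two mapping transducers are fused into one, and
// the result is passed to transduce together with the otherwise unused intoProduct reducer. The
// name fusedExample is hypothetical (it is not part of the library above), and this is untested;
// it assumes fusing and mapping type-check here the same way mapping does in main.
def fusedExample() =
  transduce[Int, Int, Int]
    { inRange(1, 4) }
    { fusing[Int, Int, Int] { mapping { (x: Int) => x + 1 } } { mapping { (x: Int) => x * 2 } } }
    { intoProduct() }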