Last active
August 26, 2022 21:47
-
-
Save jackfirth/f7bab2904aeba230f702febdfb45db67 to your computer and use it in GitHub Desktop.
An Effekt implementation of transducers and stream pipelines.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is my attempt at translating the stream pipeline and transducer library I wrote for Racket | |
// into Effekt, using algebraic effects to model the actions of consuming and emitting values in a | |
// stream pipeline. The stream pipeline library I wrote is part of my package Rebellion, see its docs | |
// here: https://docs.racket-lang.org/rebellion/Streaming_Computations.html. The basics are that a | |
// stream pipeline combines an initial source (a lazy stream, sequence, or other similar object) and | |
// a terminal sink, called a *reducer*, along with any number of intermediate transformations called | |
// *transducers*. | |
type Option[A] { Present(value: A); Absent() } | |
effect Iterate[A] { def emit(element: A): Unit } | |
effect Reduce[A] { def consume(): Option[A] } | |
// Ideally I'd make a Transduce effect that's an alias for combining Iterate and Reduce, | |
// but effect Transduce[A, B] = { Iterate[A], Reduce[B] } doesn't seem to work. Defining | |
// a new Transduce effect with its own handlers works, but requires awkward conversion | |
// handlers when using iterators and reducers within transducers (such as with flatMapping) | |
// effect Transduce[A, B] { | |
// def consume(): Option[A] | |
// def emit(element: B): Unit | |
// } | |
// I'd also really like to make Iterable, Transducer, and Reducer types or type aliases | |
// that would be shorthand for functions with the corresponding effects. They'd look like this: | |
// | |
// type alias Iterator[A] = () => Unit / Iterate[A] | |
// type alias Transducer[A, B] = () => Unit / { Reduce[A], Iterate[B] } | |
// type alias Reducer[A, B] = () => B / Reduce[A] | |
// | |
// An Iterator[A] and a Reducer[A, B] gives you a way to produce a single B, and a | |
// Transducer[A, B] can be used to either turn an Iterator[A] into an Iterator[B] or a | |
// Reducer[B, X] into a Reducer[A, X]. | |
def reduce[A, B] | |
{ iterator: () => Unit / Iterate[A] } | |
{ reducer: () => B / Reduce[A] } | |
: B = { | |
var upstream = fun() { Absent() } | |
upstream = fun() { | |
try { | |
iterator() | |
Absent() | |
} with Iterate[A] { | |
def emit(a) = { | |
upstream = fun() { resume(()) } | |
Present(a) | |
} | |
} | |
} | |
try { | |
reducer() | |
} with Reduce[A] { | |
def consume() = { | |
resume(upstream()) | |
} | |
} | |
} | |
// This is the amazing part. The above reduce function for fusing an iterator and a reducer can be | |
// *reused* to implement transducer fusion, as well as fusion of transducers with iterators and reducers. | |
// This works because a transducer is from one perspective just an iterator with an extra Reduce effect, and | |
// from another perspective just a reducer with an extra Iterate effect. Effekt's contextual effect | |
// polymorphism means literally the only thing I need to do to reuse reduce() for these purposes is eta-expand | |
// the transducers so that the type inference works correctly. | |
// Fuses a transducer with an upstream iterator and runs the fused iterator. | |
def inTransduced[A, B] | |
{ iterator: () => Unit / Iterate[A] } | |
{ transducer: () => Unit / { Reduce[A], Iterate[B] } } | |
: Unit / Iterate[B] = | |
reduce[A, Unit] { iterator } { () => transducer() } | |
// Fuses a transducer with a downstream reducer and runs the fused reducer. | |
def intoTransduced[A, B, C] | |
{ transducer: () => Unit / { Reduce[A], Iterate[B] } } | |
{ reducer: () => C / Reduce[B] } | |
: C / Reduce[A] = | |
reduce[B, C] { () => transducer() } { reducer } | |
// Fuses two transducers together in series and runs the fused transducer. | |
def fusing[A, B, C] | |
{ upstreamTransducer: () => Unit / { Reduce[A], Iterate[B] } } | |
{ downstreamTransducer: () => Unit / { Reduce[B], Iterate[C] } } | |
: Unit / { Reduce[A], Iterate[C] } = | |
reduce[B, Unit] { () => upstreamTransducer() } { () => downstreamTransducer() } | |
// Like reduce, but with a transducer in the middle. | |
def transduce[A, B, C] | |
{ iterator: () => Unit / Iterate[A] } | |
{ transducer: () => Unit / { Reduce[A], Iterate[B] } } | |
{ reducer: () => C / Reduce[B] } | |
: C = | |
reduce[A, C] { iterator } { () => intoTransduced[A, B, C] { transducer } { reducer } } | |
// This is just a little utility to make some things easier to write. It reads nicer than | |
// while (true) to me. | |
def repeatedly { f: () => Unit } : Unit = { | |
f() | |
repeatedly { f } | |
} | |
def inUnfold[A](seed: A) { f: A => A } : Unit / Iterate[A] = { | |
do emit(seed) | |
inUnfold(f(seed)) { f } | |
} | |
def intoFold[A, S](initState: S) { f: (S, A) => S } : S / Reduce[A] = | |
do consume[A]() match { | |
case Present(a) => intoFold(f(initState, a)) { f } | |
case Absent() => initState | |
} | |
def flatMapping[A, B] { f: A => Unit / Iterate[B] } : Unit / { Reduce[A], Iterate[B] } = | |
repeatedly { | |
do consume[A]() match { | |
case Absent() => () | |
case Present(a) => f(a) | |
} | |
} | |
def mapping[A, B] { f: A => B } : Unit / { Reduce[A], Iterate[B] } = | |
repeatedly { | |
do consume[A]() match { | |
case Absent() => () | |
case Present(a) => do emit(f(a)) | |
} | |
} | |
def takingWhile[A]() { p: A => Boolean } : Unit / { Reduce[A], Iterate[A] } = | |
repeatedly { | |
do consume[A]() match { | |
case Absent() => () | |
case Present(a) => if (p(a)) do emit(a) | |
} | |
} | |
def inNaturals(start: Int): Unit / Iterate[Int] = inUnfold(start) { x => x + 1 } | |
def inRange(start: Int, end: Int): Unit / Iterate[Int] = | |
inTransduced[Int, Int] { inNaturals(start) } { takingWhile { x => x < end }} | |
def intoSum(): Int / Reduce[Int] = intoFold(0) { (s, a) => s + a } | |
def intoProduct(): Int / Reduce[Int] = intoFold(1) { (s, a) => s * a } | |
def main() = | |
transduce[Int, Int, Int] | |
{ inRange(0, 5) } | |
{ mapping { (x: Int) => x * x } } | |
{ intoSum() } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment