Created
May 16, 2021 03:08
-
-
Save iocat/3862c28cd736a4d4a849fbab3e65965a to your computer and use it in GitHub Desktop.
Some Stream utilities in ReScript.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Lazily transforms a stream: every pull takes the next source item and
// passes it through `functor`. The resulting stream ends as soon as
// `Stream.Failure` is raised while producing an item (normally because the
// source is exhausted).
let map = (stream, functor) =>
  Stream.from(_count =>
    switch stream->Stream.next->functor {
    | mapped => Some(mapped)
    | exception Stream.Failure => None
    }
  )
// Lazily filters a stream, yielding only the items for which `predicate`
// returns true.
//
// Each pull keeps reading from the source until an accepted item is found or
// `Stream.Failure` is raised (source exhausted), in which case the filtered
// stream ends.
//
// The search is a loop rather than a recursive call placed inside `try`:
// a call inside `try` is not a tail call, so a long run of rejected items
// would otherwise grow the call stack by one frame per rejected item.
let keep = (stream, predicate) =>
  Stream.from(_count => {
    let found = ref(None)
    let searching = ref(true)
    while searching.contents {
      try {
        let item = stream->Stream.next
        if item->predicate {
          found := Some(item)
          searching := false
        }
      } catch {
      // Source exhausted: stop with `found` still None.
      | Stream.Failure => searching := false
      }
    }
    found.contents
  })
// Consumes the whole stream and returns its items as a list, in the order
// the stream produced them.
//
// Items are prepended while iterating (constant time per item), which builds
// the list back to front, so it is reversed once before being returned.
let toList = (stream: Stream.t<'a>): list<'a> => {
  let reversed = ref(list{})
  Stream.iter(item => reversed := list{item, ...reversed.contents}, stream)
  reversed.contents->Belt.List.reverse
}
// Builds a stream that yields the array's items from index 0 upwards.
// A private cursor tracks the position; it is advanced on every pull and the
// array length is re-read each time.
let fromArray = items => {
  let cursor = ref(-1)
  let advance = _count => {
    cursor := cursor.contents + 1
    let position = cursor.contents
    position < Belt.Array.length(items) ? Some(items[position]) : None
  }
  Stream.from(advance)
}
// Builds a stream from a list by delegating to `Stream.of_list`.
let fromList = items => Stream.of_list(items)
// The inverse of `keep`: lazily drops every item for which `predicate`
// returns true and yields the rest.
let discard = (stream, predicate) => keep(stream, item => !predicate(item))
// Splits `stream` into two streams that each yield every item of the source.
//
// Each side owns a backlog queue. When a side's backlog is empty it pulls the
// next item from the source and also appends it to the other side's backlog;
// otherwise it serves the oldest backlog entry. A side ends when its backlog
// is empty and the source raises `Stream.Failure`.
let fork = stream => {
  let firstBacklog = Belt.MutableQueue.make()
  let secondBacklog = Belt.MutableQueue.make()
  let generatorFor = (. own, sibling): (int => option<'a>) =>
    _count =>
      switch own->Belt.MutableQueue.pop {
      | Some(_) as queued => queued
      | None =>
        try {
          // Pull from the source stream and broadcast the item to the sibling.
          let fresh = Stream.next(stream)
          sibling->Belt.MutableQueue.add(fresh)
          Some(fresh)
        } catch {
        | Stream.Failure => None
        }
      }
  let first = Stream.from(generatorFor(. firstBacklog, secondBacklog))
  let second = Stream.from(generatorFor(. secondBacklog, firstBacklog))
  (first, second)
}
// Lazily maps and filters in one pass: `predicate` is applied to each source
// item, `Some(x)` results are yielded as `x`, and `None` results are skipped.
//
// Each pull keeps reading from the source until a `Some` result is found or
// `Stream.Failure` is raised (source exhausted), in which case the resulting
// stream ends.
//
// The search is a loop rather than a recursive call placed inside `try`:
// a call inside `try` is not a tail call, so a long run of skipped items
// would otherwise grow the call stack by one frame per skipped item.
let keepMap = (stream, predicate) =>
  Stream.from(_count => {
    let found = ref(None)
    let searching = ref(true)
    while searching.contents {
      try {
        switch stream->Stream.next->predicate {
        | Some(_) as mapped =>
          found := mapped
          searching := false
        | None => ()
        }
      } catch {
      // Source exhausted: stop with `found` still None.
      | Stream.Failure => searching := false
      }
    }
    found.contents
  })
// Eagerly reduces the stream to a single value: starting from `accum`, every
// item is combined into the running state with `reducer(state, item)`.
// Returns `accum` unchanged for an empty stream.
let fold = (stream, accum, reducer) => {
  let state = ref(accum)
  Stream.iter(item => state := reducer(state.contents, item), stream)
  state.contents
}
// Lazily flattens a stream of streams into one stream that yields the items
// of each inner stream in turn.
//
// `active` holds the inner stream currently being drained. When it runs dry
// (or none has been fetched yet) the next inner stream is pulled from
// `streams`; the flattened stream ends once `streams` itself is exhausted.
let flatten = streams => {
  let active = ref(None)
  let rec pull = (count: int) =>
    switch active.contents {
    | Some(inner) =>
      switch inner->Stream.next {
      | element => Some(element)
      | exception Stream.Failure =>
        // Current inner stream is finished; move on to the next one.
        active := None
        pull(count)
      }
    | None =>
      switch streams->Stream.next {
      | inner =>
        active := Some(inner)
        pull(count)
      | exception Stream.Failure =>
        active := None
        None
      }
    }
  Stream.from(pull)
}
// Yields every item of `a`, then every item of `b`.
let concat2 = (a, b) => flatten(Stream.of_list(list{a, b}))
// Yields every item of `a`, then `b`, then `c`.
let concat3 = (a, b, c) => flatten(Stream.of_list(list{a, b, c}))
// Yields every item of `a`, then `b`, `c` and `d`, in that order.
let concat4 = (a, b, c, d) => flatten(Stream.of_list(list{a, b, c, d}))
// Yields every item of `a`, then `b`, `c`, `d` and `e`, in that order.
let concat5 = (a, b, c, d, e) => flatten(Stream.of_list(list{a, b, c, d, e}))
// Lazily pairs up the items of two streams. The result ends as soon as either
// stream is exhausted; an item already pulled from `a` in that step is
// discarded when `b` turns out to be empty.
let zip2 = (a, b) => {
  let pullPair = _count =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      Some((first, second))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullPair)
}
// Lazily combines three streams into a stream of triples, pulling from `a`,
// `b`, `c` in that order. The result ends as soon as any stream is exhausted;
// items already pulled in that step are discarded.
let zip3 = (a, b, c) => {
  let pullTriple = _count =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      let third = Stream.next(c)
      Some((first, second, third))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullTriple)
}
// Lazily combines four streams into a stream of 4-tuples, pulling from `a`
// through `d` in order. The result ends as soon as any stream is exhausted;
// items already pulled in that step are discarded.
let zip4 = (a, b, c, d) => {
  let pullQuad = _count =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      let third = Stream.next(c)
      let fourth = Stream.next(d)
      Some((first, second, third, fourth))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullQuad)
}
// Lazily combines five streams into a stream of 5-tuples, pulling from `a`
// through `e` in order. The result ends as soon as any stream is exhausted;
// items already pulled in that step are discarded.
let zip5 = (a, b, c, d, e) => {
  let pullQuint = _count =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      let third = Stream.next(c)
      let fourth = Stream.next(d)
      let fifth = Stream.next(e)
      Some((first, second, third, fourth, fifth))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullQuint)
}
// Zips an arbitrary number of streams.
//
// The outer stream is collected eagerly with `toList`; the returned stream
// is lazy. Each pull takes one item from every inner stream and yields them
// as a list (in the order `toList` returned the inner streams). The result
// ends as soon as any inner stream is exhausted.
//
// With no inner streams at all the result is an empty stream. Without this
// guard, mapping over the empty list would succeed on every pull and produce
// an endless stream of empty lists.
let zipMany = streams => {
  let allStreams = streams->toList
  Stream.from(_count =>
    switch allStreams {
    | list{} => None
    | _ =>
      try Some(allStreams->Belt.List.map(stream => stream->Stream.next)) catch {
      | Stream.Failure => None
      }
    }
  )
}
// Creates a stream that is empty from the start.
let void = () => {
  let exhausted = _count => None
  Stream.from(exhausted)
}
// Creates a stream that yields `value` exactly once.
let once = value => list{value}->Stream.of_list
// Creates a never-ending stream that yields `value` on every pull.
let infinite = value => {
  let repeat = _count => Some(value)
  Stream.from(repeat)
}
// Eagerly consumes the stream, calling `work` on each item in turn.
let consumeEach = (stream, work) => Stream.iter(work, stream)
// Runs `work` over every item of the stream and hands back a stream that
// still yields all of those items.
//
// The source is forked; one side is drained immediately through `work`
// (which queues every item for the other side), and the other side is
// returned to the caller.
let iterateEach = (stream, work) => {
  let (drained, replay) = fork(stream)
  Stream.iter(work, drained)
  replay
}
// Adds up all integers in the stream; an empty stream sums to 0.
let sumInt = stream => fold(stream, 0, (total, n) => total + n)
// Adds up all floats in the stream; an empty stream sums to 0.
let sumFloat = stream => fold(stream, 0., (total, x) => total +. x)
// Eagerly concatenates all strings of the stream, inserting `sep` (a single
// space by default) between consecutive items. An empty stream yields "".
//
// The pieces are collected first and joined once. This keeps the separator
// correct even when the leading items are empty strings (an empty running
// result cannot be used to detect "first item"), and avoids re-copying the
// accumulated string on every item.
let join = (~sep=" ", stream) => {
  let pieces = []
  Stream.iter(item => pieces->Js.Array2.push(item)->ignore, stream)
  pieces->Js.Array2.joinWith(sep)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Stream transformations. Except for `fold` (and the up-front collection done
// by `zipMany`), these are lazy: they return a new stream that pulls from its
// source(s) only when it is itself pulled.

// Applies the function to each item as it is pulled.
let map: (Stream.t<'a>, 'a => 'b) => Stream.t<'b> | |
// Applies the function to each item; yields the payload of every `Some`
// result and skips every `None`.
let keepMap: (Stream.t<'a>, 'a => option<'b>) => Stream.t<'b> | |
// Yields only the items for which the predicate returns true.
let keep: (Stream.t<'a>, 'a => bool) => Stream.t<'a> | |
// Like keep, but the predicate selects the items to discard.
let discard: (Stream.t<'a>, 'a => bool) => Stream.t<'a> | |
// Splits one stream into two. An item pulled from the source by one side is
// queued for the other side, so each returned stream yields every item.
let fork: Stream.t<'a> => (Stream.t<'a>, Stream.t<'a>) | |
// Eager: consumes the whole stream, combining each item into the accumulator
// (second argument) with the reducer, and returns the final accumulator.
let fold: (Stream.t<'a>, 'b, ('b, 'a) => 'b) => 'b | |
// Yields the items of each inner stream in turn.
let flatten: Stream.t<Stream.t<'a>> => Stream.t<'a> | |
// concatN: yields all items of the first stream, then of the second, and so
// on (implemented with `flatten`).
let concat2: (Stream.t<'a>, Stream.t<'a>) => Stream.t<'a> | |
let concat3: (Stream.t<'a>, Stream.t<'a>, Stream.t<'a>) => Stream.t<'a> | |
let concat4: (Stream.t<'a>, Stream.t<'a>, Stream.t<'a>, Stream.t<'a>) => Stream.t<'a> | |
let concat5: (Stream.t<'a>, Stream.t<'a>, Stream.t<'a>, Stream.t<'a>, Stream.t<'a>) => Stream.t<'a> | |
// Collects the outer stream up front, then on each pull yields a list holding
// one item from every inner stream. Ends when any inner stream is exhausted.
let zipMany: Stream.t<Stream.t<'a>> => Stream.t<list<'a>> | |
// zipN: on each pull takes one item from every argument stream (left to
// right) and yields them as a tuple. Ends when any argument stream is
// exhausted.
let zip2: (Stream.t<'a>, Stream.t<'b>) => Stream.t<('a, 'b)> | |
let zip3: (Stream.t<'a>, Stream.t<'b>, Stream.t<'c>) => Stream.t<('a, 'b, 'c)> | |
let zip4: (Stream.t<'a>, Stream.t<'b>, Stream.t<'c>, Stream.t<'d>) => Stream.t<('a, 'b, 'c, 'd)> | |
let zip5: ( | |
Stream.t<'a>, | |
Stream.t<'b>, | |
Stream.t<'c>, | |
Stream.t<'d>, | |
Stream.t<'e>, | |
) => Stream.t<('a, 'b, 'c, 'd, 'e)> | |
// Stream construction.

// A stream with no items.
let void: unit => Stream.t<'a> | |
// A stream that yields the given value exactly once.
let once: 'a => Stream.t<'a> | |
// A stream that yields the given value on every pull and never ends.
let infinite: 'a => Stream.t<'a> | |
// A stream over the items of a list (alias of `Stream.of_list`).
let fromList: list<'a> => Stream.t<'a> | |
// A stream over the items of an array, starting at index 0.
let fromArray: array<'a> => Stream.t<'a> | |
// Stream consumption. Everything below is eager and drains the given stream.

// Consumes the stream and collects its items into a list.
let toList: Stream.t<'a> => list<'a> | |
// Consumes the stream, calling the function on each item.
let consumeEach: (Stream.t<'a>, 'a => unit) => unit | |
// Similar to consumeEach, but the stream is forked first: the function is
// run over one fork (draining the source), and the other fork, which has
// every item queued up, is returned.
//
// In effect the caller gets back a stream with the same items to use again.
let iterateEach: (Stream.t<'a>, 'a => unit) => Stream.t<'a> | |
// Sum of all items, starting from 0.
let sumInt: Stream.t<int> => int | |
// Sum of all items, starting from 0.
let sumFloat: Stream.t<float> => float | |
// Joins the stream's strings into one string, using `sep` as the separator
// (the implementation defaults `sep` to a single space).
let join: (~sep: string=?, Stream.t<string>) => string |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment