Skip to content

Instantly share code, notes, and snippets.

@iocat
Created May 16, 2021 03:08
Show Gist options
  • Save iocat/3862c28cd736a4d4a849fbab3e65965a to your computer and use it in GitHub Desktop.
Save iocat/3862c28cd736a4d4a849fbab3e65965a to your computer and use it in GitHub Desktop.
Some Stream utilities in ReScript.
// Lazily applies `functor` to every item of `stream`.
//
// Returns a new stream. Each pull on it takes one item from `stream` and
// yields `functor(item)`. The result ends when pulling from `stream` raises
// `Stream.Failure`, i.e. when the source has no more items.
let map = (stream, functor) =>
  Stream.from(_ =>
    // Only the pull from the source is guarded, so a `Stream.Failure` raised
    // by `functor` itself propagates to the consumer instead of being
    // mistaken for the end of the source.
    switch stream->Stream.next {
    | item => Some(functor(item))
    | exception Stream.Failure => None
    }
  )
// Lazily filters `stream`, yielding only the items for which `predicate`
// returns true. Rejected items are still consumed from `stream`; they are
// simply not emitted.
//
// The result ends when pulling from `stream` raises `Stream.Failure`.
let keep = (stream, predicate) =>
  Stream.from(_ => {
    // The retry is written outside the exception handler so that it is a tail
    // call: a long run of rejected items must not nest one handler per
    // skipped item. It also means a `Stream.Failure` raised by `predicate`
    // is not mistaken for the end of the source.
    let rec getNext = () =>
      switch stream->Stream.next {
      | exception Stream.Failure => None
      | item =>
        if predicate(item) {
          Some(item)
        } else {
          getNext()
        }
      }
    getNext()
  })
// Eagerly drains `stream` and returns its items as a list, first item first.
//
// The whole stream is consumed before anything is returned, so this is only
// usable on streams that end.
let toList = (stream: Stream.t<'a>): list<'a> => {
  // Prepending keeps every step O(1) but builds the list back to front...
  let reversed = ref(list{})
  stream->Stream.iter(item => {
    reversed := list{item, ...reversed.contents}
  }, _)
  // ...so flip it once at the end to restore the stream's order.
  reversed.contents->Belt.List.reverse
}
// Builds a stream that yields the elements of `items` from index 0 upwards
// and ends once the index reaches the array's length.
//
// The array is read when an item is pulled, not when the stream is created.
let fromArray = items => {
  // Index of the element the next pull will read; advanced on every pull.
  let cursor = ref(0)
  Stream.from(_ => {
    let index = cursor.contents
    cursor := index + 1
    index < Belt.Array.length(items) ? Some(items[index]) : None
  })
}
// Builds a stream over the elements of the list `items`.
// Thin wrapper around `Stream.of_list`.
let fromList = items => Stream.of_list(items)
// Lazily filters `stream`, dropping every item for which `predicate` returns
// true and emitting the rest. Implemented as `keep` with the test negated.
let discard = (stream, predicate) => {
  let isKept = item => !predicate(item)
  keep(stream, isKept)
}
// Splits `stream` into two streams that each yield every item of `stream`.
//
// Each fork owns a queue. When a fork's queue is empty it pulls the next item
// from `stream`, stores a copy in the sibling's queue, and emits it; otherwise
// it emits the oldest item from its own queue. Both forks read from `stream`
// directly, so anything that pulls from `stream` elsewhere takes items that
// neither fork will see.
let fork = stream => {
  let firstBuffer = Belt.MutableQueue.make()
  let secondBuffer = Belt.MutableQueue.make()
  // Builds the generator for one fork: `own` holds items the sibling already
  // pulled, `sibling` receives items this fork pulls from the source.
  let readerFor = (own, sibling) => (_count: int) =>
    if !Belt.MutableQueue.isEmpty(own) {
      Belt.MutableQueue.pop(own)
    } else {
      try {
        let fresh = Stream.next(stream)
        Belt.MutableQueue.add(sibling, fresh)
        Some(fresh)
      } catch {
      | Stream.Failure => None
      }
    }
  (
    Stream.from(readerFor(firstBuffer, secondBuffer)),
    Stream.from(readerFor(secondBuffer, firstBuffer)),
  )
}
// Lazily maps and filters `stream` in one pass.
//
// `predicate` is applied to each item pulled from `stream`: a `Some(value)`
// result is emitted as `value`, a `None` result drops the item. The result
// ends when pulling from `stream` raises `Stream.Failure`.
let keepMap = (stream, predicate) =>
  Stream.from(_ => {
    // Only the pull from the source is guarded. The retry after a dropped
    // item is a tail call outside the handler, so a long run of dropped items
    // does not nest handlers, and a `Stream.Failure` raised by `predicate`
    // is not mistaken for the end of the source.
    let rec getNext = () =>
      switch stream->Stream.next {
      | exception Stream.Failure => None
      | item =>
        switch predicate(item) {
        | Some(_) as mapped => mapped
        | None => getNext()
        }
      }
    getNext()
  })
// Eagerly reduces `stream` to a single value.
//
// Starts from `accum` and, for each item in stream order, replaces the
// running value with `reducer(running, item)`. Returns the final running
// value, which is `accum` itself when the stream is empty. The stream is fully
// consumed.
let fold = (stream, accum, reducer) => {
  // Each item is removed from the stream before `reducer` sees it.
  let rec loop = running =>
    switch Stream.peek(stream) {
    | None => running
    | Some(item) =>
      Stream.junk(stream)
      loop(reducer(running, item))
    }
  loop(accum)
}
// Lazily concatenates a stream of streams into one stream.
//
// Inner streams are taken from `streams` one at a time. Every item of the
// current inner stream is emitted before the next inner stream is fetched.
// The result ends when `streams` has no more inner streams.
let flatten = streams => {
  // The inner stream currently being drained, or None when the next one still
  // has to be fetched from `streams`.
  let current = ref(None)
  // Every recursive call below sits outside an exception handler, i.e. in tail
  // position, so a long run of empty inner streams does not nest handlers.
  let rec next = (index: int) =>
    switch current.contents {
    | Some(inner) =>
      switch inner->Stream.next {
      | item => Some(item)
      | exception Stream.Failure =>
        // Inner stream exhausted: move on to the following one.
        current := None
        next(index)
      }
    | None =>
      switch streams->Stream.next {
      | inner =>
        current := Some(inner)
        next(index)
      | exception Stream.Failure => None
      }
    }
  Stream.from(next)
}
// concat2 .. concat5: lazily chain a fixed number of streams end to end.
// Each one wraps its arguments, in argument order, into a stream of streams
// with `fromList` and hands that to `flatten`.
let concat2 = (a, b) => flatten(fromList(list{a, b}))
let concat3 = (a, b, c) => flatten(fromList(list{a, b, c}))
let concat4 = (a, b, c, d) => flatten(fromList(list{a, b, c, d}))
let concat5 = (a, b, c, d, e) => flatten(fromList(list{a, b, c, d, e}))
// Lazily pairs up items of `a` and `b` into tuples.
//
// Each pull takes one item from `a`, then one from `b`. The result ends as
// soon as either pull raises `Stream.Failure`; an item already taken from `a`
// in that round is not emitted.
let zip2 = (a, b) => {
  let pullPair = (_count: int) =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      Some((first, second))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullPair)
}
// Lazily groups items of `a`, `b` and `c` into 3-tuples.
//
// Each pull takes one item from every input in argument order. The result
// ends as soon as any pull raises `Stream.Failure`; items already taken from
// earlier inputs in that round are not emitted.
let zip3 = (a, b, c) => {
  let pullTriple = (_count: int) =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      let third = Stream.next(c)
      Some((first, second, third))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullTriple)
}
// Lazily groups items of `a`, `b`, `c` and `d` into 4-tuples.
//
// Each pull takes one item from every input in argument order. The result
// ends as soon as any pull raises `Stream.Failure`; items already taken from
// earlier inputs in that round are not emitted.
let zip4 = (a, b, c, d) => {
  let pullQuad = (_count: int) =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      let third = Stream.next(c)
      let fourth = Stream.next(d)
      Some((first, second, third, fourth))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullQuad)
}
// Lazily groups items of `a`, `b`, `c`, `d` and `e` into 5-tuples.
//
// Each pull takes one item from every input in argument order. The result
// ends as soon as any pull raises `Stream.Failure`; items already taken from
// earlier inputs in that round are not emitted.
let zip5 = (a, b, c, d, e) => {
  let pullQuint = (_count: int) =>
    try {
      let first = Stream.next(a)
      let second = Stream.next(b)
      let third = Stream.next(c)
      let fourth = Stream.next(d)
      let fifth = Stream.next(e)
      Some((first, second, third, fourth, fifth))
    } catch {
    | Stream.Failure => None
    }
  Stream.from(pullQuint)
}
// Zips any number of streams of the same item type.
//
// The outer stream `streams` is consumed completely up front to learn which
// inner streams take part. The returned stream is lazy: each pull takes one
// item from every inner stream and emits them as a list whose positions follow
// the order in which the inner streams appeared in `streams`. It ends as soon
// as any inner stream is exhausted, or immediately when `streams` contained
// no inner streams at all.
let zipMany = streams => {
  // Collect the inner streams by prepending, then reverse once so that the
  // list (and therefore every emitted row) is in outer-stream order.
  let collected = ref(list{})
  streams->Stream.iter(inner => {
    collected := list{inner, ...collected.contents}
  }, _)
  let allStreams = collected.contents->Belt.List.reverse
  Stream.from(_ =>
    switch allStreams {
    // With nothing to zip, mapping would succeed forever and produce an
    // endless stream of empty lists; report an empty stream instead.
    | list{} => None
    | _ =>
      try Some(allStreams->Belt.List.map(inner => inner->Stream.next)) catch {
      | Stream.Failure => None
      }
    }
  )
}
// Creates a stream that has no items: its generator reports the end of the
// stream on the very first pull.
let void = () => {
  let alwaysDone = (_count: int) => None
  Stream.from(alwaysDone)
}
// Creates a stream that yields `value` exactly once and then ends.
let once = value => list{value}->Stream.of_list
// Creates a stream that yields `value` on every pull and never ends.
let infinite = value => {
  let repeated = Some(value)
  Stream.from(_ => repeated)
}
// Eagerly runs `work` on every item of `stream`, in stream order, consuming
// the stream in the process. Thin wrapper around `Stream.iter`.
let consumeEach = (stream, work) => Stream.iter(work, stream)
// Runs `work` on every item of `stream` and still returns a stream of those
// same items.
//
// `stream` is forked; one fork is drained eagerly through `consumeEach`, and
// the other fork is returned to the caller.
let iterateEach = (stream, work) => {
  let (drained, replay) = fork(stream)
  consumeEach(drained, work)
  replay
}
// Eagerly adds up every int in `stream`, starting from 0.
let sumInt = stream => fold(stream, 0, (total, item) => total + item)
// Eagerly adds up every float in `stream`, starting from 0.
let sumFloat = stream => fold(stream, 0., (total, item) => total +. item)
// Eagerly concatenates every string in `stream` into one string, placing
// `sep` (a single space by default) between consecutive items.
//
// An empty stream gives "". Empty-string items are kept as items, so they
// still contribute their separators.
let join = (~sep=" ", stream: Stream.t<string>) => {
  // Collect first and join once: this needs no "is this the first item?"
  // sentinel, which is what made leading empty strings disappear before.
  let parts: array<string> = []
  stream->Stream.iter(item => {
    parts->Js.Array2.push(item)->ignore
  }, _)
  parts->Js.Array2.joinWith(sep)
}
// Stream transformations.
// Unless noted otherwise, each returns a new stream that pulls from its input
// only when an item is requested from the result.
// map: applies the function to every item of the stream.
let map: (Stream.t<'a>, 'a => 'b) => Stream.t<'b>
// keepMap: applies the function to every item; a `Some(x)` result is emitted
// as `x`, a `None` result drops the item.
let keepMap: (Stream.t<'a>, 'a => option<'b>) => Stream.t<'b>
// keep: emits only the items for which the predicate returns true.
let keep: (Stream.t<'a>, 'a => bool) => Stream.t<'a>
// discard: like keep, but the items for which the predicate returns true are
// the ones dropped.
let discard: (Stream.t<'a>, 'a => bool) => Stream.t<'a>
// fork: returns two streams that each yield every item of the input. An item
// pulled through one fork is queued for the other. Both forks read from the
// input stream directly, so it should not be read elsewhere afterwards.
let fork: Stream.t<'a> => (Stream.t<'a>, Stream.t<'a>)
// fold: NOT lazy. Consumes the whole stream, combining the running value
// (initially the second argument) with each item, and returns the final value.
let fold: (Stream.t<'a>, 'b, ('b, 'a) => 'b) => 'b
// flatten: emits all items of the first inner stream, then all items of the
// next one, and so on.
let flatten: Stream.t<Stream.t<'a>> => Stream.t<'a>
// concat2 .. concat5: flatten applied to the given streams in argument order.
let concat2: (Stream.t<'a>, Stream.t<'a>) => Stream.t<'a>
let concat3: (Stream.t<'a>, Stream.t<'a>, Stream.t<'a>) => Stream.t<'a>
let concat4: (Stream.t<'a>, Stream.t<'a>, Stream.t<'a>, Stream.t<'a>) => Stream.t<'a>
let concat5: (Stream.t<'a>, Stream.t<'a>, Stream.t<'a>, Stream.t<'a>, Stream.t<'a>) => Stream.t<'a>
// zipMany: reads all inner streams out of the outer stream up front, then
// lazily emits lists holding one item from every inner stream. Ends as soon
// as any inner stream is exhausted.
let zipMany: Stream.t<Stream.t<'a>> => Stream.t<list<'a>>
// zip2 .. zip5: emit tuples holding one item from every input. Inputs are
// pulled in argument order and the result ends as soon as any input is
// exhausted; items already pulled from earlier inputs in that round are lost.
let zip2: (Stream.t<'a>, Stream.t<'b>) => Stream.t<('a, 'b)>
let zip3: (Stream.t<'a>, Stream.t<'b>, Stream.t<'c>) => Stream.t<('a, 'b, 'c)>
let zip4: (Stream.t<'a>, Stream.t<'b>, Stream.t<'c>, Stream.t<'d>) => Stream.t<('a, 'b, 'c, 'd)>
let zip5: (
Stream.t<'a>,
Stream.t<'b>,
Stream.t<'c>,
Stream.t<'d>,
Stream.t<'e>,
) => Stream.t<('a, 'b, 'c, 'd, 'e)>
// Stream construction.
// void: a stream with no items.
let void: unit => Stream.t<'a>
// once: a stream that yields the given value a single time.
let once: 'a => Stream.t<'a>
// infinite: a stream that yields the given value on every pull and never ends.
let infinite: 'a => Stream.t<'a>
// fromList: a stream over the elements of a list (alias of Stream.of_list).
let fromList: list<'a> => Stream.t<'a>
// fromArray: a stream over the elements of an array, from index 0 upwards.
let fromArray: array<'a> => Stream.t<'a>
// Stream consumption.
// Everything in this section is eager: the input stream is read to its end
// before the function returns.
// toList: collects the items of the stream into a list.
let toList: Stream.t<'a> => list<'a>
// consumeEach: calls the function on every item, using the stream up.
let consumeEach: (Stream.t<'a>, 'a => unit) => unit
// iterateEach: like consumeEach, but the items remain available afterwards.
// The input is forked, the function is run over one fork until it is
// exhausted, and the other fork is returned.
//
// The returned stream yields the same items the function was called with.
let iterateEach: (Stream.t<'a>, 'a => unit) => Stream.t<'a>
// sumInt / sumFloat: add up all items, starting from 0 and 0. respectively.
let sumInt: Stream.t<int> => int
let sumFloat: Stream.t<float> => float
// join: concatenates the strings of the stream into one string, using `sep`
// (a single space when omitted) as the separator.
let join: (~sep: string=?, Stream.t<string>) => string
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment