Created
February 11, 2017 09:29
-
-
Save dead-claudia/d93fdbb861d4a5dca2be0ab3ddfd248e to your computer and use it in GitHub Desktop.
A simple utility set for observables, featuring my stream function strawman
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Normalizes an argument that is either a value or a factory for one.
 *
 * @param {*} observable - A value, or a function that produces the value.
 * @returns {*} The result of calling `observable` with no arguments when it
 *   is a function; otherwise `observable` itself, unchanged.
 */
export function cast(observable) {
    return typeof observable === "function" ? observable() : observable
}
export stream function from(items) { | |
for (const item of items) emit item | |
} | |
export stream function of(...items) { | |
for (const item of items) emit item | |
} | |
export stream function repeat(...items) { | |
while (true) emit* from(items) | |
} | |
export stream function range(start, end=undefined, step=1) { | |
if (end == null) { | |
end = start | |
start = 0 | |
} | |
while (start < end) { | |
emit start | |
start += step | |
} | |
} | |
export stream function ofError(arg) { | |
throw arg | |
} | |
export stream function recover(observable, arg) { | |
try { | |
emit* observable | |
} catch (e) { | |
emit* func(e) | |
} | |
} | |
export stream function concat(left, right) { | |
emit* left | |
emit* right | |
} | |
export stream function concatAll(observable) { | |
for (const inner from observable) { | |
emit* inner | |
} | |
} | |
export stream function map(observable, func) { | |
for (const item from observable) emit func(item) | |
} | |
export stream function concatMap(observable, func) { | |
for (const item from observable) emit* func(item) | |
} | |
export async function every(observable, func) { | |
for await (const item from observable) { | |
if (!func(item)) return false | |
} | |
return true | |
} | |
export async function some(observable, func) { | |
for await (const item from observable) { | |
if (func(item)) return true | |
} | |
return false | |
} | |
export function merge(left, right) { | |
return parallel { | |
emit* left | |
emit* right | |
} | |
} | |
export stream function zip(left, right) { | |
let lefts = [] | |
let rights = [] | |
parallel { | |
for (const item of left) { | |
if (rights.length) emit [item, rights.shift()] | |
else lefts.push(item) | |
} | |
for (const item of right) { | |
if (lefts.length) emit [lefts.shift(), item] | |
else rights.push(item) | |
} | |
} | |
return | |
} | |
export async function find(observable, func) { | |
for await (const item from observable) { | |
if (func(item)) return item | |
} | |
return undefined | |
} | |
/**
 * Checks whether `observable` produces a value strictly equal to `item`.
 *
 * Comparison uses `===`, so `NaN` is never found.
 *
 * @param {*} observable - Source of items.
 * @param {*} item - Value to look for.
 * @returns {*} Whatever `some` returns for the equality predicate.
 */
export function includes(observable, item) {
    return some(observable, i => item === i)
}
export async function forEach(observable, func) { | |
for await (const item from observable) { | |
await func(item) | |
} | |
} | |
export async function reduce(observable, func, start) { | |
let init = arguments.length < 3 | |
for await (const item from observable) { | |
start = init ? item : func(start, item) | |
init = false | |
} | |
return start | |
} | |
export stream function take(observable, n) { | |
if (n === 0) return | |
for (const item from observable) { | |
emit item | |
if (--n === 0) break | |
} | |
} | |
export stream function drop(observable, n) { | |
for (const item from observable) { | |
if (n === 0) emit item | |
else n-- | |
} | |
} | |
export stream function slice(observable, start, end) { | |
if (end === 0 || end < start) return | |
let count = 0 | |
for (const item from observable) { | |
if (count === end) return | |
if (count >= start) emit item | |
count++ | |
} | |
} | |
export stream function throttle(observable, delay) { | |
let date = 0 | |
for (const item from observable) { | |
const current = Date.now() | |
if (current < date) continue | |
date = current + delay | |
emit item | |
} | |
} | |
export stream function debounce(observable, delay) { | |
let date = 0 | |
for (const item from observable) { | |
const current = Date.now() | |
if (current < date) { | |
date = current + delay | |
} else { | |
emit item | |
} | |
} | |
} | |
export stream function unique(observable) { | |
const found = new Set() | |
for (const item from observable) { | |
if (found.has(item)) continue | |
found.add(item) | |
emit item | |
} | |
} | |
export stream function uniqueBy(observable, func) { | |
const found = new Set() | |
for (const item from observable) { | |
if (found.has(func(item))) continue | |
found.add(item) | |
emit item | |
} | |
} | |
export stream function retry(observable, times) { | |
if (times === 0) return emit* observable | |
let err | |
do { | |
try { | |
return emit* observable | |
} catch (e) { | |
err = e | |
} | |
} while (times) | |
throw e | |
} | |
export stream function window(observable, boundaries) { | |
const entries = [] | |
parallel { | |
for (const current from observable) { | |
entries.push(current) | |
} | |
for (const _ from boundaries) { | |
const copy = entries | |
entries = [] | |
for (const item of entries) emit item | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment