Skip to content

Instantly share code, notes, and snippets.

@dead-claudia
Created February 11, 2017 09:29
Show Gist options
  • Save dead-claudia/d93fdbb861d4a5dca2be0ab3ddfd248e to your computer and use it in GitHub Desktop.
Save dead-claudia/d93fdbb861d4a5dca2be0ab3ddfd248e to your computer and use it in GitHub Desktop.
A simple utility set for observables, featuring my stream function strawman
/**
 * Normalizes a possibly-lazy observable.
 *
 * @param {*} observable - Either a value, or a zero-argument function that
 *     produces one.
 * @returns {*} The result of calling `observable` when it is a function,
 *     otherwise `observable` itself, untouched.
 */
export function cast(observable) {
    return typeof observable === "function" ? observable() : observable
}
export stream function from(items) {
for (const item of items) emit item
}
export stream function of(...items) {
for (const item of items) emit item
}
export stream function repeat(...items) {
while (true) emit* from(items)
}
/**
 * Stream that emits an arithmetic progression.
 *
 * `range(end)` counts from 0 up to (but excluding) `end`.
 * `range(start, end, step)` counts from `start` towards `end` (exclusive) in
 * increments of `step`; a negative `step` counts downwards.
 *
 * @param {number} start - First value, or the exclusive end when `end` is
 *     omitted.
 * @param {number} [end] - Exclusive bound. When null/undefined, `start` is
 *     used as the bound and counting begins at 0.
 * @param {number} [step=1] - Increment between values; must be non-zero.
 * @throws {RangeError} When `step` is 0 or NaN, which could never terminate.
 */
export stream function range(start, end=undefined, step=1) {
    if (end == null) {
        end = start
        start = 0
    }
    if (step === 0 || Number.isNaN(step)) {
        throw new RangeError("range: step must be a non-zero number")
    }
    if (step > 0) {
        for (let value = start; value < end; value += step) emit value
    } else {
        // Counting down: the bound is approached from above.
        for (let value = start; value > end; value += step) emit value
    }
}
/**
 * Stream function whose body immediately throws `arg`.
 *
 * NOTE(review): presumably the throw surfaces as an error on the resulting
 * stream rather than synchronously at the call site — confirm against the
 * stream-function strawman.
 *
 * @param {*} arg - The value to throw.
 */
export stream function ofError(arg) {
throw arg
}
export stream function recover(observable, arg) {
try {
emit* observable
} catch (e) {
emit* func(e)
}
}
export stream function concat(left, right) {
emit* left
emit* right
}
export stream function concatAll(observable) {
for (const inner from observable) {
emit* inner
}
}
export stream function map(observable, func) {
for (const item from observable) emit func(item)
}
export stream function concatMap(observable, func) {
for (const item from observable) emit* func(item)
}
export async function every(observable, func) {
for await (const item from observable) {
if (!func(item)) return false
}
return true
}
export async function some(observable, func) {
for await (const item from observable) {
if (func(item)) return true
}
return false
}
export function merge(left, right) {
return parallel {
emit* left
emit* right
}
}
export stream function zip(left, right) {
let lefts = []
let rights = []
parallel {
for (const item of left) {
if (rights.length) emit [item, rights.shift()]
else lefts.push(item)
}
for (const item of right) {
if (lefts.length) emit [lefts.shift(), item]
else rights.push(item)
}
}
return
}
export async function find(observable, func) {
for await (const item from observable) {
if (func(item)) return item
}
return undefined
}
/**
 * Checks whether the source produces a value strictly equal (`===`) to `item`.
 *
 * @param {*} observable - Source of items, forwarded to `some`.
 * @param {*} item - The value to look for.
 * @returns {*} Whatever `some` returns for the strict-equality predicate.
 */
export function includes(observable, item) {
    const isMatch = candidate => item === candidate
    return some(observable, isMatch)
}
export async function forEach(observable, func) {
for await (const item from observable) {
await func(item)
}
}
export async function reduce(observable, func, start) {
let init = arguments.length < 3
for await (const item from observable) {
start = init ? item : func(start, item)
init = false
}
return start
}
export stream function take(observable, n) {
if (n === 0) return
for (const item from observable) {
emit item
if (--n === 0) break
}
}
export stream function drop(observable, n) {
for (const item from observable) {
if (n === 0) emit item
else n--
}
}
export stream function slice(observable, start, end) {
if (end === 0 || end < start) return
let count = 0
for (const item from observable) {
if (count === end) return
if (count >= start) emit item
count++
}
}
export stream function throttle(observable, delay) {
let date = 0
for (const item from observable) {
const current = Date.now()
if (current < date) continue
date = current + delay
emit item
}
}
export stream function debounce(observable, delay) {
let date = 0
for (const item from observable) {
const current = Date.now()
if (current < date) {
date = current + delay
} else {
emit item
}
}
}
export stream function unique(observable) {
const found = new Set()
for (const item from observable) {
if (found.has(item)) continue
found.add(item)
emit item
}
}
export stream function uniqueBy(observable, func) {
const found = new Set()
for (const item from observable) {
if (found.has(func(item))) continue
found.add(item)
emit item
}
}
export stream function retry(observable, times) {
if (times === 0) return emit* observable
let err
do {
try {
return emit* observable
} catch (e) {
err = e
}
} while (times)
throw e
}
export stream function window(observable, boundaries) {
const entries = []
parallel {
for (const current from observable) {
entries.push(current)
}
for (const _ from boundaries) {
const copy = entries
entries = []
for (const item of entries) emit item
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment