Created
December 25, 2015 22:56
-
-
Save rpominov/20b62ed56c78376c224f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* @flow */

/* A callback that receives one payload of type `T`. */
type Sink<T> = (payload: T) => void

/* A zero-argument function with no result. It is what a `Stream`
 * returns when it is given a sink.
 */
type Disposer = () => void

/* A stream is a function: it takes a sink and returns a `Disposer`. */
type Stream<T> = (s: Sink<T>) => Disposer

/* A plain unary function from `A` to `B`. */
type F<A, B> = (x: A) => B

/* A function that turns a `Stream<A>` into a `Stream<B>`. */
type LiftedF<A, B> = (s: Stream<A>) => Stream<B>
/* Lifts a function `A => B` to a function that operates
 * on streams `Stream<A> => Stream<B>`.
 * Each payload of the source stream is passed through `fn` and the result
 * is handed to the sink. The value returned to the subscriber is whatever
 * the source stream returned (its disposer).
 */
type Lift<A, B> = (f: F<A, B>) => LiftedF<A, B>

// Declared as a generic arrow function so `A` and `B` are inferred per call
// instead of using the `Lift` alias without type arguments.
export const lift = <A, B>(fn: F<A, B>): LiftedF<A, B> => stream =>
  sink => stream(payload => sink(fn(payload)))
/* Given a predicate `A => boolean` returns a function
 * that operates on streams `Stream<A> => Stream<A>`.
 * The resulting stream passes a payload to the sink only when
 * `predicate(payload)` is truthy; other payloads are dropped.
 * The value returned to the subscriber is whatever the source stream returned.
 */
type Filter<A> = (f: F<A, boolean>) => LiftedF<A, A>

// Declared as a generic arrow function so `A` is inferred per call
// instead of using the `Filter` alias without a type argument.
export const filter = <A>(predicate: F<A, boolean>): LiftedF<A, A> => stream =>
  sink => stream(payload => {
    if (predicate(payload)) {
      sink(payload)
    }
  })
/* Given a function `A => Stream<B>` returns a function
 * that operates on streams `Stream<A> => Stream<B>`.
 * For each payload of the source `Stream<A>` the provided function is called
 * and the `Stream<B>` it returns is subscribed to with the same sink,
 * so the final `Stream<B>` contains values from all spawned `Stream<B>`.
 * The returned disposer calls the disposer of every spawned stream,
 * in spawn order, and then the disposer of the source stream.
 */
type Chain<A, B> = (f: F<A, Stream<B>>) => LiftedF<A, B>

// Declared as a generic arrow function so `A` and `B` are inferred per call
// instead of using the `Chain` alias without type arguments.
export const chain = <A, B>(fn: F<A, Stream<B>>): LiftedF<A, B> => stream =>
  sink => {
    // One entry per spawned stream; entries are never removed.
    const spawnedDisposers: Array<Disposer> = []
    const mainDisposer = stream(payload => {
      spawnedDisposers.push(fn(payload)(sink))
    })
    return () => {
      spawnedDisposers.forEach(dispose => dispose())
      mainDisposer()
    }
  }
/* Same as chain(), except the final `Stream<B>` will contain
 * only those values from each spawned stream that were
 * emitted before the next stream was spawned.
 * On every payload of the source stream the disposer of the previously
 * spawned stream is called before the new stream is spawned.
 * The returned disposer calls the disposer of the most recently spawned
 * stream (if any) and then the disposer of the source stream.
 */
export const chainLatest = <A, B>(fn: F<A, Stream<B>>): LiftedF<A, B> => stream =>
  sink => {
    const noop: Disposer = () => {}
    // Disposer of the currently live spawned stream, or `noop` when there is none.
    let disposeLatest = noop
    const disposeMain = stream(payload => {
      // Clear the slot before spawning: if `fn(payload)` or the subscription
      // throws, the disposer that was just called must not be called again.
      const disposePrevious = disposeLatest
      disposeLatest = noop
      disposePrevious()
      disposeLatest = fn(payload)(sink)
    })
    return () => {
      disposeLatest()
      disposeMain()
    }
  }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://github.com/rpominov/basic-streams