Last active
August 12, 2016 09:43
-
-
Save axefrog/61c7f9e3096520bd1f0d0e7e5ecccf23 to your computer and use it in GitHub Desktop.
A multi-pass scan operator for most.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {sample} from '@most/sample';
import {proxy as makeProxy} from 'most-proxy';
/**
 * FlatScan is a multi-pass accumulation operator. It takes a first-stage
 * initiator function and an initial state value, and returns a function that
 * takes a stream of values and returns a stream of cumulatively-updated values.
 *
 * For each input value received from the source stream, the initiator function
 * is called with the current accumulated state, the input value, and an
 * optional stream of state values fed back from flatScan's second-stage output.
 * The initiator function must return a stream of zero or more second-stage
 * transformation functions that take the latest accumulated state value and
 * return a new state value.
 *
 * The value emitted by the second-stage transformation functions is sampled as
 * it is emitted, and retained internally to be passed to future invocations of
 * both the initiator and transformation functions.
 *
 * The initial value is emitted immediately upon observation.
 *
 * flatScan :: (f, b) -> (a$ -> b$)
 * f :: (b, a, b$) -> g$
 * g :: b -> b
 */
/**
 * Creates a proxy stream seeded with an initial value.
 *
 * @param {*} initial - Value prepended to the proxy stream via `startWith`.
 * @returns {Array} A two-element array: the proxy stream with `initial`
 *   prepended, followed by the `attach` function returned by `makeProxy()`.
 */
function proxy(initial) {
  const {attach, stream} = makeProxy();
  return [stream.startWith(initial), attach];
}
/**
 * Builds a multi-pass scan operator.
 *
 *   flatScan :: (f, b) -> (a$ -> b$)
 *   f :: (b, a, b$) -> g$
 *   g :: b -> b
 *
 * @param {Function} f - Initiator called as `f(state, x, state$)` for each
 *   value `x` of the source stream, where `state` is the value sampled from
 *   the fed-back state stream and `state$` is that stream itself. Its result
 *   is flattened with `join()`, so it must return a stream; each value of
 *   that stream is invoked as `update(state)` to produce the next state.
 * @param {*} initial - Initial state; seeds both the fed-back proxy stream
 *   and the accumulated state stream.
 * @returns {Function} A function taking the source stream and returning the
 *   proxy state stream (seeded with `initial`) that mirrors the accumulated
 *   state.
 */
export default function flatScan(f, initial) {
  return stream => {
    // The proxy lets the pipeline sample its own output before that output
    // exists; `fulfill` closes the loop once `state$` has been built.
    const [proxyState$, fulfill] = proxy(initial);
    const initialize = (x, state) => f(state, x, proxyState$);
    const applyUpdate = (update, state) => update(state);
    const state$ = sample(initialize, stream, proxyState$)
      .join()
      .thru(update$ => sample(applyUpdate, update$, proxyState$))
      .startWith(initial)
      .multicast();
    fulfill(state$);
    return proxyState$;
  };
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment