Created
March 30, 2016 13:11
-
-
Save briancavalier/1cdb301f25dc62813b6be9c78ec0c8b8 to your computer and use it in GitHub Desktop.
Stream proxy that can be told to imitate another stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { Stream, observe, periodic } from '../most' | |
| import defaultScheduler from '../lib/scheduler/defaultScheduler' | |
| const proxy = () => { | |
| const source = new Source() | |
| return { | |
| imitate: stream => imitateStream(stream, source), | |
| stream: new Stream(source) | |
| } | |
| } | |
| const imitateStream = ({ source }, sink) => { | |
| return source.run(sink, defaultScheduler) | |
| } | |
| class Source { | |
| constructor() { | |
| this.sink = undefined | |
| this.active = true | |
| } | |
| run(sink) { | |
| this.sink = sink | |
| } | |
| event(t, x) { | |
| this.ensureActive(); | |
| if(this.sink === undefined) { | |
| return; | |
| } | |
| this.sink.event(t, x); | |
| } | |
| end(t, x) { | |
| this.propagateAndDisable(this.sink.end, t, x); | |
| } | |
| error(t, e) { | |
| this.propagateAndDisable(this.sink.error, t, e); | |
| } | |
| propagateAndDisable(method, t, x) { | |
| this.ensureActive(); | |
| if(this.sink === void 0) { | |
| return; | |
| } | |
| this.active = false; | |
| const sink = this.sink; | |
| this.sink = undefined; | |
| method.call(sink, t, x); | |
| } | |
| ensureActive() { | |
| if(!this.active) { | |
| throw new Error('stream ended'); | |
| } | |
| } | |
| } | |
| const { imitate, stream } = proxy(); | |
| const original = periodic(100, 'a') | |
| observe(x => console.log('proxy', x), stream) | |
| observe(x => console.log('original', x), original) | |
| imitate(s) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment