Skip to content

Instantly share code, notes, and snippets.

@briancavalier
Created March 30, 2016 13:11
Show Gist options
  • Select an option

  • Save briancavalier/1cdb301f25dc62813b6be9c78ec0c8b8 to your computer and use it in GitHub Desktop.

Select an option

Save briancavalier/1cdb301f25dc62813b6be9c78ec0c8b8 to your computer and use it in GitHub Desktop.
Stream proxy that can be told to imitate another stream
import { Stream, observe, periodic } from '../most'
import defaultScheduler from '../lib/scheduler/defaultScheduler'
const proxy = () => {
const source = new Source()
return {
imitate: stream => imitateStream(stream, source),
stream: new Stream(source)
}
}
const imitateStream = ({ source }, sink) => {
return source.run(sink, defaultScheduler)
}
class Source {
constructor() {
this.sink = undefined
this.active = true
}
run(sink) {
this.sink = sink
}
event(t, x) {
this.ensureActive();
if(this.sink === undefined) {
return;
}
this.sink.event(t, x);
}
end(t, x) {
this.propagateAndDisable(this.sink.end, t, x);
}
error(t, e) {
this.propagateAndDisable(this.sink.error, t, e);
}
propagateAndDisable(method, t, x) {
this.ensureActive();
if(this.sink === void 0) {
return;
}
this.active = false;
const sink = this.sink;
this.sink = undefined;
method.call(sink, t, x);
}
ensureActive() {
if(!this.active) {
throw new Error('stream ended');
}
}
}
const { imitate, stream } = proxy();
const original = periodic(100, 'a')
observe(x => console.log('proxy', x), stream)
observe(x => console.log('original', x), original)
imitate(s)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment