Skip to content

Instantly share code, notes, and snippets.

@staltz
Created December 7, 2017 20:04
Show Gist options
  • Save staltz/28010afa90dc4912beda6d4a3aae6aa1 to your computer and use it in GitHub Desktop.
Save staltz/28010afa90dc4912beda6d4a3aae6aa1 to your computer and use it in GitHub Desktop.
/**
 * Adapts an xstream `Stream` into a pull-stream source (a "readable").
 *
 * The xstream is subscribed to immediately. Values that arrive while no read
 * is outstanding are queued in arrival order, and are all delivered before
 * the end/error signal is reported. Every read callback is invoked at most
 * once.
 *
 * @param stream The xstream stream to expose as a pull-stream source.
 * @returns A pull-stream read function `(endOrErr, cb)`. Calling it with a
 *   truthy `endOrErr` aborts the source: the listener is removed, queued
 *   values are discarded, and `cb` is called with that same `endOrErr`.
 */
function xstreamToPullStream<T>(stream: Stream<T>): Readable<T> {
  // Values emitted while the sink was not waiting, oldest first.
  const queued: T[] = [];
  // Set once the stream completed, failed, or the sink aborted.
  let finished = false;
  // The truthy end signal handed to every read after `finished` (true or an error).
  let endSignal: any;
  // The read callback currently waiting for an answer, if any.
  let waiting: Callback<T> | undefined;

  // Hands out the waiting callback and forgets it, so it can only be called once.
  const takeWaiting = (): Callback<T> | undefined => {
    const cb = waiting;
    waiting = undefined;
    return cb;
  };

  // Records termination and answers the outstanding read, if there is one.
  // A read can only be outstanding when the queue is empty, so no value is skipped.
  const finish = (signal: any): void => {
    if (finished) {
      return;
    }
    finished = true;
    endSignal = signal;
    const cb = takeWaiting();
    if (cb) {
      cb(signal);
    }
  };

  const listener: Listener<T> = {
    next: t => {
      const cb = takeWaiting();
      if (cb) {
        cb(null, t);
      } else {
        queued.push(t);
      }
    },
    // A falsy error would be mistaken for "no end" by the consumer, so fall back to `true`.
    error: err => finish(err || true),
    complete: () => finish(true),
  };
  stream.addListener(listener);

  return (endOrErr: any, cb: Callback<T>) => {
    if (endOrErr) {
      // Abort: stop listening, drop what was queued, and release any read
      // that was still waiting before acknowledging the abort itself.
      stream.removeListener(listener);
      queued.length = 0;
      finish(endOrErr);
      return cb(endOrErr);
    }
    if (queued.length > 0) {
      // The length check guarantees an element; the cast is needed because T may include undefined.
      return cb(null, queued.shift() as T);
    }
    if (finished) {
      return cb(endSignal);
    }
    waiting = cb;
  };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment