Skip to content

Instantly share code, notes, and snippets.

@rubeniskov
Last active May 20, 2018 00:25
Show Gist options
  • Save rubeniskov/b12d60f586af754c8e3955ba6294289b to your computer and use it in GitHub Desktop.
Save rubeniskov/b12d60f586af754c8e3955ba6294289b to your computer and use it in GitHub Desktop.
pipe node streams with rxjs
const rxjs = require('rxjs'),
from = require('from2'),
through = require('through2');
/**
 * Adapts a Node.js-style stream into a function that produces an Observable.
 *
 * The stream is paused up front (when it has `pause`), presumably so no
 * `data` events are lost before an observer attaches its listeners; it is
 * resumed (when it has `resume`) once the listeners are in place.
 *
 * @param {object} stream - Event emitter that emits `data`, `end` and
 *   `error`. `write`/`end` are used only when a source is supplied;
 *   `pause`, `resume` and `destroy` are called only if present.
 * @returns {function(object=): object} Function taking an optional source
 *   Observable. Source values are written into the stream, and the
 *   returned Observable emits whatever the stream emits as `data`.
 */
const rxStreamDuplex = stream => {
  if (stream.pause) {
    stream.pause();
  }
  return source => rxjs.Observable.create(observer => {
    const onDataHandler = data => observer.next(data);
    const onEndHandler = () => observer.complete();
    const onErrorHandler = err => observer.error(err);

    stream.addListener('data', onDataHandler);
    stream.addListener('end', onEndHandler);
    stream.addListener('error', onErrorHandler);

    // Keep the upstream subscription so teardown can cancel it; otherwise the
    // source would keep writing into the stream after it has been destroyed.
    // NOTE: the return value of stream.write() is ignored, so writable
    // backpressure is not propagated to the source.
    const sourceSubscription = source
      ? source.subscribe({
        next: chunk => stream.write(chunk),
        // Re-emitted on the stream so it reaches the observer through onErrorHandler.
        error: err => stream.emit('error', err),
        complete: () => stream.end()
      })
      : null;

    if (stream.resume) {
      stream.resume();
    }

    return () => {
      // Stop the upstream first so nothing is written once the stream is torn down.
      if (sourceSubscription && typeof sourceSubscription.unsubscribe === 'function') {
        sourceSubscription.unsubscribe();
      }
      stream.removeListener('data', onDataHandler);
      stream.removeListener('end', onEndHandler);
      stream.removeListener('error', onErrorHandler);
      if (stream.destroy) {
        stream.destroy();
      }
    };
  });
};
/**
 * Wraps a stream as an Observable with no upstream source.
 *
 * @param {object} stream - Stream handed to rxStreamDuplex.
 * @returns {object} Result of calling the rxStreamDuplex factory with no arguments.
 */
const rxStream = stream => {
  const attach = rxStreamDuplex(stream);
  return attach();
};
// Patch Observable#pipe so that stream-like arguments can be passed where
// operators are expected: any argument exposing both `write` and `pipe` is
// wrapped with rxStreamDuplex, everything else is forwarded unchanged.
{
  const originalPipe = rxjs.Observable.prototype.pipe;
  const toOperator = candidate => {
    const looksLikeDuplex = candidate.write && candidate.pipe;
    return looksLikeDuplex ? rxStreamDuplex(candidate) : candidate;
  };
  rxjs.Observable.prototype.pipe = function(...operators) {
    // `this` is the Observable the patched method was invoked on.
    return originalPipe.apply(this, operators.map(candidate => toOperator(candidate)));
  };
}
/**
 * Builds a from2 readable stream that yields `count` random hexadecimal
 * strings (as UTF-8 Buffers) and then ends.
 *
 * Each read is answered on the next tick. A count that is not greater than
 * zero ends the stream immediately; a fractional count yields one chunk per
 * remaining step above zero instead of never terminating.
 *
 * @param {number} [count=10] - Number of chunks to emit before ending.
 * @returns {object} The stream returned by `from`.
 */
const randomValues = (count = 10) => {
  // Local counter so the caller-visible parameter is not mutated.
  let remaining = count;
  return from((size, next) => {
    process.nextTick(() => {
      // `!(x > 0)` also ends the stream for NaN, matching the old falsy check.
      if (!(remaining > 0)) {
        next(null, null); // a null chunk signals end-of-stream
        return;
      }
      remaining -= 1;
      const hex = Math.floor(Math.random() * 0xFFFFFFF).toString(16);
      next(null, Buffer.from(hex, 'utf8'));
    });
  });
};
/**
 * Builds a through2 transform that prefixes every chunk with `text`.
 *
 * @param {string} text - Prefix, encoded as UTF-8 for each chunk.
 * @returns {object} The stream returned by `through`.
 */
const prependTest = (text) => through((chunk, enc, next) => {
  const prefix = Buffer.from(text, 'utf8');
  const combined = Buffer.concat([prefix, chunk]);
  next(null, combined);
});
// Demo 1: plain Node.js pipeline — random values piped through the prefixing
// transform, logged from the stream's `data` event.
const nodePipeline = randomValues()
  .pipe(prependTest('Random value with Node Stream pipeline '));
nodePipeline.on('data', d => console.log(d.toString()));

// Demo 2: the same transform used through the Observable's `pipe`, logged
// from the subscription.
const rxPipeline = rxStream(randomValues())
  .pipe(prependTest('Random value with RxJS pipeline '));
rxPipeline.subscribe(d => console.log(d.toString()));
/*****
Sample output (the hexadecimal values are random and differ on every run):
Random value with Node Stream pipeline b72b110
Random value with RxJS pipeline 411117d
Random value with Node Stream pipeline e31eac2
Random value with RxJS pipeline d206325
Random value with Node Stream pipeline eb6465f
Random value with RxJS pipeline c5f06cd
Random value with Node Stream pipeline ef4f779
Random value with RxJS pipeline 93ff4ad
Random value with Node Stream pipeline a859adf
Random value with RxJS pipeline 22f3062
Random value with Node Stream pipeline 9ad1124
Random value with RxJS pipeline b1a2c56
Random value with Node Stream pipeline 25c3961
Random value with RxJS pipeline 72f230d
Random value with Node Stream pipeline ae2694b
Random value with RxJS pipeline aa63fb9
Random value with Node Stream pipeline a8dc1f3
Random value with RxJS pipeline 1ee4b04
Random value with Node Stream pipeline a66c721
Random value with RxJS pipeline 7675fdd
*****/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment