Last active
May 20, 2018 00:25
-
-
Save rubeniskov/b12d60f586af754c8e3955ba6294289b to your computer and use it in GitHub Desktop.
pipe node streams with rxjs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const rxjs = require('rxjs'), | |
from = require('from2'), | |
through = require('through2'); | |
/**
 * Adapts a Node.js stream into an RxJS pipeable operator.
 *
 * The returned operator takes an optional upstream Observable. Every upstream
 * value is written into `stream`, and every chunk the stream emits through its
 * 'data' event is forwarded to the subscriber. The stream's 'end' event
 * completes the resulting Observable and its 'error' event errors it.
 *
 * @param {object} stream - Node.js stream (readable or duplex). `pause`,
 *   `resume` and `destroy` are only called when present.
 * @returns {function(object=): object} Operator `source => Observable`; when
 *   `source` is omitted the stream is only read from.
 */
const rxStreamDuplex = stream => {
  // Pause right away; the stream is resumed inside the subscribe function
  // below, after the listeners have been attached.
  if (stream.pause) {
    stream.pause();
  }

  return source => new rxjs.Observable(observer => {
    const onDataHandler = data => observer.next(data);
    const onEndHandler = () => observer.complete();
    const onErrorHandler = err => observer.error(err);

    stream.addListener('data', onDataHandler);
    stream.addListener('end', onEndHandler);
    stream.addListener('error', onErrorHandler);

    // Keep the upstream subscription so it can be released on teardown.
    // Otherwise the source would keep writing into a destroyed stream.
    let upstream;
    if (source) {
      upstream = source.subscribe({
        next: chunk => stream.write(chunk),
        // Route upstream failures through the stream so they reach the
        // subscriber via the 'error' listener registered above.
        error: err => stream.emit('error', err),
        complete: () => stream.end()
      });
    }

    if (stream.resume) {
      stream.resume();
    }

    // Teardown: stop the producer first, then detach from the stream.
    return () => {
      if (upstream) {
        upstream.unsubscribe();
      }
      stream.removeListener('data', onDataHandler);
      stream.removeListener('end', onEndHandler);
      stream.removeListener('error', onErrorHandler);
      if (stream.destroy) {
        stream.destroy();
      }
    };
  });
};
/**
 * Wraps a Node.js stream as an Observable of its chunks.
 *
 * Builds the duplex operator for `stream` and invokes it without an upstream
 * source, so nothing is written into the stream.
 *
 * @param {object} stream - Node.js stream to read from.
 * @returns {object} Whatever the operator built by `rxStreamDuplex` returns.
 */
const rxStream = (stream) => {
  const toObservable = rxStreamDuplex(stream);
  return toObservable();
};
// Patch Observable#pipe so Node.js streams can be passed where operators are
// expected: any argument exposing both `write` and `pipe` is wrapped with
// rxStreamDuplex; every other argument is handed to the original pipe as-is.
{
  const originalPipe = rxjs.Observable.prototype.pipe;

  rxjs.Observable.prototype.pipe = function (...operators) {
    const adapted = operators.map((candidate) => {
      if (candidate.write && candidate.pipe) {
        return rxStreamDuplex(candidate);
      }
      return candidate;
    });
    return originalPipe.apply(this, adapted);
  };
}
/**
 * Creates a readable stream (built with `from`) that produces `count` random
 * hexadecimal strings, each as a UTF-8 Buffer, and then signals end of stream
 * by passing a null chunk.
 *
 * @param {number} [count=10] - Number of values to produce. Zero, negative
 *   or NaN counts yield an immediately-ending stream; a fractional count is
 *   effectively rounded up.
 * @returns {object} The stream returned by `from`.
 */
const randomValues = (count = 10) => {
  // Count down on a local copy so the parameter is never mutated.
  let remaining = count;

  return from((size, next) => {
    process.nextTick(() => {
      // Compare against zero explicitly: relying on the truthiness of the
      // counter never terminates when it skips past 0 (negative/fractional).
      if (remaining > 0) {
        remaining -= 1;
        const value = Math.floor(Math.random() * 0xFFFFFFF);
        next(null, Buffer.from(value.toString(16), 'utf8'));
        return;
      }
      next(null, null);
    });
  });
};
/**
 * Creates a transform stream (built with `through`) that prefixes every chunk
 * passing through it with `text`.
 *
 * @param {string} text - Prefix, encoded as UTF-8.
 * @returns {object} The stream returned by `through`.
 */
const prependTest = (text) => {
  // The prefix never changes, so encode it once instead of once per chunk.
  const prefix = Buffer.from(text, 'utf8');

  return through((chunk, enc, next) => {
    // Buffer.concat only accepts Buffers/Uint8Arrays; convert string chunks
    // using the encoding supplied alongside them.
    const body = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, enc);
    next(null, Buffer.concat([prefix, body]));
  });
};
// Baseline: a plain Node.js stream pipeline that prints each prefixed chunk.
randomValues()
  .pipe(prependTest('Random value with Node Stream pipeline '))
  .on('data', (chunk) => {
    console.log(chunk.toString());
  });

// Same transform stream, this time handed to Observable#pipe and consumed
// through an RxJS subscription.
rxStream(randomValues())
  .pipe(prependTest('Random value with RxJS pipeline '))
  .subscribe({
    next: (chunk) => console.log(chunk.toString()),
  });
/*****
Example output (the hex values are random and differ on every run):
Random value with Node Stream pipeline b72b110
Random value with RxJS pipeline 411117d
Random value with Node Stream pipeline e31eac2
Random value with RxJS pipeline d206325
Random value with Node Stream pipeline eb6465f
Random value with RxJS pipeline c5f06cd
Random value with Node Stream pipeline ef4f779
Random value with RxJS pipeline 93ff4ad
Random value with Node Stream pipeline a859adf
Random value with RxJS pipeline 22f3062
Random value with Node Stream pipeline 9ad1124
Random value with RxJS pipeline b1a2c56
Random value with Node Stream pipeline 25c3961
Random value with RxJS pipeline 72f230d
Random value with Node Stream pipeline ae2694b
Random value with RxJS pipeline aa63fb9
Random value with Node Stream pipeline a8dc1f3
Random value with RxJS pipeline 1ee4b04
Random value with Node Stream pipeline a66c721
Random value with RxJS pipeline 7675fdd
*****/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment