Skip to content

Instantly share code, notes, and snippets.

@uhop
Last active July 26, 2018 23:49
Show Gist options
  • Save uhop/11e3da0346a816b2e8cf407e0c078d16 to your computer and use it in GitHub Desktop.
Save uhop/11e3da0346a816b2e8cf407e0c078d16 to your computer and use it in GitHub Desktop.
Merge two corresponding streams by keys
'use strict';
const {Readable} = require('stream');
const merge = (s1, s2) => {
s1.pause();
s2.pause();
let item1 = null,
item2 = null,
done = false;
const result = new Readable({
objectMode: true,
read() {
if (item1 === null && item2 === null) {
s1.resume();
s2.resume();
}
}
});
s1.on('data', item => {
if (item2 !== null) {
result.push({
key: item.key,
value1: item.value,
value2: item2.value
});
item2 = null;
s2.resume();
} else {
item1 = item;
s1.pause();
}
});
s2.on('data', item => {
if (item1 !== null) {
result.push({
key: item.key,
value1: item1.value,
value2: item.value
});
item1 = null;
s1.resume();
} else {
item2 = item;
s2.pause();
}
});
const finish = () => {
if (item1 === null && item2 === null && !done) {
result.push(null);
done = true;
}
};
s1.on('end', finish);
s2.on('end', finish);
return result;
};
// module.exports = merge;
// tests
const {Transform} = require('stream');
class PassThrough extends Transform {
constructor(options) {
super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
}
_transform(chunk, encoding, callback) {
this.push(chunk);
callback(null);
}
}
const s1 = new PassThrough();
const s2 = new PassThrough();
const merged = merge(s1, s2);
merged.on('data', data => console.log(data));
merged.on('end', () => console.log('end'));
s1.push({key: 1, value: 'a'});
s1.push({key: 2, value: 'b'});
s1.push({key: 3, value: 'c'});
s1.push(null);
s2.push({key: 1, value: 'A'});
s2.push({key: 2, value: 'B'});
s2.push({key: 3, value: 'C'});
s2.push(null);
@uhop
Copy link
Author

uhop commented Jul 26, 2018

This gist was a foundation for https://github.com/uhop/stream-join available on npm.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment