Last active
July 26, 2018 23:49
-
-
Save uhop/11e3da0346a816b2e8cf407e0c078d16 to your computer and use it in GitHub Desktop.
Merge two corresponding streams by keys
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const {Readable} = require('stream'); | |
/**
 * Zips two object-mode streams into a single stream of paired records.
 *
 * Items are paired strictly by arrival order: the n-th item of `s1` is
 * combined with the n-th item of `s2` into `{key, value1, value2}`.
 * Keys are NOT compared; the emitted `key` is taken from whichever item of
 * the pair arrived second, so the sources are expected to already carry
 * corresponding keys in the same order.
 *
 * The result ends as soon as one source has ended and has no item waiting
 * to be paired. Items of the longer source that have no counterpart are
 * discarded, and both sources are left flowing so they can run to their own
 * 'end'. An 'error' on either source destroys the result with that error.
 *
 * @param {import('stream').Readable} s1 - object-mode source of `{key, value}` items.
 * @param {import('stream').Readable} s2 - object-mode source of `{key, value}` items.
 * @returns {import('stream').Readable} object-mode stream of `{key, value1, value2}`.
 */
const merge = (s1, s2) => {
  // Nothing is pulled from the sources until the consumer asks for data.
  s1.pause();
  s2.pause();

  // At most one of these is non-null at any time: the item that arrived
  // first and is waiting for its counterpart from the other source.
  let item1 = null;
  let item2 = null;
  let ended1 = false;
  let ended2 = false;
  let done = false;

  const result = new Readable({
    objectMode: true,
    read() {
      if (done) {
        return;
      }
      // Only wake a source that has no item parked; resuming the other one
      // would let it deliver a second item over the parked one.
      if (item1 === null) {
        s1.resume();
      }
      if (item2 === null) {
        s2.resume();
      }
    }
  });

  // Marks the merge as over and lets both sources drain; their data
  // handlers ignore everything from now on.
  const settle = () => {
    done = true;
    item1 = null;
    item2 = null;
    s1.resume();
    s2.resume();
  };

  // Ends the result once no further pair can be formed. A source's 'end'
  // may fire while its last item is still parked, so an ended source only
  // counts as exhausted when nothing of it is waiting.
  const finishIfExhausted = () => {
    if (done) {
      return true;
    }
    const exhausted = (ended1 && item1 === null) || (ended2 && item2 === null);
    if (!exhausted) {
      return false;
    }
    settle();
    result.push(null);
    return true;
  };

  const emitPair = (key, value1, value2) => {
    const wantsMore = result.push({key, value1, value2});
    if (finishIfExhausted()) {
      return;
    }
    if (wantsMore) {
      s1.resume();
      s2.resume();
    } else {
      // The result's buffer is full: stop both sources until read() is
      // called again.
      s1.pause();
      s2.pause();
    }
  };

  s1.on('data', item => {
    if (done) {
      return;
    }
    if (item2 === null) {
      item1 = item;
      s1.pause();
      return;
    }
    const counterpart = item2;
    item2 = null;
    emitPair(item.key, item.value, counterpart.value);
  });

  s2.on('data', item => {
    if (done) {
      return;
    }
    if (item1 === null) {
      item2 = item;
      s2.pause();
      return;
    }
    const counterpart = item1;
    item1 = null;
    emitPair(item.key, counterpart.value, item.value);
  });

  s1.on('end', () => {
    ended1 = true;
    finishIfExhausted();
  });
  s2.on('end', () => {
    ended2 = true;
    finishIfExhausted();
  });

  // Forward source failures to the consumer of the merged stream.
  const fail = error => {
    settle();
    result.destroy(error);
  };
  s1.on('error', fail);
  s2.on('error', fail);

  return result;
};
// module.exports = merge;
// tests
const {Transform} = require('stream'); | |
/**
 * Object-mode transform that forwards every chunk unchanged.
 *
 * Both the writable and the readable side are always put in object mode;
 * any other stream options are passed through to Transform.
 */
class PassThrough extends Transform {
  /**
   * @param {object} [options] - Transform options; the two object-mode
   *   flags are overridden.
   */
  constructor(options) {
    super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
  }

  /**
   * Emits the incoming chunk as-is.
   *
   * @param {*} chunk - value written to the stream.
   * @param {string} encoding - unused.
   * @param {Function} callback - completion callback; receives the chunk as
   *   the transformed output.
   */
  _transform(chunk, encoding, callback) {
    callback(null, chunk);
  }
}
// Demo: run two three-item streams through merge() and print every merged
// record, followed by 'end' once the merged stream finishes.
const s1 = new PassThrough();
const s2 = new PassThrough();
const merged = merge(s1, s2);

merged.on('data', data => console.log(data));
merged.on('end', () => console.log('end'));

// Puts the items directly on the stream's readable side, then the
// end-of-stream marker.
const feed = (stream, items) => {
  for (const item of items) {
    stream.push(item);
  }
  stream.push(null);
};

feed(s1, [
  {key: 1, value: 'a'},
  {key: 2, value: 'b'},
  {key: 3, value: 'c'}
]);
feed(s2, [
  {key: 1, value: 'A'},
  {key: 2, value: 'B'},
  {key: 3, value: 'C'}
]);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This gist was a foundation for https://github.com/uhop/stream-join available on npm.