Last active
August 18, 2022 21:38
-
-
Save Raynos/9fc98da9ec7414122648 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * concatMap takes a pull-style iter and returns a new one.
 *
 * Each value pulled from `iter` is handed to `mapper(value, push, done)`:
 *
 *  - `push(value)` may be called zero or more times to emit output values.
 *  - `done(err)` must be called with either an `Error` or `null` once the
 *    mapper has finished with the current input value.
 *  - `push` must not be called after `done`.
 *
 * The last invocation of `mapper` is with the value `undefined`, i.e. the
 * Finished token, which lets the mapper flush anything it has accumulated.
 * Once that invocation completes and every queued value has been delivered,
 * `next` yields `undefined` to the consumer and the source is not polled
 * again.
 *
 * @param {{next: Function, end: *}} iter - source iterator; `next(cb)` must
 *   call `cb(err, value)`, where `value === undefined` means Finished.
 * @param {Function} mapper - `(value, push, done)` transformer, see above.
 * @returns {{next: Function, end: *}} iterator over the pushed values.
 */
function concatMap(iter, mapper) {
    // Values pushed by the mapper that the consumer has not pulled yet.
    var queue = [];
    // Set once the source has produced the Finished token (`undefined`).
    var finished = false;

    return {
        next: next,
        // NOTE(review): passed through unbound, as before; confirm that
        // `iter.end` does not rely on `this` being `iter`.
        end: iter.end
    };

    function next(cb) {
        // Serve buffered output before touching the source.
        if (queue.length) {
            var item = queue.shift();
            return process.nextTick(function () {
                cb(null, item);
            });
        }

        // The source is exhausted and the queue is drained: keep reporting
        // Finished instead of polling the source again.
        if (finished) {
            return process.nextTick(function () {
                cb(null, undefined);
            });
        }

        iter.next(onValue);

        function onValue(err, value) {
            if (err) {
                return cb(err);
            }

            if (value === undefined) {
                finished = true;
            }

            mapper(value, push, done);
        }

        function push(value) {
            queue.push(value);
        }

        function done(err) {
            if (err) {
                return cb(err);
            }

            if (queue.length === 0) {
                // Nothing was emitted for this input. Either propagate the
                // Finished token or pull the next input value.
                if (finished) {
                    return cb(null, undefined);
                }

                return next(cb);
            }

            cb(null, queue.shift());
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example usage: for every chunk pulled from `sourceStream`, push its
// upper-cased form twice (after 10ms and 20ms) and signal completion of
// that chunk after 30ms.
var stream = concatMap(sourceStream, function (chunk, push, done) {
    // `undefined` is the Finished token: there is nothing to flush here, so
    // just complete asynchronously with an explicit `null` error.
    if (chunk === undefined) {
        return process.nextTick(function () {
            done(null);
        });
    }

    setTimeout(function () {
        push(chunk.toUpperCase());
    }, 10);

    setTimeout(function () {
        push(chunk.toUpperCase());
    }, 20);

    // `done` is scheduled after both pushes, since pushing after `done` is
    // not allowed.
    setTimeout(function () {
        done(null);
    }, 30);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment