Skip to content

Instantly share code, notes, and snippets.

@Raynos
Last active August 18, 2022 21:38
Show Gist options
  • Save Raynos/9fc98da9ec7414122648 to your computer and use it in GitHub Desktop.
Save Raynos/9fc98da9ec7414122648 to your computer and use it in GitHub Desktop.
/**
 * concatMap takes an iter and returns a new iter whose values are whatever
 * `mapper` pushes for each value pulled from the source.
 *
 * `mapper` is invoked as `mapper(value, push, done)`:
 *   - You may call `push` zero or more times with a value.
 *   - You must call `done` with either an `Error` or `null`.
 *   - You may not call `push` after `done`.
 *
 * Be wary that the last invocation of `mapper` will be with the value
 * `undefined`, i.e. the Finished token, which allows you to flush any
 * accumulation. Once that invocation has called `done`, the remaining queued
 * values are handed out and then the Finished token (`undefined`) is passed
 * to the consumer; the source and `mapper` are not invoked again.
 *
 * @param {Object} iter - source with `next(cb)` (cb receives `(err, value)`)
 *   and `end`.
 * @param {Function} mapper - `(value, push, done)` transformer described above.
 * @returns {Object} an iter exposing `next(cb)` and the source's `end`.
 */
function concatMap(iter, mapper) {
  var queue = [];
  // True once the mapper has completed its handling of the Finished token.
  var ended = false;

  return {
    next: next,
    // Bind so `end` keeps `iter` as its `this` when called on the new iter.
    end: typeof iter.end === 'function' ? iter.end.bind(iter) : iter.end
  };

  function next(cb) {
    // True while the value being mapped in this call is the Finished token.
    var sourceFinished = false;

    // Values buffered by an earlier mapper call are served first, always
    // asynchronously.
    if (queue.length) {
      var item = queue.shift();
      return process.nextTick(function () {
        cb(null, item);
      });
    }

    // The source is exhausted and the queue is drained: report Finished
    // without pulling the source or calling the mapper again.
    if (ended) {
      return process.nextTick(function () {
        cb(null, undefined);
      });
    }

    iter.next(onValue);

    function onValue(err, value) {
      if (err) {
        return cb(err);
      }
      sourceFinished = value === undefined;
      mapper(value, push, done);
    }

    function push(value) {
      queue.push(value);
    }

    function done(err) {
      if (err) {
        return cb(err);
      }
      if (sourceFinished) {
        ended = true;
      }
      if (queue.length) {
        return cb(null, queue.shift());
      }
      if (ended) {
        // Nothing was flushed: propagate the Finished token downstream.
        return cb(null, undefined);
      }
      // The mapper produced nothing for this value; pull the next one.
      next(cb);
    }
  }
}
// Example usage: every chunk from `sourceStream` is emitted twice in upper
// case (pushed after 10ms and 20ms), and the mapper signals completion for
// that chunk after 30ms.
var stream = concatMap(sourceStream, function (chunk, push, done) {
  // Finished token: nothing was accumulated, so just complete asynchronously.
  if (chunk === undefined) {
    return process.nextTick(done);
  }

  setTimeout(function () {
    push(chunk.toUpperCase());
  }, 10);

  setTimeout(function () {
    push(chunk.toUpperCase());
  }, 20);

  // Must fire after both pushes: pushing after `done` is not allowed.
  setTimeout(function () {
    done(null);
  }, 30);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment