Skip to content

Instantly share code, notes, and snippets.

@stephenplusplus
Last active May 24, 2018 21:34
Show Gist options
  • Select an option

  • Save stephenplusplus/891613c627c1572e05972805efc0a646 to your computer and use it in GitHub Desktop.

Select an option

Save stephenplusplus/891613c627c1572e05972805efc0a646 to your computer and use it in GitHub Desktop.
function partialResultStream(requestFn, options) {
var lastResumeToken;
var activeRequestStream;
options = extend({toJSON: false}, options);
function makeRequest() {
requestFn(lastResumeToken).pipe(userStream)
}
var rowChunks = [];
var metadata;
var pendingRowValues;
var userStream = through.obj(function(row, _, next) {
var formattedRows = [];
if (row.metadata) {
metadata = row.metadata;
}
if (pendingRowValues) {
row.values = pendingRowValues.concat(row.values);
pendingRowValues = null;
}
if (row.chunkedValue) {
rowChunks.push(row);
next();
return;
}
if (is.empty(row.values)) {
next();
return;
}
if (rowChunks.length > 0) {
// Done getting all the chunks. Put them together.
var builder = new RowBuilder(metadata, rowChunks.concat(row));
formattedRows = formattedRows.concat(builder.toJSON());
rowChunks.length = 0;
} else {
var numExtraFields = row.values.length % metadata.rowType.fields.length;
if (numExtraFields > 0) {
pendingRowValues = row.values.splice(-numExtraFields);
}
var formattedRow = partialResultStream.formatRow_(metadata, row);
var multipleRows = is.array(formattedRow[0]);
if (multipleRows) {
formattedRows = formattedRows.concat(formattedRow);
} else {
formattedRows.push(formattedRow);
}
}
if (options.json) {
formattedRows = formattedRows.map(exec('toJSON', options.jsonOptions));
}
split(formattedRows, userStream).then(() => next());
});
userStream.abort = function() {
if (activeRequestStream) {
activeRequestStream.abort();
}
};
setImmediate(makeRequest)
return userStream
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment