Last active
July 3, 2019 01:15
-
-
Save stephenplusplus/888373161f80a45516be64e911a84db5 to your computer and use it in GitHub Desktop.
split-array-stream Potential Evolution
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const {Readable, Transform} = require('stream'); | |
// SplitArrayStream can be used two ways: | |
// | |
// - An intermediary stream that receives arrays and emits its items | |
// - A source stream that takes a function as an argument, that is expected to | |
// return an array each time it is called | |
// | |
// In both cases, an array is only asked for when the destination stream is | |
// ready. | |
class SplitArrayStream extends Transform { | |
constructor(getArrayFn) { | |
super({objectMode: true}) | |
if (typeof getArrayFn === 'function') { | |
this.getArrayFn = getArrayFn | |
this._read = this._readFromFn.bind(this) | |
} | |
} | |
async _readFromFn() { | |
let consumerStreamReady = true | |
const array = await this.getArrayFn() | |
while (consumerStreamReady && array.length > 0) { | |
consumerStreamReady = this.push(array.shift()) | |
} | |
if (consumerStreamReady && array.length > 0) { | |
setImmediate(() => this._read(array)) | |
} | |
} | |
_transform(array, enc, next) { | |
let consumerStreamReady = true | |
while (consumerStreamReady && array.length > 0) { | |
consumerStreamReady = this.push(array.shift()) | |
} | |
if (consumerStreamReady && array.length === 0) { | |
next() | |
} else { | |
setImmediate(() => this._transform(array, enc, next)) | |
} | |
} | |
} | |
// For testing purposes, only returns 5 pages of results | |
// (2 buckets per page, 10 buckets total) | |
let numPagesReturned = 0 | |
async function makeGoogleApiRequest() { | |
const simulatedApiResponse = { | |
kind: 'storage#listBuckets', | |
items: [ | |
{ | |
kind: 'storage#bucket', | |
id: `bucket-a-from-page-${numPagesReturned + 1}`, | |
}, | |
{ | |
kind: 'storage#bucket', | |
id: `bucket-b-from-page-${numPagesReturned + 1}`, | |
} | |
], | |
nextPageToken: '...', | |
} | |
if (numPagesReturned > 3) { | |
delete simulatedApiResponse.nextPageToken | |
} | |
numPagesReturned++ | |
return Promise.resolve(simulatedApiResponse) | |
} | |
// Simulated "storage.getBuckets()" method | |
async function getBuckets(options) { | |
const reqOpts = { | |
method: 'GET', | |
uri: 'https://www.googleapis.com/storage/v1/buckets/list', | |
} | |
if (options.pageToken) { | |
reqOpts.qs = {pageToken: options.pageToken} | |
} | |
const apiResponse = await makeGoogleApiRequest(reqOpts) | |
const simulatedBucketObjects = apiResponse.items.map(bucket => { | |
return { | |
name: bucket.id, | |
metadata: bucket, | |
} | |
}) | |
return Promise.resolve([simulatedBucketObjects, apiResponse]) | |
} | |
function getBucketsStreamOptionA() { | |
let nextPageToken | |
const readStream = new Readable({ | |
objectMode: true, | |
read: async function() { | |
const options = {autoPaginate: false} | |
if (nextPageToken) { | |
options.pageToken = nextPageToken | |
} | |
const [buckets, apiResponse] = await getBuckets(options) | |
nextPageToken = apiResponse.nextPageToken | |
this.push(buckets) | |
if (!nextPageToken) { | |
this.push(null) | |
} | |
} | |
}) | |
return readStream.pipe(new SplitArrayStream()) | |
} | |
function getBucketsStreamOptionB() { | |
let nextPageToken | |
const getBucketsAsStream = async () => { | |
const options = {autoPaginate: false} | |
if (nextPageToken) { | |
options.pageToken = nextPageToken | |
} | |
const [buckets, apiResponse] = await getBuckets(options) | |
nextPageToken = apiResponse.nextPageToken | |
if (!nextPageToken) { | |
buckets.push(null) | |
} | |
return Promise.resolve(buckets) | |
} | |
return new SplitArrayStream(getBucketsAsStream) | |
} | |
getBucketsStreamOptionA() | |
.on('data', bucket => { | |
console.log('Bucket name:', bucket.name) | |
}) | |
getBucketsStreamOptionB() | |
.on('data', bucket => { | |
console.log('Bucket name:', bucket.name) | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment