Skip to content

Instantly share code, notes, and snippets.

@stephenplusplus
Last active July 3, 2019 01:15
Show Gist options
  • Save stephenplusplus/888373161f80a45516be64e911a84db5 to your computer and use it in GitHub Desktop.
Save stephenplusplus/888373161f80a45516be64e911a84db5 to your computer and use it in GitHub Desktop.
split-array-stream Potential Evolution
const {Readable, Transform} = require('stream');
// SplitArrayStream can be used two ways:
//
// - An intermediary stream that receives arrays and emits its items
// - A source stream that takes a function as an argument, that is expected to
// return an array each time it is called
//
// In both cases, an array is only asked for when the destination stream is
// ready.
class SplitArrayStream extends Transform {
constructor(getArrayFn) {
super({objectMode: true})
if (typeof getArrayFn === 'function') {
this.getArrayFn = getArrayFn
this._read = this._readFromFn.bind(this)
}
}
async _readFromFn() {
let consumerStreamReady = true
const array = await this.getArrayFn()
while (consumerStreamReady && array.length > 0) {
consumerStreamReady = this.push(array.shift())
}
if (consumerStreamReady && array.length > 0) {
setImmediate(() => this._read(array))
}
}
_transform(array, enc, next) {
let consumerStreamReady = true
while (consumerStreamReady && array.length > 0) {
consumerStreamReady = this.push(array.shift())
}
if (consumerStreamReady && array.length === 0) {
next()
} else {
setImmediate(() => this._transform(array, enc, next))
}
}
}
// For testing purposes, only returns 5 pages of results
// (2 buckets per page, 10 buckets total)
let numPagesReturned = 0
async function makeGoogleApiRequest() {
const simulatedApiResponse = {
kind: 'storage#listBuckets',
items: [
{
kind: 'storage#bucket',
id: `bucket-a-from-page-${numPagesReturned + 1}`,
},
{
kind: 'storage#bucket',
id: `bucket-b-from-page-${numPagesReturned + 1}`,
}
],
nextPageToken: '...',
}
if (numPagesReturned > 3) {
delete simulatedApiResponse.nextPageToken
}
numPagesReturned++
return Promise.resolve(simulatedApiResponse)
}
// Simulated "storage.getBuckets()" method
async function getBuckets(options) {
const reqOpts = {
method: 'GET',
uri: 'https://www.googleapis.com/storage/v1/buckets/list',
}
if (options.pageToken) {
reqOpts.qs = {pageToken: options.pageToken}
}
const apiResponse = await makeGoogleApiRequest(reqOpts)
const simulatedBucketObjects = apiResponse.items.map(bucket => {
return {
name: bucket.id,
metadata: bucket,
}
})
return Promise.resolve([simulatedBucketObjects, apiResponse])
}
function getBucketsStreamOptionA() {
let nextPageToken
const readStream = new Readable({
objectMode: true,
read: async function() {
const options = {autoPaginate: false}
if (nextPageToken) {
options.pageToken = nextPageToken
}
const [buckets, apiResponse] = await getBuckets(options)
nextPageToken = apiResponse.nextPageToken
this.push(buckets)
if (!nextPageToken) {
this.push(null)
}
}
})
return readStream.pipe(new SplitArrayStream())
}
function getBucketsStreamOptionB() {
let nextPageToken
const getBucketsAsStream = async () => {
const options = {autoPaginate: false}
if (nextPageToken) {
options.pageToken = nextPageToken
}
const [buckets, apiResponse] = await getBuckets(options)
nextPageToken = apiResponse.nextPageToken
if (!nextPageToken) {
buckets.push(null)
}
return Promise.resolve(buckets)
}
return new SplitArrayStream(getBucketsAsStream)
}
getBucketsStreamOptionA()
.on('data', bucket => {
console.log('Bucket name:', bucket.name)
})
getBucketsStreamOptionB()
.on('data', bucket => {
console.log('Bucket name:', bucket.name)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment