Skip to content

Instantly share code, notes, and snippets.

@stephenplusplus
Last active August 29, 2015 14:27
Show Gist options
  • Select an option

  • Save stephenplusplus/b67bcaf570f33181d297 to your computer and use it in GitHub Desktop.

Select an option

Save stephenplusplus/b67bcaf570f33181d297 to your computer and use it in GitHub Desktop.
Measuring how different stream approaches perform

This is a working test implementation of recursively fetching and transforming data from the Google Cloud Storage API.

Biggest Concerns

  1. Is this so wrong, I should just stop?

  2. The API returns a JSON response that may contain either of the following:

1. An `errors` array.

2. An `items` array.

Is branching off two response streams to get a nextPageToken and errors the wrong way to snip out just a piece of the response? Is there a lot of overhead?

  3. Is calling dup.setReadable() after already setting the readable going to have any unwanted side effects?

I'm so thankful for any help and happy to answer questions about this!

Results

Local
> node .
Starting: makeRequestOld
Starting: makeRequestNew
Starting: makeRequestPaginated
----
1) makeRequestPaginated
Completed in: 11.535 seconds
Passed: false
----
2) makeRequestNew
Completed in: 11.537 seconds
Passed: false
----
3) makeRequestOld
Completed in: 11.543 seconds
Passed: false

So that gets us nowhere. Here's with memory logging at the time of each API request:

> node . old
Starting: makeRequestOld
0 mb
3 mb
4 mb
4 mb
4 mb
5 mb
5 mb
5 mb
6 mb
6 mb
7 mb
----
1) makeRequestOld
Completed in: 11.321 seconds
> node . new
Starting: makeRequestNew
0 mb
5 mb
6 mb
8 mb
9 mb
10 mb
11 mb
13 mb
14 mb
3 mb
4 mb
----
1) makeRequestNew
Completed in: 11.432 seconds
> node . page
Starting: makeRequestPaginated
0 mb
5 mb
6 mb
7 mb
9 mb
10 mb
11 mb
12 mb
13 mb
14 mb
3 mb
----
1) makeRequestPaginated
Completed in: 11.458 seconds
Network

And here's with a real API backend. Given the unpredictability of the network, the only interesting result to look at is how the different approaches use memory.

> node . old
Starting: makeRequestOld
0 mb
3 mb
4 mb
5 mb
6 mb
7 mb
8 mb
9 mb
10 mb
11 mb
11 mb
----
1) makeRequestOld
Completed in: 3.781 seconds
> node . new
Starting: makeRequestNew
0 mb
7 mb
11 mb
3 mb
7 mb
11 mb
14 mb
2 mb
6 mb
9 mb
13 mb
----
1) makeRequestNew
Completed in: 3.773 seconds
> node . page
Starting: makeRequestPaginated
0 mb
7 mb
11 mb
3 mb
7 mb
10 mb
14 mb
17 mb
5 mb
9 mb
12 mb
----
1) makeRequestPaginated
Completed in: 3.575 seconds
'use strict';
var async = require('async');
var continueStream = require('continue-stream');
var duplexify = require('duplexify');
var fs = require('fs');
var googleAuth = require('google-auto-auth')();
var JSONStream = require('jsonstream');
var pumpify = require('pumpify');
var request = require('request');
var through = require('through2');
// Baseline heap usage, captured at the start of each run so per-request
// memory logging shows the delta for that run only.
var BASE_MEMORY;

// Maximum number of paginated follow-up requests per benchmark.
var MAX_RUNS = 10;

// CLI args after `node .`, e.g. `node . old new --series`.
// Use slice, not splice: splice would mutate process.argv in place.
var input = process.argv.slice(2);

var series = input[input.length - 1] === '--series';
if (series) {
  input.pop();
}
if (input.length === 0) {
  input = ['old', 'new', 'page'];
}
// A single benchmark always runs in series so memory logging is meaningful.
if (input.length === 1) {
  series = true;
}

// Map CLI aliases to the benchmark implementations defined below
// (function declarations are hoisted, so this is safe here).
var fnLookup = {
  old: makeRequestOld,
  new: makeRequestNew,
  page: makeRequestPaginated
};

// Run the selected benchmarks — in series (accurate memory numbers) or in
// parallel (head-to-head timing) — then report sorted results.
async[series ? 'mapSeries' : 'map'](input.map(function(alias) {
  return fnLookup[alias];
}), run, onComplete);
/**
 * "Old" approach: buffer each full JSON response, parse it, and push file
 * objects onto a single object stream, recursing while a nextPageToken is
 * returned (up to MAX_RUNS pages).
 *
 * @param {object} reqOpts - Request options (`uri`, `qs`) for the API call.
 * @return {stream.Readable} Object stream of file objects; emits 'error' on
 *     auth, request, or JSON parse failure.
 */
function makeRequestOld(reqOpts) {
  var runCount = 0;
  var stream = through.obj();
  makeRequest(reqOpts);
  return stream;
  function makeRequest(reqOpts) {
    logMemoryUsage();
    googleAuth.authorizeRequest(reqOpts, function(err, authorizedReqOpts) {
      // Previously this error was silently dropped, leaving the stream
      // hanging forever on auth failure.
      if (err) {
        stream.emit('error', err);
        return;
      }
      request(authorizedReqOpts, function(err, resp, body) {
        if (err) {
          stream.emit('error', err);
          return;
        }
        // Guard the parse: a non-JSON body would otherwise throw uncaught.
        try {
          body = JSON.parse(body);
        } catch (e) {
          stream.emit('error', e);
          return;
        }
        var pageToken = body.nextPageToken;
        // An error response contains `errors` instead of `items`; default to
        // an empty page rather than crashing on undefined.
        (body.items || []).map(toFileObject).forEach(stream.push.bind(stream));
        if (pageToken && runCount++ < MAX_RUNS) {
          reqOpts.qs.pageToken = pageToken;
          makeRequest(reqOpts);
          return;
        }
        // No more pages: end the stream.
        stream.push(null);
      });
    });
  }
}
/**
 * "New" approach: stream the response through JSONStream, transforming
 * `items.*` entries into file objects on the fly. A duplexify stream is
 * returned synchronously; when a page ends with a nextPageToken, the readable
 * side is swapped (`dup.setReadable`) to the stream for the next page.
 *
 * The pagination counter is local to each top-level call (it was previously a
 * module-level variable, so parallel runs shared — and never reset — it).
 *
 * @param {object} reqOpts - Request options (`uri`, `qs`) for the API call.
 * @return {stream.Duplex} Object stream of file objects.
 */
function makeRequestNew(reqOpts) {
  var runCount = 0;
  return makeRequest(reqOpts);
  function makeRequest(reqOpts) {
    var dup = duplexify.obj();
    logMemoryUsage();
    googleAuth.authorizeRequest(reqOpts, function(err, authorizedReqOpts) {
      // Previously dropped; without this the returned stream never ends.
      if (err) {
        dup.destroy(err);
        return;
      }
      var req = request(authorizedReqOpts);
      var toFileStream = JSONStream.parse('items.*', toFileObject);
      var nextPageToken;
      var paginateStream = through.obj();
      // _flush runs when the current page has been fully parsed: either chain
      // in the next page (note: callback deliberately NOT called, so the
      // swapped readable keeps the pipeline open) or finish.
      paginateStream._flush = function(callback) {
        if (nextPageToken && runCount++ < MAX_RUNS) {
          reqOpts.qs.pageToken = nextPageToken;
          dup.setReadable(makeRequest(reqOpts));
        } else {
          callback();
        }
      };
      var pipe = pumpify.obj(req, toFileStream, paginateStream);
      dup.setReadable(pipe);
      // Branch a second parse of the same response to pluck out API errors
      // and the nextPageToken without buffering the whole body.
      parseJSONFromRequest(req, ['error.errors.*', 'nextPageToken'])
        .once('error.errors.*', function(err) {
          pipe.destroy(err);
        })
        .once('nextPageToken', function(token) {
          nextPageToken = token;
        });
    });
    return dup;
  }
}
/**
 * "Paginated" approach: let continue-stream drive pagination. Each call to
 * the inner makeRequest supplies one page's stream; calling back with no
 * stream ends the sequence.
 *
 * @param {object} reqOpts - Request options (`uri`, `qs`) for the API call.
 * @return {stream.Readable} Object stream of file objects.
 */
function makeRequestPaginated(reqOpts) {
  var runCount = 0;
  var nextPageToken;
  function makeRequest(callback) {
    runCount++;
    // After the first page, stop as soon as no token was captured.
    if (runCount > 1 && !nextPageToken) {
      callback();
      return;
    }
    logMemoryUsage();
    if (nextPageToken) {
      reqOpts.qs.pageToken = nextPageToken;
      nextPageToken = '';
    }
    googleAuth.authorizeRequest(reqOpts, function(err, authorizedReqOpts) {
      // Previously dropped; forward auth failures so continue-stream can
      // surface them instead of hanging.
      if (err) {
        callback(err);
        return;
      }
      var req = request(authorizedReqOpts);
      var toFileStream = JSONStream.parse('items.*', toFileObject);
      var pipe = pumpify.obj(req, toFileStream);
      // Second parse of the same response to pluck errors + nextPageToken.
      parseJSONFromRequest(req, ['error.errors.*', 'nextPageToken'])
        .once('error.errors.*', function(err) {
          pipe.destroy(err);
        })
        .once('nextPageToken', function(token) {
          if (runCount <= MAX_RUNS) {
            nextPageToken = token;
          }
        });
      callback(null, pipe);
    });
  }
  return continueStream.obj(makeRequest);
}
/**
 * Wrap a raw API metadata entry in the shape the benchmark consumes.
 *
 * @param {object} fileMetadata - One entry from the API's `items` array.
 * @return {{name: string, metadata: object}} File object (metadata is the
 *     same reference, not a copy).
 */
function toFileObject(fileMetadata) {
  return {
    name: fileMetadata.name,
    metadata: fileMetadata
  };
}
/**
 * Branch a request's response into per-property JSONStream parsers, and
 * re-emit each matched value on a single stream under the property's own
 * JSONPath string as the event name.
 *
 * @param {request.Request} req - An in-flight request object.
 * @param {string[]} properties - JSONPath patterns to pluck (e.g.
 *     'nextPageToken', 'error.errors.*').
 * @return {stream.Stream} Emitter; listen with `.once(property, fn)`.
 */
function parseJSONFromRequest(req, properties) {
  var emitter = through();
  properties.forEach(function(property) {
    req.once('response', function(res) {
      var parser = JSONStream.parse(property);
      parser.on('data', function(value) {
        emitter.emit(property, value);
      });
      res.pipe(parser);
    });
  });
  return emitter;
}
/**
 * Execute one benchmark: drive the given request function, count the objects
 * it emits, serialize them to disk, and report timing + pass/fail.
 *
 * @param {Function} fn - One of the makeRequest* implementations.
 * @param {Function} cb - async-style callback: (err, statsObject).
 */
function run(fn, cb) {
  BASE_MEMORY = getMemory();
  console.log('Starting:', fn.name);

  var startTime = Date.now();
  var numBucketsReceived = 0;
  var numBucketsExpected = 531; // buckets in my project (ikr)

  var reqOpts = {
    // uri: 'https://www.googleapis.com/storage/v1/b',
    uri: 'http://localhost:8001',
    qs: {
      project: process.env.GCLOUD_TESTS_PROJECT_ID,
      // this will cause an immediate failure.
      // project: 'garbage-id',
      // Force a nextPageToken to exist in the response.
      maxResults: Math.floor(numBucketsExpected / 2)
    }
  };

  // Pass-through transform that tallies every object flowing by.
  function counter(obj, _, next) {
    numBucketsReceived += 1;
    next(null, obj);
  }

  fn(reqOpts)
    .on('error', cb)
    .pipe(through.obj(counter))
    .pipe(JSONStream.stringify())
    .pipe(fs.createWriteStream('__'))
    .on('finish', function() {
      cb(null, {
        test: fn.name,
        duration: (Date.now() - startTime) / 1000,
        passed: numBucketsReceived === numBucketsExpected
      });
    });
}
/**
 * Current heap usage, rounded to whole megabytes.
 *
 * @return {number} Heap used, in MB.
 */
function getMemory() {
  var heapBytes = process.memoryUsage().heapUsed;
  return Math.round(heapBytes / 1000000);
}
/**
 * Log heap growth since the current run started. Only meaningful (and only
 * enabled) when benchmarks run in series, since parallel runs would
 * interleave their allocations.
 */
function logMemoryUsage() {
  if (series) {
    console.log(getMemory() - BASE_MEMORY + ' mb');
  }
}
/**
 * Final async.map callback: print each benchmark's result, fastest first.
 *
 * @param {?Error} err - First error from any benchmark run; rethrown.
 * @param {Array<{test: string, duration: number, passed: boolean}>} stats
 */
function onComplete(err, stats) {
  if (err) {
    throw err;
  }
  // Numeric comparator: sorts ascending by duration (sort is in place).
  stats.sort(function(a, b) {
    return a.duration - b.duration;
  });
  stats.forEach(function(stats, index) {
    var lines = [
      '----',
      index + 1 + ') ' + stats.test,
      'Completed in: ' + stats.duration + ' seconds',
      // 'Passed: ' + stats.passed
    ];
    console.log(lines.join('\n'));
  });
}
{
"name": "yeah-5945",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"async": "^1.4.2",
"continue-stream": "^2.0.0",
"duplexify": "^3.4.2",
"express": "^4.13.3",
"google-auto-auth": "^0.1.0",
"jsonstream": "^1.0.3",
"pumpify": "^1.3.3",
"request": "^2.60.0",
"through2": "^2.0.0"
}
}
'use strict';

// Mock API backend for the local benchmark: serves a canned buckets listing
// with an artificial 1s delay per request.
var app = require('express')();
var apiResponse = JSON.stringify(require('./buckets.json'));

app.get('/', function(req, res) {
  res.setHeader('Content-Type', 'application/json');
  // Content-Length must be the byte length of the body, not the string
  // (UTF-16 code unit) length — they differ for any multibyte characters.
  res.setHeader('Content-Length', Buffer.byteLength(apiResponse));
  setTimeout(function() {
    res.end(apiResponse);
  }, 1000);
});

app.listen(8001);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment