Skip to content

Instantly share code, notes, and snippets.

@marekkalnik
Created December 15, 2015 14:50
Show Gist options
  • Save marekkalnik/e10bb1090380f5c458cf to your computer and use it in GitHub Desktop.
Save marekkalnik/e10bb1090380f5c458cf to your computer and use it in GitHub Desktop.
Enabling newrelic makes the stream send empty data
'use strict';
require('newrelic');
var express = require("express");
var app = express();
var _ = require('lodash');
var mysql = require('mysql');
var Stream = require('stream');
var util = require('util');
/**
 * GET / handler.
 *
 * Opens a MySQL connection for this request; the rest of the handler body
 * uses it to run a query and pipe the rows to the HTTP response.
 *
 * @param {Object} req Express request (not read in the lines below)
 * @param {Object} res Express response
 */
app.get('/', function(req, res){
// A new connection is created on every request (createConnection is called
// inside the handler). The settings are hard-coded local test credentials.
var connection = mysql.createConnection({
host : 'localhost',
user : 'test',
password : 'test',
database: 'test'
});
/**
 * JSON-array transformation for ModelStream.
 *
 * Turns a sequence of write() calls followed by one end() call into the text
 * of a single JSON array: '[' + model + ',' + model + ... + ']'.
 */
var toJson = {};

// Serialisation state of the array currently being produced.
//   opened   - the opening '[' has already been emitted
//   hasItems - at least one model has been emitted, so the next needs a ','
var jsonArrayState = { opened: false, hasItems: false };

/**
 * Produce the chunk for one model.
 *
 * The opening bracket is emitted together with the first chunk, and the first
 * model is kept (it used to be replaced by '['). A falsy model contributes no
 * element, so an initial "priming" write without a model still yields valid
 * JSON.
 *
 * @param {Object} model Model to serialise; falsy values emit no element
 * @param {Number} index Write index supplied by the stream; 0 starts a new array
 * @param {Function} cb Called as cb(err) on failure or cb(null, chunk) on success
 */
toJson.write = function(model, index, cb) {
  var chunk;
  var serialized;

  // Index 0 always means "first write of a stream": drop any state left over
  // from an earlier array that never reached end().
  if (index === 0) {
    jsonArrayState.opened = false;
    jsonArrayState.hasItems = false;
  }

  chunk = jsonArrayState.opened ? '' : '[';

  if (model) {
    // Transform to JSON (throws e.g. on circular structures)
    try {
      serialized = JSON.stringify(model);
    } catch (e) {
      // State is left untouched so the next write still opens the array
      return cb(e);
    }
    // Separate from the previous element, if there was one
    if (jsonArrayState.hasItems) {
      chunk += ',';
    }
    chunk += serialized;
    jsonArrayState.hasItems = true;
  }

  jsonArrayState.opened = true;
  cb(null, chunk);
};

/**
 * Produce the closing chunk of the array and reset the state.
 *
 * When nothing was written at all the chunk is the complete empty array
 * '[]' rather than a lone ']'.
 *
 * @param {Function} cb Called as cb(null, suffix)
 */
toJson.end = function(cb) {
  var suffix = jsonArrayState.opened ? ']' : '[]';
  jsonArrayState.opened = false;
  jsonArrayState.hasItems = false;
  cb(null, suffix);
};
/**
 * ModelStream
 *
 * Legacy-style stream that runs every written model through a transformation
 * and re-emits the result as 'data' events, so it can be piped to a writable
 * destination.
 *
 * @param {Object} [transformation] Object with write(model, index, cb) and
 *   end(cb); falls back to the `toJson` transformation declared earlier in
 *   this handler (the previous fallback, `Transformations.json`, is not
 *   defined anywhere in this file).
 */
var ModelStream = module.exports = function(transformation) {
  // Initialise the underlying Stream / EventEmitter state
  Stream.call(this);
  // Use specified, or otherwise default, JSON transformation
  this.transformation = transformation || toJson;
  // Reset write index
  this.index = 0;
  // Make stream writable
  this.writable = true;
};
util.inherits(ModelStream, Stream);

/**
 * Write to stream
 *
 * Transforms the model and emits the transformed chunk as a 'data' event.
 * When the transformation fails nothing is emitted as data and the write
 * index is not advanced; the error goes to `cb`, or is emitted as 'error'
 * when no callback was given.
 *
 * @param {Object} model
 * @param {Function} [cb] Called as cb(err) on failure, cb() on success
 */
ModelStream.prototype.write = function(model, cb) {
  var self = this;
  // Run transformation on this item
  this.transformation.write(model, this.index, function writeToStream(err, transformedModel) {
    if (err) {
      // There is no chunk to forward: emitting `undefined` as data would
      // break a piped destination.
      if (cb) return cb(err);
      self.emit('error', err);
      return;
    }
    // Increment index for next time
    self.index++;
    // Write transformed model to stream
    self.emit('data', _.clone(transformedModel));
    // Inform that we're finished
    if (cb) return cb();
  });
};

/**
 * End the stream
 *
 * With an error: emits 'error' with the error object itself and stops.
 * Without: emits the transformation's suffix (if any) as 'data', then 'end'.
 * The stream is marked non-writable on the first call and any later call is
 * ignored, so ending twice cannot emit a second suffix or 'end'.
 *
 * @param {Error} [err] Reason the stream is being aborted
 * @param {Function} [cb] Called as cb(err) on failure, cb() otherwise
 */
ModelStream.prototype.end = function(err, cb) {
  var self = this;
  // Already ended: nothing more to emit
  if (!this.writable) {
    if (cb) return cb();
    return;
  }
  this.writable = false;
  if (err) {
    this.emit('error', err);
    if (cb) return cb(err);
    return;
  }
  this.transformation.end(function(endErr, suffix) {
    if (endErr) {
      self.emit('error', endErr);
      if (cb) return cb(endErr);
      return;
    }
    // Emit suffix if specified
    if (suffix) self.emit('data', suffix);
    self.emit('end');
    if (cb) return cb();
  });
};
// Stream every row of the `user` table to the client as one JSON array.
// `user` is a quoted identifier, so the backticks have to be balanced.
var query = 'select * from `user`';
var stream = new ModelStream(toJson);
var dbStream = connection.query(query);
// Set once the query has been torn down; the query emits 'end' after
// 'error' as well, and the teardown must only run once.
var finished = false;

// Log the outcome and close the database connection.
var cb = function() {
  console.log(arguments);
  // With a callback, a failure to close is reported here instead of being
  // emitted as an 'error' event on the connection object.
  connection.end(function(endErr) {
    if (endErr) console.error(endErr);
  });
};

// End the output stream and close the connection, exactly once.
var finish = function(err) {
  if (finished) return;
  finished = true;
  stream.end(err); // End stream
  // Close connection
  if (err) {
    cb(err);
  } else {
    cb(null, 'end');
  }
};

// An 'error' event nobody listens for is thrown. Handle it here and finish
// the response, which the pipe does not do on the error path.
stream.on('error', function(streamErr) {
  console.error(streamErr);
  // The status can only still be changed if no chunk has been written yet
  if (!res.headersSent) res.statusCode = 500;
  res.end();
});

// Handle error, an 'end' event will be emitted after this as well
dbStream.on('error', function(err) {
  finish(err);
});

// Pausing the connection is useful if your processing involves I/O
dbStream.on('result', function(row) {
  connection.pause();
  stream.write(row, function() {
    connection.resume();
  });
});

// all rows have been received
dbStream.on('end', function() {
  finish();
});

res.setHeader('Cache-Control', 'no-cache');
// The body is a JSON array produced chunk by chunk
res.setHeader('Content-Type', 'application/json; charset=utf-8');
stream.pipe(res);
}); // end of the GET / handler
// Start the HTTP server on port 1337.
app.listen(1337);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment