Created
December 15, 2015 14:50
-
-
Save marekkalnik/e10bb1090380f5c458cf to your computer and use it in GitHub Desktop.
Enabling newrelic makes the stream send empty data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
require('newrelic'); | |
var express = require("express"); | |
var app = express(); | |
var _ = require('lodash'); | |
var mysql = require('mysql'); | |
var Stream = require('stream'); | |
var util = require('util'); | |
app.get('/', function(req, res){ | |
// Settings for the local test database used by this reproduction script.
var connectionSettings = {
  host: 'localhost',
  user: 'test',
  password: 'test',
  database: 'test'
};

// One connection is opened per incoming request.
var connection = mysql.createConnection(connectionSettings);
/**
 * Transformation that serialises a sequence of models as one JSON array.
 *
 * `write` is called once per model with a running index and reports the text
 * chunk for that model through its callback; `end` reports the closing chunk.
 * The concatenation of all chunks is a valid JSON array, including when the
 * first write carries a model, when it is an empty "opener" write, and when
 * nothing is written at all.
 *
 * State is kept in the closure, so one instance serves one stream at a time.
 * A write with `index === 0` starts a fresh array.
 */
var toJson = (function() {
  // True once the opening '[' has been produced for the current array.
  var arrayOpened = false;
  // True once at least one element has been produced, so the next one needs a comma.
  var elementWritten = false;
  var transformation = {};

  /**
   * Produce the chunk for one model.
   *
   * @param {Object} model  value to serialise; a falsy value contributes no element
   * @param {Number} index  position of this write, 0 for the first one
   * @param {Function} cb   called as cb(err) if serialisation throws, else cb(null, chunk)
   */
  transformation.write = function(model, index, cb) {
    var json;
    var chunk = '';

    // Index 0 always begins a new array.
    if (index === 0) {
      arrayOpened = false;
      elementWritten = false;
    }

    // Transform to JSON
    if (model) {
      try {
        json = JSON.stringify(model);
      } catch (e) {
        return cb(e);
      }
    }

    // Open the array on the first successful write
    if (!arrayOpened) {
      chunk = '[';
      arrayOpened = true;
    }

    // Separate elements with commas. The opening bracket is prepended to the
    // first element rather than replacing it, so no model is lost.
    // JSON.stringify returns undefined for values it cannot represent.
    if (json !== undefined) {
      if (elementWritten) chunk += ',';
      chunk += json;
      elementWritten = true;
    }

    cb(null, chunk);
  };

  /**
   * Produce the closing chunk.
   *
   * @param {Function} cb  called as cb(null, suffix); suffix is ']' when the
   *                       array was opened by a write and '[]' otherwise
   */
  transformation.end = function(cb) {
    var suffix = arrayOpened ? ']' : '[]';

    // Ready for the next array
    arrayOpened = false;
    elementWritten = false;

    cb(null, suffix);
  };

  return transformation;
})();
/**
 * Old-style (event based) stream that passes each written model through a
 * transformation and emits the resulting chunks as 'data' events.
 *
 * @param {Object} [transformation]  object exposing `write(model, index, cb)`
 *                                   and `end(cb)`; defaults to `toJson`
 */
var ModelStream = module.exports = function(transformation) {
  // Run the base constructor so the emitter state behind on()/emit()/pipe() is initialised
  Stream.call(this);

  // Use specified, or otherwise default, JSON transformation
  this.transformation = transformation || toJson;

  // Reset write index
  this.index = 0;

  // Make stream writable
  this.writable = true;
};

util.inherits(ModelStream, Stream);

/**
 * Write to stream
 *
 * Transforms the model and emits the result as a 'data' event. The index
 * passed to the transformation is incremented after every write, including
 * failed ones.
 *
 * If the transformation fails, no 'data' event is emitted. The error goes to
 * `cb` when one is given, otherwise it is emitted as an 'error' event.
 *
 * @param {Object} model
 * @param {Function} [cb]  called as cb(err) once the model has been handled
 */
ModelStream.prototype.write = function(model, cb) {
  var self = this;

  // Run transformation on this item
  this.transformation.write(model, this.index, function writeToStream(err, transformedModel) {
    // Increment index for next time
    self.index++;

    // A failed transformation has no chunk. Emitting `undefined` as data would
    // corrupt (or crash) whatever this stream is piped into.
    if (err) {
      if (cb) return cb(err);
      self.emit('error', err);
      return;
    }

    // Write transformed model to stream
    self.emit('data', _.clone(transformedModel));

    // Inform that we're finished
    if (cb) return cb(err);
  });
};

/**
 * End the stream
 *
 * With `err`, emits an 'error' event carrying that same value and stops.
 * Without it, asks the transformation for a closing chunk, emits it as 'data'
 * when non-empty, then emits 'end'.
 *
 * Only the first call has any effect. Later calls just invoke `cb`, so a
 * source that reports an error and then also signals its end cannot append
 * output after the failure.
 *
 * @param {Error} [err]
 * @param {Function} [cb]  called as cb(err) on failure, cb() otherwise
 */
ModelStream.prototype.end = function(err, cb) {
  var self = this;

  // Already ended: nothing more may be emitted
  if (!this.writable) {
    if (cb) return cb();
    return;
  }
  this.writable = false;

  if (err) {
    // Emit the error itself (not just its message) so listeners get the same
    // kind of value as from the transformation failure path below
    this.emit('error', err);
    if (cb) return cb(err);
    return;
  }

  this.transformation.end(function(err, suffix) {
    if (err) {
      self.emit('error', err);
      if (cb) return cb(err);
      return;
    }

    // Emit suffix if specified
    if (suffix) self.emit('data', suffix);
    self.emit('end');
    if (cb) return cb();
  });
};
// Stream every row of the `user` table to the HTTP response as a JSON array.
// The identifier's closing backtick is required; without it MySQL rejects the statement.
var query = 'select * from `user`';
var stream = new ModelStream(toJson);
var dbStream = connection.query(query);

// Set when the query reports an error. The query emits 'end' after 'error'
// as well, and the teardown must only run once.
var failed = false;

// Logs how the query finished and closes the connection
var cb = function() {
  console.log(arguments);
  connection.end();
};

// If the stream fails, finish the response instead of letting pipe() rethrow
// the error as an uncaught exception
stream.on('error', function() {
  if (!res.headersSent) res.statusCode = 500;
  res.end();
});

// Handle error, an 'end' event will be emitted after this as well
dbStream.on('error', function(err) {
  failed = true;
  stream.end(err); // End stream
  cb(err); // Close connection
});

// the field packets for the rows to follow
dbStream.on('fields', function(fields) {});

// Pausing the connection is useful if your processing involves I/O
dbStream.on('result', function(row) {
  connection.pause();
  stream.write(row, function() {
    connection.resume();
  });
});

// all rows have been received
dbStream.on('end', function() {
  // The error handler has already ended the stream and closed the connection
  if (failed) return;

  stream.end(); // End stream
  cb(null, 'end'); // Close connection
});

res.setHeader('Cache-Control', 'no-cache');
stream.pipe(res);
}); | |
// Start accepting HTTP requests on the fixed port used by this script.
var listenPort = 1337;
app.listen(listenPort);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment