Skip to content

Instantly share code, notes, and snippets.

@myndzi
Last active August 29, 2015 14:07
Show Gist options
  • Save myndzi/ff7252f15e44bd7b8234 to your computer and use it in GitHub Desktop.
Save myndzi/ff7252f15e44bd7b8234 to your computer and use it in GitHub Desktop.
Detect stream compression type and build json object on request object
//load modules
//var app = require('express')();
var http = require('http');
//var server = new http.Server();
var zlib = require('zlib');
var JsonLoader = require('./index');
var server = http.createServer(function (req, res) {
function next(err) {
if (err) {
console.log(err.stack);
} else {
console.log('%s:', req.type, req.body);
res.end();
}
};
var abort = function () {
next(new Error('Stream ended without any data received'));
};
req.once('error', next);
req.once('end', abort);
req.once('data', function (chunk) {
// unbind req events (to be replaced with events on destination streams)
req.removeListener('end', abort);
req.removeListener('error', next);
req.type = 'uncompressed';
var jl = new JsonLoader();
jl.on('error', next);
jl.on('data', function (obj) {
req.body = obj;
});
jl.on('end', next);
// 0x7B === '{'
if (chunk[0] === 0x7B) {
jl.write(chunk);
req.pipe(jl);
return;
}
// assume compressed
var header = chunk.readUInt16BE(0),
decompress;
if (header === 0x1F8B) {
// gzip header [ID1][ID2] = [1F][8B]
req.type = 'gunzip';
decompress = zlib.createGunzip();
} else if (header & 0x0800 && header % 31 === 0) {
// zlib header [CMF][FLG] = [????1000][xxxxx???] for deflate
// xxxxx = check bit; header valid if MSB 16 bit value is divisible by 31
req.type = 'deflate';
decompress = zlib.createInflate();
} else {
// assume raw deflate
req.type = 'raw deflate';
decompress = zlib.createInflateRaw();
}
decompress.pipe(jl);
decompress.write(chunk);
req.pipe(decompress);
});
});
var reqs = 0;
//start server
server.listen(3000, function () {
console.log('Express started on port 3000');
test();
test(zlib.gzip);
test(zlib.deflate);
test(zlib.deflateRaw);
});
function test(compress) {
reqs++;
var req = http.request({
method: 'POST',
hostname: 'localhost',
port: 3000,
headers: {
'Connection': 'close'
}
}, function (res) {
if (--reqs === 0) {
server.close();
}
});
var data = JSON.stringify({ foo: 'bar' });
if (!compress) {
return req.end(data);
}
compress(data, function (err, compressed) {
req.end(compressed);
});
}
'use strict';
var Transform = require('stream').Transform,
inherits = require('util').inherits;
var clarinet = require('clarinet');
var Deque = require('double-ended-queue');
module.exports = JsonLoader;
function JsonLoader() {
Transform.call(this);
this._readableState.objectMode = true;
this.parser = this.initParser();
this.keys = new Deque(10);
this.vals = new Deque(10);
}
inherits(JsonLoader, Transform);
JsonLoader.prototype._transform = function (chunk, encoding, callback) {
this.parser.write(chunk.toString());
callback();
};
JsonLoader.prototype._flush = function (callback) {
this.parser.close();
this._end();
callback();
};
JsonLoader.prototype._onerror = function (e) {
this.emit('error', e);
this._end();
};
JsonLoader.prototype._onvalue = function (v) {
if (this.vals.length === 0) {
this.push(v);
this.keys.clear();
this.vals.clear();
} else {
var parent = this.vals.peekBack();
if (Array.isArray(parent)) {
parent.push(v);
} else {
parent[this.keys.pop()] = v;
}
}
};
JsonLoader.prototype._onopenobject = function (key) {
this.vals.push({ });
this.keys.push(key);
};
JsonLoader.prototype._onkey = function (key) {
this.keys.push(key);
};
JsonLoader.prototype._onopenarray = function () {
this.vals.push([ ]);
};
JsonLoader.prototype._oncloseobject =
JsonLoader.prototype._onclosearray = function () {
this._onvalue(this.vals.pop());
};
JsonLoader.prototype._onend = function () {
var val = this.value;
this._end();
};
JsonLoader.prototype._end = function () {
this.keys = null;
this.vals = null;
this.ended = true;
this.push(null);
};
JsonLoader.prototype.initParser = function () {
var parser = clarinet.parser();
parser.onerror = this._onerror.bind(this);
parser.onvalue = this._onvalue.bind(this);
parser.onopenobject = this._onopenobject.bind(this);
parser.onkey = this._onkey.bind(this);
parser.oncloseobject = this._oncloseobject.bind(this);
parser.onopenarray = this._onopenarray.bind(this);
parser.onclosearray = this._onclosearray.bind(this);
parser.onend = this._onend.bind(this);
return parser;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment