Created
August 4, 2012 02:02
-
-
Save garthk/3253513 to your computer and use it in GitHub Desktop.
Using bouncy and mux-demux to make a proxy server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var http = require('http'), | |
net = require('net'), | |
bouncy = require('bouncy'), | |
MuxDemux = require('mux-demux'); | |
var mux_server_port = 8192; | |
var trampoline_port = 8193; | |
var target_server_port = 8194; | |
var mux; | |
function log(arg) { | |
console.log(JSON.stringify(arg)); | |
} | |
// mux_server takes connections from mux_client via trampoline | |
var mux_server = http.createServer(function(req, res) { | |
log({ what: 'mux-request' }); | |
res.writeHead(403); | |
res.end("not expecting requests"); | |
}); | |
mux_server.listen(mux_server_port); | |
mux_server.on('connect', function(req, socket, head) { | |
log({ what: 'mux-connect' }); | |
socket.write('HTTP/1.1 200 Connection Established\r\n\r\n'); | |
mux = MuxDemux(); | |
socket.pipe(mux).pipe(socket); | |
}); | |
// trampoline takes connections from either mux_client or HTTP | |
// client, redirecting to mux_server on CONNECT or mux_client | |
// (via new mux stream) on other methods e.g. GET. | |
var trampoline = bouncy(function(req, bounce) { | |
log({ what: 'trampoline-request', method: req.method, path: req.url }); | |
if (req.method == 'CONNECT') { | |
log({ what: 'trampoline-redirect-to-mux-server' }); | |
bounce(mux_server_port); | |
} else { | |
log({ what: 'trampoline-redirect-via-mux-server' }); | |
var meta = { method: req.method, url: req.url, headers: req.headers }; | |
var stream = mux.createStream(meta); | |
bounce(stream); | |
} | |
}); | |
trampoline.listen(trampoline_port); | |
// I'd hoped that by setting the socket's encoding, the data events | |
// from the bounced stream would contain strings instead of Buffers | |
// and avoid the problem in mux-demux. No such luck: | |
trampoline.on('connect', function(socket) { | |
socket.setEncoding('utf8'); | |
}); | |
// target_server will be chosen by mux_client based on the | |
// request metadata bouncy supplied to the trampoline. For | |
// this example, we'll create a server ourselves. | |
var target_server = http.createServer(function(req, res) { | |
log({ what: 'target-request' }); | |
res.writeHead(200); | |
res.end("BURMA!"); // compulsory Python reference | |
}); | |
target_server.listen(target_server_port); | |
var mux_client_opts = { | |
host: 'localhost', | |
port: trampoline_port, | |
method: 'CONNECT', | |
path: 'compulsory?' | |
}; | |
// mux_client routes the original HTTP request to a target server. | |
var mux_client = http.request(mux_client_opts); | |
mux_client.on('connect', function(res, socket, head) { | |
log({ what: 'mux-client-connected' }); | |
var mux = MuxDemux(); | |
socket.pipe(mux).pipe(socket); | |
mux.on('connection', function(stream) { | |
log({ what: 'new-mux-channel', meta: stream.meta }); | |
var sock_opts = { host: 'localhost', port: target_server_port }; | |
var sock = net.connect(sock_opts, function() { | |
log({ what: 'mux-client-connected-to-target' }); | |
sock.pipe(stream).pipe(sock); | |
}); | |
}); | |
// Just for this example, we'll have mux_client make a request | |
// to the trampoline to exercise the whole chain once we establish | |
// the mux-client-to-mux-server connection. | |
log({ what: 'client-making-request-to-trampoline' }); | |
var opts = { | |
host: 'localhost', | |
port: trampoline_port, | |
method: 'GET', | |
path: '/fnord' // compulsory Illuminati reference | |
}; | |
var client_request = http.request(opts, function(res) { | |
log({ what: 'client-got-results-via-trampoline', | |
status: res.statusCode }); | |
res.setEncoding('utf8'); | |
var content = ""; | |
res.on('data', function(chunk) { content += chunk; }); | |
res.on('end', function() { | |
log({ what: 'client-finished-getting-results', | |
content: content }); | |
}); | |
}); | |
client_request.end(); | |
}); | |
log({ what: 'mux-client-opens-connection-to-mux-server-via-trampoline' }); | |
mux_client.end(); // kicks it all off |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{"what":"mux-client-opens-connection-to-mux-server-via-trampoline"} | |
{"what":"trampoline-request","method":"CONNECT","path":"compulsory?"} | |
{"what":"trampoline-redirect-to-mux-server"} | |
{"what":"mux-connect"} | |
{"what":"mux-client-connected"} | |
{"what":"client-making-request-to-trampoline"} | |
{"what":"trampoline-request","method":"GET","path":"/fnord"} | |
{"what":"trampoline-redirect-via-mux-server"} | |
{"what":"new-mux-channel","meta":{"method":"GET","url":"/fnord","headers":{"host":"localhost:8193","connection":"keep-alive"}}} | |
{"what":"mux-client-connected-to-target"} | |
net.js:503 | |
throw new TypeError('First argument must be a buffer or a string.'); | |
^ | |
TypeError: First argument must be a buffer or a string. | |
at Socket.write (net.js:503:11) | |
at Stream.ondata (stream.js:38:26) | |
at Stream.EventEmitter.emit (events.js:88:17) | |
at Stream.<anonymous> (/Users/garth/hg/asupnode/node_modules/mux-demux/index.js:30:14) | |
at Stream.stream.write (/Users/garth/hg/asupnode/node_modules/mux-demux/node_modules/event-stream/node_modules/through/index.js:22:11) | |
at Stream.ondata (stream.js:38:26) | |
at Stream.EventEmitter.emit (events.js:88:17) | |
at Stream.es.parse (/Users/garth/hg/asupnode/node_modules/mux-demux/node_modules/event-stream/index.js:497:12) | |
at Stream.stream.write (/Users/garth/hg/asupnode/node_modules/mux-demux/node_modules/event-stream/node_modules/through/index.js:22:11) | |
at Stream.ondata (stream.js:38:26) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var net = require('net'), | |
MuxDemux = require('mux-demux'), | |
util = require('util'); | |
var destPort = 8192; | |
var dest = net.createServer(function(conn) { | |
console.log("destination server got connection"); | |
conn.write(new Buffer("fnord", 'utf8')); | |
console.log("wrote data"); | |
conn.end(); | |
console.log("called stream.end"); | |
console.log("mystery: why does it take so long?"); | |
}).listen(destPort, function() { | |
console.log("server bound"); | |
}); | |
var serverMux = MuxDemux(); | |
var clientMux = MuxDemux(); | |
clientMux.pipe(serverMux).pipe(clientMux); | |
serverMux.on('connection', function(stream) { | |
console.log("mux got connection"); | |
var conn_to_port = net.connect(destPort); | |
conn_to_port.on('connect', function() { | |
stream.pipe(conn_to_port).pipe(stream); | |
}); | |
}); | |
console.log("opening read stream..."); | |
clientMux.createReadStream('test').on('data', function(data) { | |
if (Buffer.isBuffer(data)) { | |
console.log("received buffer containing:", data.toString()); | |
} else if (typeof data === 'string') { | |
console.log("received string:", data); | |
} else if (typeof data === 'object') { | |
console.log(util.format( | |
"received non-buffer (%s) with keys: %s", | |
typeof data, | |
Object.keys(data))); | |
} else { | |
console.log("received unexpected", typeof data); | |
} | |
}).on('end', process.exit); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/index.js b/index.js | |
index 2e0d007..422dd0f 100644 | |
--- a/index.js | |
+++ b/index.js | |
@@ -26,9 +26,10 @@ function MuxDemux (opts) { | |
s.paused = false | |
if(p) s.emit('drain') | |
} | |
- else { | |
+ else if (event === 'bufdata') | |
+ s.emit('data', new Buffer(data[1], 'utf8')); | |
+ else | |
s.emit.apply(s, data) | |
- } | |
}) | |
function destroyAll (_err) { | |
@@ -59,7 +60,10 @@ function MuxDemux (opts) { | |
var s = es.through(function (data) { | |
if(!this.writable) | |
throw new Error('stream is not writable') | |
- md.emit('data', [s.id, 'data', data]) | |
+ if (Buffer.isBuffer(data)) | |
+ md.emit('data', [s.id, 'bufdata', data.toString('utf8')]) | |
+ else | |
+ md.emit('data', [s.id, 'data', data]) | |
}, function () { | |
md.emit('data', [s.id, 'end']) | |
if (this.readable && !opts.allowHalfOpen && !this.ended) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm getting
error
fired byreq
after that request hits my handler, but the first argument is an empty object.