Skip to content

Instantly share code, notes, and snippets.

@stellaraccident
Created March 21, 2011 20:05
Show Gist options
  • Save stellaraccident/880102 to your computer and use it in GitHub Desktop.
Save stellaraccident/880102 to your computer and use it in GitHub Desktop.
Bulletproof node coding snippets
request: function(options) {
var future=new Future();
var req=http.request(options, function(res) {
var text='';
res.setEncoding('utf8');
res.on('data', function(chunk) {
text+=chunk;
});
res.on('end', Block.guard(function() {
future.resolve(new CouchResponse(res, text));
}));
res.on('error', Block.errorHandler());
});
req.on('error', Block.errorHandler());
req.end();
return future;
}
/**
 * Handles a user agent request inside a Block whose rescue function is
 * next, so any exception thrown by the steps below is handed to next.
 *
 * Steps: read the request as an object via jsonifyRequest, validate it
 * with validators.UserAgentRecord, then either answer 400 for an invalid
 * record, fail for records that carry a token (verification is not
 * implemented), or create a new 'auth' user agent and respond with it as
 * JSON.
 *
 * @param req incoming request, passed to jsonifyRequest.
 * @param res response used for the 400 reply and by respondJson.
 * @param next error callback given to Block.begin.
 * @returns whatever Block.begin returns for the main step.
 */
function handleUserAgent(req, res, next) {
  // Final step: re-validate the created record and send it back.
  function sendUserAgent(created) {
    var checked=validators.UserAgentRecord(created, {fix:true});
    return respondJson(checked.object, res);
  }

  // Middle step: validate the parsed request and dispatch on it.
  function handleRecord(requestObj) {
    var checked=validators.UserAgentRecord(requestObj, {fix:true});
    if (!checked.valid) {
      res.writeHead(400);
      return res.end('Invalid request object: ' + checked.reason);
    }

    var record=checked.object;
    if (record.token) {
      // Verify
      //return handler.verifyUserAgent(record);
      throw new Error('verifyUserAgent not yet implemented');
    }

    // Create
    record.token=null;
    record.type='auth'; // TODO: Maybe support unauth in the future?
    return handler.createUserAgent(record).force(sendUserAgent);
  }

  // First step: obtain the request object.
  function run() {
    return jsonifyRequest(req).force(handleRecord);
  }

  return Block.begin(run, next);
}
/**
 * Block class is used for routing errors to higher level logic.
 *
 * A Block remembers the Block that was current when it was constructed
 * (its parent) and an optional error callback. Errors raised on a Block
 * go to its error callback, or up the parent chain when it has none.
 *
 * @param {Function} [errback] errback(err), the rescue function.
 */
function Block(errback) {
  this._parent=Block.current;
  this._errback=errback;
}
/**
 * The Block whose body is executing right now, or null when none is.
 */
Block.current=null;
/**
 * Wrap a function such that any exceptions it generates
 * are sent to the error callback of the Block that is active
 * at the time of the call to guard(). If no Block
 * is active, just returns the function.
 *
 * Reads Block.current rather than this.current so that the helper keeps
 * working when it is detached from Block (for example
 * var guard=Block.guard).
 *
 * Example: stream.on('end', Block.guard(function() { ... }));
 */
Block.guard=function(f) {
  if (Block.current) return Block.current.guard(f);
  return f;
};
/**
 * Begins a new Block with two callback functions. The first
 * is the main part of the block (think 'try body'), the
 * second is the rescue function/error callback (think 'catch').
 * The terminology follows Ruby for no other reason than that
 * Block, begin and rescue describe an exception handling
 * paradigm and are not reserved words in JavaScript.
 *
 * Returns the value of block(), or undefined if it threw.
 */
Block.begin=function(block, rescue) {
  var ec=new Block(rescue);
  return ec.trap(block);
};
/**
 * Returns a function(err) that can be invoked at any time to raise
 * an exception against the now current block (or the current context
 * if no current). Errors are only raised if the err argument is truthy
 * so this can be used in both error callbacks and error events.
 *
 * Like Block.guard, this reads Block.current directly and therefore does
 * not depend on being called as a method of Block.
 *
 * Example: request.on('error', Block.errorHandler())
 */
Block.errorHandler=function() {
  // Capture the now current Block for later
  var current=Block.current;
  return function(err) {
    if (!err) return;
    if (current) return current.raise(err);
    else throw err;
  };
};
/**
 * Raises an exception on the Block. If the block has an
 * error callback, it is given the exception. Otherwise,
 * raise(...) is called on the parent block. If there is
 * no parent, the exception is simply raised.
 * Any nested exceptions from error callbacks will be raised
 * on the block's parent.
 */
Block.prototype.raise=function(err) {
  if (this._errback) {
    try {
      this._errback(err);
    } catch (nestedE) {
      if (this._parent) this._parent.raise(nestedE);
      else throw nestedE;
    }
  } else {
    if (this._parent) this._parent.raise(err);
    else throw(err);
  }
};
/**
 * Executes a callback in the context of this block. Any
 * errors will be passed to this Block's raise() method.
 * Returns the value of the callback or undefined on error.
 */
Block.prototype.trap=function(callback) {
  var origCurrent=Block.current;
  Block.current=this;
  try {
    var ret=callback();
    Block.current=origCurrent;
    return ret;
  } catch (e) {
    // Restore first: the error callback must not run with this block
    // still marked as current.
    Block.current=origCurrent;
    this.raise(e);
  }
};
/**
 * Wraps a function and returns a function that routes
 * errors to this block. This is similar to trap but
 * returns a new function instead of invoking the callback
 * immediately.
 *
 * A function that is already a guard wrapper is returned unchanged, so
 * it stays bound to the block that wrapped it first.
 */
Block.prototype.guard=function(f) {
  if (f.__guarded__) return f;
  var self=this;
  var wrapped=function() {
    var origCurrent=Block.current;
    Block.current=self;
    try {
      var ret=f.apply(this, arguments);
      Block.current=origCurrent;
      return ret;
    } catch (e) {
      Block.current=origCurrent;
      self.raise(e);
    }
  };
  wrapped.__guarded__=true;
  return wrapped;
};
/**
* Library of control flow utilities. There are lots of promise libraries on node
* and to disambiguate what I am using for my personal style from those, I chose the
* (near) synonym "future". I actually enjoy most of the callback style of node
* and don't want to hide it too much, so this library attempts to do very little
* magic, relying instead on explicit intent.
*/
/**
 * Returns a new Array holding the elements of ary from begin up to (not
 * including) end, using Array.prototype.slice. Works on array-like
 * values such as arguments objects as well as on real arrays.
 */
function sliceArray(ary, begin, end) {
  var range=[begin, end];
  return Array.prototype.slice.apply(ary, range);
}
/**
 * Block class is used for routing errors to higher level logic.
 *
 * Each Block records the Block that was current at construction time as
 * its parent, plus an optional error callback. An error raised on a
 * Block is given to its error callback; without one it travels to the
 * parent, and is thrown if there is no parent.
 *
 * @param {Function} [errback] errback(err), the rescue function.
 */
function Block(errback) {
  this._parent=Block.current;
  this._errback=errback;
}
/**
 * The Block currently executing, or null outside of any Block.
 */
Block.current=null;
/**
 * Wrap a function such that any exceptions it generates
 * are sent to the error callback of the Block that is active
 * at the time of the call to guard(). If no Block
 * is active, just returns the function.
 *
 * The active Block is looked up on Block itself, not on this, so the
 * function may be passed around or stored in a variable without losing
 * its meaning.
 *
 * Example: stream.on('end', Block.guard(function() { ... }));
 */
Block.guard=function(f) {
  var active=Block.current;
  return active ? active.guard(f) : f;
};
/**
 * Begins a new Block with two callback functions. The first
 * is the main part of the block (think 'try body'), the
 * second is the rescue function/error callback (think 'catch').
 * The terminology follows Ruby for no other reason than that
 * Block, begin and rescue describe an exception handling
 * paradigm and are not reserved words in JavaScript.
 *
 * Returns what block() returned, or undefined when it threw.
 */
Block.begin=function(block, rescue) {
  var ec=new Block(rescue);
  return ec.trap(block);
};
/**
 * Returns a function(err) that can be invoked at any time to raise
 * an exception against the now current block (or the current context
 * if no current). Errors are only raised if the err argument is truthy
 * so this can be used in both error callbacks and error events.
 *
 * The current Block is looked up on Block itself, not on this, so the
 * factory also works when called as a plain function.
 *
 * Example: request.on('error', Block.errorHandler())
 */
Block.errorHandler=function() {
  // Capture the now current Block for later
  var current=Block.current;
  return function(err) {
    if (!err) return;
    if (current) return current.raise(err);
    else throw err;
  };
};
/**
 * Raises an exception on the Block. If the block has an
 * error callback, it is given the exception. Otherwise,
 * raise(...) is called on the parent block. If there is
 * no parent, the exception is simply raised.
 * Any nested exceptions from error callbacks will be raised
 * on the block's parent.
 */
Block.prototype.raise=function(err) {
  if (this._errback) {
    try {
      this._errback(err);
    } catch (nestedE) {
      if (this._parent) this._parent.raise(nestedE);
      else throw nestedE;
    }
  } else {
    if (this._parent) this._parent.raise(err);
    else throw(err);
  }
};
/**
 * Executes a callback in the context of this block. Any
 * errors will be passed to this Block's raise() method.
 * Returns the value of the callback or undefined on error.
 */
Block.prototype.trap=function(callback) {
  var origCurrent=Block.current;
  Block.current=this;
  try {
    var ret=callback();
    Block.current=origCurrent;
    return ret;
  } catch (e) {
    // The previous Block is reinstated before the error is raised, so
    // the error callback does not run inside this block.
    Block.current=origCurrent;
    this.raise(e);
  }
};
/**
 * Wraps a function and returns a function that routes
 * errors to this block. This is similar to trap but
 * returns a new function instead of invoking the callback
 * immediately.
 *
 * Wrappers are tagged with __guarded__; a tagged function is handed
 * back as is rather than being wrapped a second time.
 */
Block.prototype.guard=function(f) {
  if (f.__guarded__) return f;
  var self=this;
  var wrapped=function() {
    var origCurrent=Block.current;
    Block.current=self;
    try {
      var ret=f.apply(this, arguments);
      Block.current=origCurrent;
      return ret;
    } catch (e) {
      Block.current=origCurrent;
      self.raise(e);
    }
  };
  wrapped.__guarded__=true;
  return wrapped;
};
/**
 * A Future class as per the literature on the topic.
 * The two main operations are force() and resolve().
 *
 * Constructing with an argument (even undefined) creates a Future that
 * is already resolved with that value; constructing with no arguments
 * creates an unresolved one.
 */
function Future(resolution) {
  this._resolved=false;
  this._errored=false;
  this._resolution=null;
  this._pending=null;
  if (arguments.length>0) {
    // Create a resolved future
    this.resolve(resolution);
  }
}
/**
 * Cast an arbitrary value to a Future. If already a Future,
 * just return it. Otherwise, return a new Future resolved
 * with the value.
 */
Future.cast=function(futureOrLiteral) {
  if (futureOrLiteral instanceof Future) return futureOrLiteral;
  else return new Future(futureOrLiteral);
};
// Methods are added to the prototype object the function was created
// with. Replacing it with a fresh {} would discard its constructor
// property, making new Future().constructor report Object.
/**
 * Invokes the callback(result) upon resolution of the future.
 * Return true if force executed immediately, false if pended.
 *
 * If the callback is pended, it is wrapped with Block.guard
 * so that any exceptions it throws are routed to the Block
 * in effect at the time of the call to force().
 */
Future.prototype.force=function(callback) {
  if (this._resolved) {
    if (callback) callback(this._resolution);
    return true;
  } else if (callback) {
    // Pend it.
    var pended=Block.guard(callback);
    if (!this._pending) this._pending=[pended];
    else this._pending.push(pended);
    return false;
  } else {
    return false;
  }
};
/**
 * Resolves the future. Any pended force callbacks are
 * executed immediately. Any future calls to force() will
 * invoke their callbacks immediately.
 *
 * Throws an Error if the future has already been resolved.
 */
Future.prototype.resolve=function(resolution) {
  if (this._resolved) {
    throw new Error('Logic error. Future resolved multiple times.');
  }
  this._resolved=true;
  this._resolution=resolution;
  // Detach the list before running it: the callbacks can never fire
  // again, so the future should not keep them (and their closures) alive.
  var pending=this._pending;
  if (pending) {
    this._pending=null;
    pending.forEach(function(pended) {
      pended(resolution);
    });
  }
};
/**
 * Return a new Future whose resolution is dependent on
 * the resolution of this future. When this future is
 * resolved, the transformer callback will be invoked with
 * its resolution and the callback result will become the
 * resolution of the new Future returned by this method.
 */
Future.prototype.chain=function(transformer) {
  var chained=new Future();
  this.force(function(resolution) {
    chained.resolve(transformer(resolution));
  });
  return chained;
};
/**
 * Forward the resolution from this future to another future.
 * This future is forced and the resolution is resolved on
 * the passed future.
 */
Future.prototype.forward=function(otherFuture) {
  this.force(function(resolution) {
    otherFuture.resolve(resolution);
  });
};
// -- exports
// Both constructors are published on the CommonJS exports object.
exports.Block = Block;
exports.Future = Future;
/**
* Add or update an object in the session and invoke
* callback(err) when complete. updates is an array
* of {objectId:..., value:...}
*/
updateObjects: function(updates, callback) {
if (!Array.isArray(updates)) return callback(new Error('updates must be an array'));
var self=this, cn=self.cn;
// Initialize the commands
var commands=cn.multi(),
publishChannels={};
// First grab a new txid
return self._nextTxId(withTransactionId);
// -- callbacks
function withTransactionId(err, txid) {
if (err) return callback(err);
updateRest(0, txid);
}
function updateRest(index, txid) {
if (index>=updates.length) {
// Termination condition. Commit the commands.
return commitCommands(txid);
}
// Validate the record
var updateRecord=updates[index],
objectId=updateRecord.objectId,
object=updateRecord.value;
if (!objectIdIsValid(objectId)) return callback(new Error('Illegal ObjectId: "' + objectId + '"'));
if (object===null || 'object'!==typeof(object)) {
return callback(new Error('Illegal object value "' + object + '"'));
}
// Get it as json
var objectJson;
try {
objectJson=JSON.stringify(object);
} catch (e) {
return callback(e);
}
// Queue commands for updating this session
commands.hset(self._contentsKey(), objectId, objectJson); // Add to contents hash
commands.zadd(self._logKey(), txid, objectId); // Add to the txlog
commands.zrem(self._expungeKey(), objectId); // Undelete previous
//commands.publish(self._updateChannel(), self._updateMessage(txid)); // Publish update
publishChannels[self._updateChannel()]=true;
// Now for each published link, do similar operations
cn.smembers(self._publishKey(objectId), function(err, results) {
if (err) return callback(err);
var fqObjectId=objectId + '@' + self.sessionId;
// Each result is a Buffer containing a session id
results.forEach(function(result) {
var tgtSessionId=result.toString();
commands.zadd(self._logKey(tgtSessionId), txid, fqObjectId);
commands.zrem(self._expungeKey(tgtSessionId), fqObjectId);
//commands.publish(self._updateChannel(tgtSessionId), self._updateMessage(txid));
publishChannels[self._updateChannel(tgtSessionId)]=true;
});
// Loop
return updateRest(index+1, txid);
});
}
function commitCommands(txid) {
// Each channel that was marked to publish to needs a publish
var publishMessage=self._updateMessage(txid);
Object.keys(publishChannels).forEach(function(channelName) {
commands.publish(channelName, publishMessage);
});
commands.exec(function(err, replies) {
return callback(err);
});
}
},
/**
 * A Future class as per the literature on the topic.
 * The two main operations are force() and resolve().
 *
 * Passing any argument to the constructor, including undefined, yields a
 * Future that is resolved with it straight away; passing none yields an
 * unresolved Future.
 */
function Future(resolution) {
  this._resolved=false;
  this._errored=false;
  this._resolution=null;
  this._pending=null;
  if (arguments.length>0) {
    // Create a resolved future
    this.resolve(resolution);
  }
}
/**
 * Cast an arbitrary value to a Future. If already a Future,
 * just return it. Otherwise, return a new Future resolved
 * with the value.
 */
Future.cast=function(futureOrLiteral) {
  if (futureOrLiteral instanceof Future) return futureOrLiteral;
  else return new Future(futureOrLiteral);
};
// The default prototype object is kept on purpose: assigning a new {}
// here would drop the constructor property that links instances back to
// Future.
/**
 * Invokes the callback(result) upon resolution of the future.
 * Return true if force executed immediately, false if pended.
 *
 * If the callback is pended, it is wrapped with Block.guard
 * so that any exceptions it throws are routed to the Block
 * in effect at the time of the call to force().
 */
Future.prototype.force=function(callback) {
  if (this._resolved) {
    if (callback) callback(this._resolution);
    return true;
  } else if (callback) {
    // Pend it.
    var pended=Block.guard(callback);
    if (!this._pending) this._pending=[pended];
    else this._pending.push(pended);
    return false;
  } else {
    return false;
  }
};
/**
 * Resolves the future. Any pended force callbacks are
 * executed immediately. Any future calls to force() will
 * invoke their callbacks immediately.
 *
 * Resolving a second time throws an Error.
 */
Future.prototype.resolve=function(resolution) {
  if (this._resolved) {
    throw new Error('Logic error. Future resolved multiple times.');
  }
  this._resolved=true;
  this._resolution=resolution;
  // Take ownership of the pended list and clear the field, so the
  // callbacks are released once they have run.
  var waiting=this._pending;
  this._pending=null;
  if (waiting) {
    waiting.forEach(function(pended) {
      pended(resolution);
    });
  }
};
/**
 * Return a new Future whose resolution is dependent on
 * the resolution of this future. When this future is
 * resolved, the transformer callback will be invoked with
 * its resolution and the callback result will become the
 * resolution of the new Future returned by this method.
 */
Future.prototype.chain=function(transformer) {
  var chained=new Future();
  this.force(function(resolution) {
    chained.resolve(transformer(resolution));
  });
  return chained;
};
/**
 * Forward the resolution from this future to another future.
 * This future is forced and the resolution is resolved on
 * the passed future.
 */
Future.prototype.forward=function(otherFuture) {
  this.force(function(resolution) {
    otherFuture.resolve(resolution);
  });
};
/**
* Sends a message to a queue. If recipient is an array, then multiple messages are put
* on the queue. Message can either be a simple string or a compound object of a type
* that the given queue understands. Invoke callback(err) on completion.
* @param {Object} queue
* @param {Object} recipient
* @param {Object} message
*/
sendMessage: function(queue, recipient, message, callback) {
var self=this;
var queueType=QUEUE_TYPES[queue];
if (!queueType) {
return respond('Illegal queue type ' + queueType);
}
var queueKey=QUEUE_KEY_PREFIX + queueType;
var notifyChannel=NOTIFY_CHANNEL_PREFIX + queueType;
var commands=self.shared().multi();
var wasEmpty=false;
var depth=0;
if (!Array.isArray(recipient)) recipient=[recipient];
recipient.forEach(function(r) {
var queueObject={
queue: queueType,
recipient: r,
timestamp: Date.now(),
body: message
};
var queueJson=JSON.stringify(queueObject);
commands.lpush(queueKey, queueJson, function(err, count) {
if (err) return respond(err);
count=Number(count);
depth=count;
if (count===1) {
// We transitioned to non-empty
wasEmpty=true;
}
});
});
// -- send it on
commands.exec(function(err) {
if (err) return respond(err);
if (wasEmpty) {
self.shared().publish(notifyChannel, 'notempty', function(err) {
return respond(err);
});
} else {
return respond(err);
}
});
function respond(err) {
if ('function' === typeof callback) {
callback(err, depth);
callback = null;
}
}
},
/**
 * Runs doSomeAsyncCall('abc', 123, ...) and reports the outcome through
 * callback exactly once: callback(err) on failure, callback(null, result)
 * on success.
 *
 * @param response not used by this function.
 * @param {Function} callback callback(err, result).
 */
function doSomething(response, callback) {
  doSomeAsyncCall('abc', 123, function(err, result) {
    if (err) {
      // Return here: falling through would invoke callback a second
      // time, with (null, result), after it was already given the error.
      return callback(err);
    }
    return callback(null, result);
  });
}
/**
 * Starts doSomeAsyncCall('abc', 123, ...) and relays its outcome to
 * callback: callback(err) when it fails, callback(null, result) when it
 * succeeds. The completion handler returns whatever callback returns.
 *
 * @param response not used by this function.
 * @param {Function} callback callback(err, result).
 */
function doSomething(response, callback) {
  function onComplete(err, result) {
    return err ? callback(err) : callback(null, result);
  }
  doSomeAsyncCall('abc', 123, onComplete);
}
var nodeunit=require('nodeunit');
var myModuleUnderTest=require('../lib/myModuleUnderTest');
// Test suite skeleton: a single smoke test that finishes immediately,
// wrapped with nodeunit.testCase and exported as this module.
var smokeSuite={
  'test for smoke': function(test) {
    test.done();
  }
};
module.exports=nodeunit.testCase(smokeSuite);
@stellaraccident
Copy link
Author

stellaraccident commented Feb 5, 2012 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment