Skip to content

Instantly share code, notes, and snippets.

@stellaraccident
Created March 21, 2011 20:05
Show Gist options
  • Select an option

  • Save stellaraccident/880102 to your computer and use it in GitHub Desktop.

Select an option

Save stellaraccident/880102 to your computer and use it in GitHub Desktop.
Bulletproof node coding snippets
request: function(options) {
var future=new Future();
var req=http.request(options, function(res) {
var text='';
res.setEncoding('utf8');
res.on('data', function(chunk) {
text+=chunk;
});
res.on('end', Block.guard(function() {
future.resolve(new CouchResponse(res, text));
}));
res.on('error', Block.errorHandler());
});
req.on('error', Block.errorHandler());
req.end();
return future;
}
/**
 * Request handler that reads a user agent record from the request body and
 * either rejects it (HTTP 400), or creates a user agent and responds with it
 * as JSON. The whole flow runs inside Block.begin with next as the error
 * callback.
 *
 * @param req incoming request, handed to jsonifyRequest
 * @param res response object (writeHead/end are used for the 400 case)
 * @param next error callback given to Block.begin
 * @returns whatever Block.begin returns for the body
 */
function handleUserAgent(req, res, next) {
  return Block.begin(parseBody, next);

  // Step 1: obtain the request body and continue once it is available.
  function parseBody() {
    return jsonifyRequest(req).force(validateAndDispatch);
  }

  // Step 2: validate the record, then either verify or create.
  function validateAndDispatch(requestObj) {
    var check=validators.UserAgentRecord(requestObj, {fix:true});
    if (!check.valid) {
      res.writeHead(400);
      return res.end('Invalid request object: ' + check.reason);
    }

    var record=check.object;
    if (!record.token) {
      // Create
      record.token=null;
      record.type='auth'; // TODO: Maybe support unauth in the future?
      return handler.createUserAgent(record).force(sendUserAgent);
    }

    // Verify (would be: return handler.verifyUserAgent(record);)
    throw new Error('verifyUserAgent not yet implemented');
  }

  // Step 3: run the created user agent through the validator and send it.
  function sendUserAgent(userAgent) {
    var check=validators.UserAgentRecord(userAgent, {fix:true});
    return respondJson(check.object, res);
  }
}
/**
 * Block routes errors to higher level logic. A Block remembers the Block
 * that was current when it was constructed (its parent) and an optional
 * error callback.
 *
 * @param {Function} [errback] error callback, invoked as errback(err)
 */
function Block(errback) {
  this._parent=Block.current;
  this._errback=errback;
}

/** The Block whose trap()/guard() wrapper is executing right now, or null. */
Block.current=null;

/**
 * Wraps f so that exceptions it throws are delivered to the Block that is
 * current at the time guard() is called. Without a current Block, f itself
 * is returned.
 *
 * Example: stream.on('end', Block.guard(function() { ... }));
 */
Block.guard=function(f) {
  var active=this.current;
  return active ? active.guard(f) : f;
};

/**
 * Starts a new Block. block is the body (think 'try body') and rescue is the
 * error callback (think 'catch'). The naming follows Ruby simply because
 * Block, begin and rescue are not reserved words in JavaScript.
 *
 * @returns the body's return value, or undefined if it threw
 */
Block.begin=function(block, rescue) {
  return new Block(rescue).trap(block);
};

/**
 * Returns a function(err) that raises err against the Block that is current
 * now (at the time errorHandler() is called), or throws err if there is no
 * such Block. A falsy err is ignored, so the result works both as an error
 * callback and as an 'error' event listener.
 *
 * Example: request.on('error', Block.errorHandler())
 */
Block.errorHandler=function() {
  // Remember the Block in effect now; the handler is typically called later.
  var captured=this.current;
  return function(err) {
    if (err) {
      if (!captured) throw err;
      return captured.raise(err);
    }
  };
};

/**
 * Raises err on this Block. With an error callback, the callback receives
 * err; anything the callback itself throws is raised on the parent. Without
 * an error callback, err goes to the parent. Whenever there is no parent to
 * hand an error to, that error is thrown.
 */
Block.prototype.raise=function(err) {
  var outstanding=err;
  if (this._errback) {
    try {
      this._errback(err);
      return;
    } catch (nestedE) {
      // The handler failed; its exception is what travels upward.
      outstanding=nestedE;
    }
  }
  if (!this._parent) throw outstanding;
  this._parent.raise(outstanding);
};

/**
 * Runs callback with this Block as Block.current and restores the previous
 * current Block afterwards. Exceptions are passed to raise().
 *
 * @returns the callback's return value, or undefined on error
 */
Block.prototype.trap=function(callback) {
  var previous=Block.current;
  var result;
  Block.current=this;
  try {
    result=callback();
  } catch (e) {
    // Restore first so the error callback does not run inside this Block.
    Block.current=previous;
    this.raise(e);
    return;
  }
  Block.current=previous;
  return result;
};

/**
 * Like trap(), but returns a wrapper function instead of calling f right
 * away. The wrapper forwards its this and arguments to f, runs with this
 * Block as Block.current, and routes exceptions to raise(). A function that
 * is already a guard wrapper is returned as is.
 */
Block.prototype.guard=function(f) {
  if (f.__guarded__) return f;
  var owner=this;
  function wrapped() {
    var previous=Block.current;
    var result;
    Block.current=owner;
    try {
      result=f.apply(this, arguments);
    } catch (e) {
      Block.current=previous;
      owner.raise(e);
      return;
    }
    Block.current=previous;
    return result;
  }
  wrapped.__guarded__=true;
  return wrapped;
};
/**
* Library of control flow utilities. There are many promise libraries for node;
* to distinguish my personal style from those, I chose the (near) synonym
* "future". I actually enjoy most of node's callback style and don't want to
* hide it too much, so this library attempts to do very little magic, relying
* instead on explicit intent.
*/
/**
 * Applies Array.prototype.slice to ary, which lets array-like values (such
 * as an arguments object) be sliced into a real Array.
 *
 * @param ary array or array-like value
 * @param begin start index, forwarded to slice
 * @param end end index, forwarded to slice
 * @returns {Array} the sliced copy
 */
function sliceArray(ary, begin, end) {
  var slice=Array.prototype.slice;
  return slice.call(ary, begin, end);
}
/**
 * Block routes errors to higher level logic. A Block remembers the Block
 * that was current when it was constructed (its parent) and an optional
 * error callback.
 *
 * @param {Function} [errback] error callback, invoked as errback(err)
 */
function Block(errback) {
  this._parent=Block.current;
  this._errback=errback;
}

/** The Block whose trap()/guard() wrapper is executing right now, or null. */
Block.current=null;

/**
 * Wraps f so that exceptions it throws are delivered to the Block that is
 * current at the time guard() is called. Without a current Block, f itself
 * is returned.
 *
 * Example: stream.on('end', Block.guard(function() { ... }));
 */
Block.guard=function(f) {
  var active=this.current;
  return active ? active.guard(f) : f;
};

/**
 * Starts a new Block. block is the body (think 'try body') and rescue is the
 * error callback (think 'catch'). The naming follows Ruby simply because
 * Block, begin and rescue are not reserved words in JavaScript.
 *
 * @returns the body's return value, or undefined if it threw
 */
Block.begin=function(block, rescue) {
  return new Block(rescue).trap(block);
};

/**
 * Returns a function(err) that raises err against the Block that is current
 * now (at the time errorHandler() is called), or throws err if there is no
 * such Block. A falsy err is ignored, so the result works both as an error
 * callback and as an 'error' event listener.
 *
 * Example: request.on('error', Block.errorHandler())
 */
Block.errorHandler=function() {
  // Remember the Block in effect now; the handler is typically called later.
  var captured=this.current;
  return function(err) {
    if (err) {
      if (!captured) throw err;
      return captured.raise(err);
    }
  };
};

/**
 * Raises err on this Block. With an error callback, the callback receives
 * err; anything the callback itself throws is raised on the parent. Without
 * an error callback, err goes to the parent. Whenever there is no parent to
 * hand an error to, that error is thrown.
 */
Block.prototype.raise=function(err) {
  var outstanding=err;
  if (this._errback) {
    try {
      this._errback(err);
      return;
    } catch (nestedE) {
      // The handler failed; its exception is what travels upward.
      outstanding=nestedE;
    }
  }
  if (!this._parent) throw outstanding;
  this._parent.raise(outstanding);
};

/**
 * Runs callback with this Block as Block.current and restores the previous
 * current Block afterwards. Exceptions are passed to raise().
 *
 * @returns the callback's return value, or undefined on error
 */
Block.prototype.trap=function(callback) {
  var previous=Block.current;
  var result;
  Block.current=this;
  try {
    result=callback();
  } catch (e) {
    // Restore first so the error callback does not run inside this Block.
    Block.current=previous;
    this.raise(e);
    return;
  }
  Block.current=previous;
  return result;
};

/**
 * Like trap(), but returns a wrapper function instead of calling f right
 * away. The wrapper forwards its this and arguments to f, runs with this
 * Block as Block.current, and routes exceptions to raise(). A function that
 * is already a guard wrapper is returned as is.
 */
Block.prototype.guard=function(f) {
  if (f.__guarded__) return f;
  var owner=this;
  function wrapped() {
    var previous=Block.current;
    var result;
    Block.current=owner;
    try {
      result=f.apply(this, arguments);
    } catch (e) {
      Block.current=previous;
      owner.raise(e);
      return;
    }
    Block.current=previous;
    return result;
  }
  wrapped.__guarded__=true;
  return wrapped;
};
/**
 * A Future class as per the literature on the topic.
 * The two main operations are force() and resolve().
 *
 * Called with an argument (even undefined), the Future is created already
 * resolved with that value; called with no arguments it starts unresolved.
 *
 * @param [resolution] value to resolve with immediately
 */
function Future(resolution) {
  this._resolved=false;
  // Not read or updated by any method in this block; kept so the fields
  // present on a Future stay the same.
  this._errored=false;
  this._resolution=null;
  this._pending=null;
  if (arguments.length>0) {
    // Create a resolved future
    this.resolve(resolution);
  }
}

/**
 * Cast an arbitrary value to a Future. If already a Future,
 * just return it. Otherwise, return a new Future resolved
 * with the value.
 */
Future.cast=function(futureOrLiteral) {
  if (futureOrLiteral instanceof Future) return futureOrLiteral;
  return new Future(futureOrLiteral);
};

// Methods are attached to the function's default prototype object rather
// than to a replacement ({}), so that instance.constructor === Future holds.

/**
 * Invokes callback(result) upon resolution of the future.
 * Returns true if the future is already resolved (the callback, if any, ran
 * immediately) and false otherwise.
 *
 * If the callback is pended, it is wrapped with Block.guard
 * so that any exceptions it throws are routed to the Block
 * in effect at the time of the call to force(). A callback that runs
 * immediately is called directly, so its exceptions reach the caller.
 *
 * @param {Function} [callback]
 * @returns {boolean}
 */
Future.prototype.force=function(callback) {
  if (this._resolved) {
    if (callback) callback(this._resolution);
    return true;
  }
  if (callback) {
    // Pend it.
    var pended=Block.guard(callback);
    if (this._pending) this._pending.push(pended);
    else this._pending=[pended];
  }
  return false;
};

/**
 * Resolves the future. Any pended force callbacks are
 * executed immediately, in the order they were pended. Any future calls to
 * force() will invoke their callbacks immediately.
 *
 * @throws {Error} if the future has already been resolved
 */
Future.prototype.resolve=function(resolution) {
  if (this._resolved) {
    throw new Error('Logic error. Future resolved multiple times.');
  }
  this._resolved=true;
  this._resolution=resolution;
  var pending=this._pending;
  if (pending) {
    // Drop the list before running it: a resolved future never pends again,
    // so holding on to the callbacks would only keep their closures alive.
    this._pending=null;
    pending.forEach(function(pended) {
      pended(resolution);
    });
  }
};

/**
 * Return a new Future whose resolution is dependent on
 * the resolution of this future. When this future is
 * resolved, the transformer callback will be invoked with
 * its resolution and the callback result will become the
 * resolution of the new Future returned by this method.
 */
Future.prototype.chain=function(transformer) {
  var chained=new Future();
  this.force(function(resolution) {
    chained.resolve(transformer(resolution));
  });
  return chained;
};

/**
 * Forward the resolution from this future to another future.
 * This future is forced and the resolution is resolved on
 * the passed future.
 */
Future.prototype.forward=function(otherFuture) {
  this.force(function(resolution) {
    otherFuture.resolve(resolution);
  });
};
// -- exports
// Publish the two constructors defined above as properties of the
// CommonJS exports object.
exports.Block=Block;
exports.Future=Future;
/**
* Add or update an object in the session and invoke
* callback(err) when complete. updates is an array
* of {objectId:..., value:...}
*/
updateObjects: function(updates, callback) {
if (!Array.isArray(updates)) return callback(new Error('updates must be an array'));
var self=this, cn=self.cn;
// Initialize the commands
var commands=cn.multi(),
publishChannels={};
// First grab a new txid
return self._nextTxId(withTransactionId);
// -- callbacks
function withTransactionId(err, txid) {
if (err) return callback(err);
updateRest(0, txid);
}
function updateRest(index, txid) {
if (index>=updates.length) {
// Termination condition. Commit the commands.
return commitCommands(txid);
}
// Validate the record
var updateRecord=updates[index],
objectId=updateRecord.objectId,
object=updateRecord.value;
if (!objectIdIsValid(objectId)) return callback(new Error('Illegal ObjectId: "' + objectId + '"'));
if (object===null || 'object'!==typeof(object)) {
return callback(new Error('Illegal object value "' + object + '"'));
}
// Get it as json
var objectJson;
try {
objectJson=JSON.stringify(object);
} catch (e) {
return callback(e);
}
// Queue commands for updating this session
commands.hset(self._contentsKey(), objectId, objectJson); // Add to contents hash
commands.zadd(self._logKey(), txid, objectId); // Add to the txlog
commands.zrem(self._expungeKey(), objectId); // Undelete previous
//commands.publish(self._updateChannel(), self._updateMessage(txid)); // Publish update
publishChannels[self._updateChannel()]=true;
// Now for each published link, do similar operations
cn.smembers(self._publishKey(objectId), function(err, results) {
if (err) return callback(err);
var fqObjectId=objectId + '@' + self.sessionId;
// Each result is a Buffer containing a session id
results.forEach(function(result) {
var tgtSessionId=result.toString();
commands.zadd(self._logKey(tgtSessionId), txid, fqObjectId);
commands.zrem(self._expungeKey(tgtSessionId), fqObjectId);
//commands.publish(self._updateChannel(tgtSessionId), self._updateMessage(txid));
publishChannels[self._updateChannel(tgtSessionId)]=true;
});
// Loop
return updateRest(index+1, txid);
});
}
function commitCommands(txid) {
// Each channel that was marked to publish to needs a publish
var publishMessage=self._updateMessage(txid);
Object.keys(publishChannels).forEach(function(channelName) {
commands.publish(channelName, publishMessage);
});
commands.exec(function(err, replies) {
return callback(err);
});
}
},
/**
 * A Future class as per the literature on the topic.
 * The two main operations are force() and resolve().
 *
 * Called with an argument (even undefined), the Future is created already
 * resolved with that value; called with no arguments it starts unresolved.
 *
 * @param [resolution] value to resolve with immediately
 */
function Future(resolution) {
  this._resolved=false;
  // Not read or updated by any method in this block; kept so the fields
  // present on a Future stay the same.
  this._errored=false;
  this._resolution=null;
  this._pending=null;
  if (arguments.length>0) {
    // Create a resolved future
    this.resolve(resolution);
  }
}

/**
 * Cast an arbitrary value to a Future. If already a Future,
 * just return it. Otherwise, return a new Future resolved
 * with the value.
 */
Future.cast=function(futureOrLiteral) {
  if (futureOrLiteral instanceof Future) return futureOrLiteral;
  return new Future(futureOrLiteral);
};

// Methods are attached to the function's default prototype object rather
// than to a replacement ({}), so that instance.constructor === Future holds.

/**
 * Invokes callback(result) upon resolution of the future.
 * Returns true if the future is already resolved (the callback, if any, ran
 * immediately) and false otherwise.
 *
 * If the callback is pended, it is wrapped with Block.guard
 * so that any exceptions it throws are routed to the Block
 * in effect at the time of the call to force(). A callback that runs
 * immediately is called directly, so its exceptions reach the caller.
 *
 * @param {Function} [callback]
 * @returns {boolean}
 */
Future.prototype.force=function(callback) {
  if (this._resolved) {
    if (callback) callback(this._resolution);
    return true;
  }
  if (callback) {
    // Pend it.
    var pended=Block.guard(callback);
    if (this._pending) this._pending.push(pended);
    else this._pending=[pended];
  }
  return false;
};

/**
 * Resolves the future. Any pended force callbacks are
 * executed immediately, in the order they were pended. Any future calls to
 * force() will invoke their callbacks immediately.
 *
 * @throws {Error} if the future has already been resolved
 */
Future.prototype.resolve=function(resolution) {
  if (this._resolved) {
    throw new Error('Logic error. Future resolved multiple times.');
  }
  this._resolved=true;
  this._resolution=resolution;
  var pending=this._pending;
  if (pending) {
    // Drop the list before running it: a resolved future never pends again,
    // so holding on to the callbacks would only keep their closures alive.
    this._pending=null;
    pending.forEach(function(pended) {
      pended(resolution);
    });
  }
};

/**
 * Return a new Future whose resolution is dependent on
 * the resolution of this future. When this future is
 * resolved, the transformer callback will be invoked with
 * its resolution and the callback result will become the
 * resolution of the new Future returned by this method.
 */
Future.prototype.chain=function(transformer) {
  var chained=new Future();
  this.force(function(resolution) {
    chained.resolve(transformer(resolution));
  });
  return chained;
};

/**
 * Forward the resolution from this future to another future.
 * This future is forced and the resolution is resolved on
 * the passed future.
 */
Future.prototype.forward=function(otherFuture) {
  this.force(function(resolution) {
    otherFuture.resolve(resolution);
  });
};
/**
* Sends a message to a queue. If recipient is an array, then multiple messages are put
* on the queue. Message can either be a simple string or a compound object of a type
* that the given queue understands. Invoke callback(err) on completion.
* @param {Object} queue
* @param {Object} recipient
* @param {Object} message
*/
sendMessage: function(queue, recipient, message, callback) {
var self=this;
var queueType=QUEUE_TYPES[queue];
if (!queueType) {
return respond('Illegal queue type ' + queueType);
}
var queueKey=QUEUE_KEY_PREFIX + queueType;
var notifyChannel=NOTIFY_CHANNEL_PREFIX + queueType;
var commands=self.shared().multi();
var wasEmpty=false;
var depth=0;
if (!Array.isArray(recipient)) recipient=[recipient];
recipient.forEach(function(r) {
var queueObject={
queue: queueType,
recipient: r,
timestamp: Date.now(),
body: message
};
var queueJson=JSON.stringify(queueObject);
commands.lpush(queueKey, queueJson, function(err, count) {
if (err) return respond(err);
count=Number(count);
depth=count;
if (count===1) {
// We transitioned to non-empty
wasEmpty=true;
}
});
});
// -- send it on
commands.exec(function(err) {
if (err) return respond(err);
if (wasEmpty) {
self.shared().publish(notifyChannel, 'notempty', function(err) {
return respond(err);
});
} else {
return respond(err);
}
});
function respond(err) {
if ('function' === typeof callback) {
callback(err, depth);
callback = null;
}
}
},
/**
 * Calls doSomeAsyncCall('abc', 123, ...) and reports the outcome through
 * callback: callback(err) on failure, callback(null, result) on success.
 *
 * @param response not used by this function
 * @param {Function} callback node-style callback(err, result)
 */
function doSomething(response, callback) {
  doSomeAsyncCall('abc', 123, function(err, result) {
    if (err) {
      // Return here: without it execution fell through and the callback
      // was invoked a second time, as callback(null, result).
      return callback(err);
    }
    return callback(null, result);
  });
}
/**
 * Calls doSomeAsyncCall('abc', 123, ...) and reports the outcome through
 * callback exactly once: callback(err) on failure, callback(null, result)
 * on success.
 *
 * @param response not used by this function
 * @param {Function} callback node-style callback(err, result)
 */
function doSomething(response, callback) {
  // Every exit path returns, so the callback can never fire twice.
  function onComplete(err, result) {
    return err ? callback(err) : callback(null, result);
  }
  doSomeAsyncCall('abc', 123, onComplete);
}
var nodeunit=require('nodeunit');
var myModuleUnderTest=require('../lib/myModuleUnderTest');
module.exports=nodeunit.testCase({
'test for smoke': function(test) {
test.done();
},
});
@cadorn
Copy link
Copy Markdown

cadorn commented Feb 5, 2012

What is the license of the code in this gist?

@stellaraccident
Copy link
Copy Markdown
Author

stellaraccident commented Feb 5, 2012 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment