Skip to content

Instantly share code, notes, and snippets.

@formula1
Created March 16, 2015 17:15
Show Gist options
  • Save formula1/c05c4931e6e11b6fcb2f to your computer and use it in GitHub Desktop.
Save formula1/c05c4931e6e11b6fcb2f to your computer and use it in GitHub Desktop.
Waterline Queue
var ee = require("events").EventEmitter;
/**
 * Waterline registry: an EventEmitter that keeps one name-keyed registry
 * each for adapters, connections and collections.
 */
function Waterline(){
  ee.call(this);
  // Start every registry out empty.
  var registries = ["adapters","connections","collections"];
  for(var i=0;i<registries.length;i++){
    this[registries[i]] = {};
  }
}
// Inherit from EventEmitter. `constructor` is restored as an ordinary
// (writable, enumerable, configurable) data property pointing back at Waterline.
Waterline.prototype = Object.create(ee.prototype,{
  constructor: {
    value: Waterline,
    writable: true,
    enumerable: true,
    configurable: true
  }
});
/**
 * Validates an adapter, stores the validated result under the adapter's name
 * and announces it with an "adapter[<name>]" event.
 *
 * @param {Object} adapter - Adapter definition; its `name` is used as registry key and in the event name.
 */
Waterline.prototype.loadAdapter = function(adapter){
  // The registry key is read before validation runs; the event name is read afterwards.
  var registryKey = adapter.name;
  var validated = Adapter.DefaultValidateAndEtc(adapter);
  this.adapters[registryKey] = validated;
  // Listeners receive the adapter object that was passed in, not the validated result.
  var eventName = ["adapter[", adapter.name, "]"].join("");
  this.emit(eventName, adapter);
};
/**
 * Registers a connection and reports through `next` once its adapter's state is known.
 *
 * If `connection.adapter` is an Adapter instance it is loaded first and replaced by its
 * name. The connection is then validated, given a back-reference to this Waterline and a
 * `getState()` that mirrors its adapter's state (QUEUED while the adapter is not loaded),
 * stored under `connection.name` and announced with a "connection[<name>]" event.
 *
 * @param {Object} connection - Connection definition; `adapter` is an adapter name or an Adapter instance.
 * @param {Function} [next] - Node-style callback `(err, connection)`. Called at most once:
 *   on a later tick when the adapter state is already REJECTED, OFFLINE or ONLINE,
 *   otherwise when the adapter is loaded. Optional.
 */
Waterline.prototype.loadConnection = function(connection,next){
  // Callers that only want the connection registered may omit the callback.
  if(typeof next !== "function") next = function(){};

  if(connection.adapter instanceof Adapter){
    this.loadAdapter(connection.adapter);
    connection.adapter = connection.adapter.name;
  }
  connection = Connection.DefaultValidateAndEtc(connection);
  connection.waterline = this;
  connection.getState = function(){
    if(!this.waterline.adapters[this.adapter]) return Waterline.STATES.QUEUED;
    return this.waterline.adapters[this.adapter].getState();
  };
  this.connections[connection.name] = connection;
  this.emit("connection["+connection.name+"]",connection);

  // Translates the current state into callback arguments, or null while still queued.
  function outcome(){
    switch(connection.getState()){
      case Waterline.STATES.REJECTED: return ["Adapter has been rejected"];
      case Waterline.STATES.OFFLINE: return ["Adapter is offline",connection];
      case Waterline.STATES.ONLINE: return [void(0),connection];
    }
    return null;
  }

  var settled = outcome();
  if(settled){
    // Always answer asynchronously so callers see consistent ordering.
    setImmediate(function(){ next.apply(null,settled); });
    return;
  }

  // Adapter not loaded yet: wait for loadAdapter's "adapter[<name>]" event.
  // NOTE: if the adapter still reports no final state at that point, `next` is never called.
  this.once("adapter["+connection.adapter+"]",function(){
    var result = outcome();
    if(result) next.apply(null,result);
  });
};
/**
 * Registers a collection and reports through `next` once its state is known.
 *
 * Defined on the prototype (like loadAdapter/loadConnection) because it needs the
 * instance's registries and event emitter.
 *
 * `collection.connection` is normalized to an array of connection names (default
 * `["default"]`); Connection instances found in it are loaded and replaced by their names.
 * `collection.dependencyCollections` is filled from `Collection.parseDependencyCollections`.
 * The collection gets a back-reference to this Waterline and a `getState()`, is stored
 * under `collection.name` and announced with a "collection[<name>]" event.
 *
 * NOTE: the collection is re-evaluated only when a connection or dependency that was
 * missing at load time gets registered; state changes after that are not observed here.
 *
 * @param {Object} collection - Collection definition.
 * @param {Function} [next] - Node-style callback `(err, collection)`, called at most once. Optional.
 */
Waterline.prototype.loadCollection = function(collection,next){
  var waterline = this;
  var settled = false;
  if(typeof next !== "function") next = function(){};

  // Translates the current state into callback arguments, or null while still queued.
  function outcome(rejectedMessage,offlineMessage){
    switch(collection.getState()){
      case Waterline.STATES.REJECTED: return [rejectedMessage];
      case Waterline.STATES.OFFLINE: return [offlineMessage,collection];
      case Waterline.STATES.ONLINE: return [void(0),collection];
    }
    return null;
  }

  // Shared listener: re-check the state whenever a missing connection/dependency shows up.
  function onDependencyLoaded(){
    if(settled) return;
    var args = outcome("Collection has been rejected","Collection is offline");
    if(!args) return;
    settled = true;
    next.apply(null,args);
  }

  if(!collection.connection) collection.connection = ["default"];
  if(!Array.isArray(collection.connection)) collection.connection = [collection.connection];
  collection.connection = collection.connection.map(function(connection){
    if(connection instanceof Connection){
      // The connection's outcome is observed through getState(), so its callback is a no-op.
      waterline.loadConnection(connection,function(){});
      return connection.name;
    }
    return connection;
  });
  collection.connection.forEach(function(name){
    if(!waterline.connections[name]){
      waterline.once("connection["+name+"]",onDependencyLoaded);
    }
  });

  collection.dependencyCollections = Collection.parseDependencyCollections(collection);
  collection.dependencyCollections.forEach(function(depCol){
    if(!waterline.collections[depCol]){
      waterline.once("collection["+depCol+"]",onDependencyLoaded);
    }
  });

  collection.waterline = this;
  /**
   * Derives the collection's state from its dependencies and connections.
   * @param {string[]} [previousCollections] - Names already being evaluated; used to break dependency cycles.
   */
  collection.getState = function(previousCollections){
    previousCollections = previousCollections||[];
    var i,l,state;
    // Every dependency collection must be loaded and ONLINE.
    for(i=0,l=this.dependencyCollections.length;i<l;i++){
      var depCol = this.dependencyCollections[i];
      if(!this.waterline.collections[depCol]) return Waterline.STATES.QUEUED;
      if(previousCollections.indexOf(depCol) !== -1) continue;
      state = this.waterline.collections[depCol].getState(previousCollections.concat([this.name]));
      if(state !== Waterline.STATES.ONLINE){
        return state;
      }
    }
    // The first loaded connection that is ONLINE or OFFLINE decides the state.
    // REJECTED is reported only when every listed connection is loaded and rejected
    // (presumably the purpose of the original, unused `allRejected` flag).
    var allRejected = this.connection.length > 0;
    for(i=0,l=this.connection.length;i<l;i++){
      var conn = this.waterline.connections[this.connection[i]];
      if(!conn){
        allRejected = false;
        continue;
      }
      state = conn.getState();
      if(state === Waterline.STATES.ONLINE || state === Waterline.STATES.OFFLINE){
        return state;
      }
      if(state !== Waterline.STATES.REJECTED) allRejected = false;
    }
    return allRejected ? Waterline.STATES.REJECTED : Waterline.STATES.QUEUED;
  };
  this.collections[collection.name] = collection;
  this.emit("collection["+collection.name+"]",collection);

  // A listener may already have answered during the emit above (self-dependency).
  if(settled) return;
  var immediate = outcome("Collection or a dependency has been rejected","Collection or a dependency is offline");
  if(immediate){
    settled = true;
    setImmediate(function(){ next.apply(null,immediate); });
  }
};
// Lifecycle states reported by the getState() methods of adapters, connections and
// collections. This must be an object literal: `key: value` pairs inside `[...]` are a
// syntax error. Frozen so the shared constants cannot be altered by accident.
Waterline.STATES = Object.freeze({
  QUEUED: "queued",
  ONLINE: "online",
  OFFLINE: "offline",
  REJECTED: "fail"
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment