Created
July 21, 2014 21:27
-
-
Save AnalogJ/bbec266c6d85dc2d215f to your computer and use it in GitHub Desktop.
Ducktyping Sailsjs for Background Tasks via Kue. See http://blog.thesparktree.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Modules used by the worker bootstrap below.
var _ = require('lodash'),
    Waterline = require('waterline'),
    path = require('path'),
    url = require('url'),
    kue = require('kue'),
    // Loaded from the copy nested under sails' socket.io dependency (path is relative to this file).
    redis = require('../../node_modules/sails/node_modules/socket.io/node_modules/redis'),
    q = require('q');
////////////////////////////////////////////////////////////////////
//// SAILS ENV
////////////////////////////////////////////////////////////////////
// Resolve the sails config directory (two levels above this file) and load
// the config files this worker needs.
var config_path = path.resolve(__dirname, '../..', 'config/');
var nconf_required = require(config_path + '/nconf.js');

// Minimal stand-in for the global `sails` object, created before the remaining
// config files are required (order preserved from the original script;
// presumably those files may read `sails.config` -- verify before reordering).
global.sails = {
    config: nconf_required
};
var filefog_required = require(config_path + '/filefog.js');
sails.config.filefog = filefog_required.filefog;
var constants_required = require(config_path + '/constants.js');
sails.config.constants = constants_required.constants;
sails.log = require(config_path + '/log.js').log.custom;

// Parse the database URL; the Waterline connection config below reads its parts.
// Fail with an explicit message instead of an opaque error from url.parse /
// later property access when the setting is missing.
var database_url = sails.config.nconf.get('DATABASE_URL');
if (!database_url) {
    throw new Error('DATABASE_URL is not configured; cannot build the database connection.');
}
var connection = url.parse(database_url);

////////////////////////////////////////////////////////////////////
//// REDIS CONFIG
////////////////////////////////////////////////////////////////////
// Shared redis client, exposed globally so the model publish* helpers can use it.
global.redis_client = redis.createClient(
    sails.config.nconf.get("REDIS_CONNECTION:PORT"),
    sails.config.nconf.get("REDIS_CONNECTION:HOST"),
    { auth_pass: sails.config.nconf.get("REDIS_CONNECTION:AUTH") || null }
);
/**
 * Build the JSON string published on the redis "dispatch" channel to announce
 * a model-level event (e.g. created/updated) for one record.
 *
 * @param {string} model_name - Model identity used in the room name and message name.
 * @param {*} model_id - Id of the record the event is about.
 * @param {string} action - Room suffix appended after ":" (e.g. "update").
 * @param {string} verb - Verb placed in the message payload (e.g. "updated").
 * @param {*} data - Record data; if it exposes toJSON() that result is sent,
 *                   otherwise the value itself is serialized as-is.
 * @returns {string} JSON-encoded wrapper ({nodeId, args}).
 */
function generate_model_message(model_name, model_id, action, verb, data) {
    // Waterline records expose toJSON(); plain objects (or null) do not, and
    // calling it unconditionally used to throw for them.
    var payload = (data && typeof data.toJSON === 'function') ? data.toJSON() : data;

    var message = {
        "name": model_name,
        "args": [{
            "verb": verb,
            "data": payload,
            "id": model_id
        }]
    };

    var wrapper = {};
    wrapper.nodeId = 648745922; //this could be randomly chosen if we cant determine the client id.
    wrapper.args = [
        // room name the event is addressed to
        "/sails_model_" + model_name + "_" + model_id + ":" + action,
        // "5:::" prefix followed by the JSON message (presumably the socket.io event packet framing -- confirm)
        "5:::" + JSON.stringify(message),
        null,
        []
    ];
    return JSON.stringify(wrapper);
}
/**
 * Build the JSON string published on the redis "dispatch" channel to announce
 * a change to an association of one record (item added to / removed from it).
 *
 * @param {string} model_name - Model identity used in the room name and message name.
 * @param {*} model_id - Id of the parent record.
 * @param {string} attribute - Name of the association attribute that changed.
 * @param {*} id_associated - Id of the associated record that was added/removed.
 * @param {string} action - Room segment after the record id (e.g. "add", "remove").
 * @param {string} verb - Verb placed in the payload (e.g. "addedTo").
 * @param {string} verbId - Payload key under which id_associated is stored (e.g. "addedId").
 * @returns {string} JSON-encoded wrapper ({nodeId, args}).
 */
function generate_association_message(model_name, model_id, attribute, id_associated, action, verb, verbId) {
    var item = {
        "verb": verb,
        "attribute": attribute,
        "id": model_id
    };
    // The key name depends on the event type, so it is set dynamically.
    item[verbId] = id_associated;

    var message = {
        "name": model_name,
        "args": [item]
    };

    var wrapper = {};
    wrapper.nodeId = 648745922; //this could be randomly chosen if we cant determine the client id.
    wrapper.args = [
        // room name: record room + action + attribute
        "/sails_model_" + model_name + "_" + model_id + ":" + action + ":" + attribute,
        "5:::" + JSON.stringify(message),
        null,
        []
    ];
    return JSON.stringify(wrapper);
}
////////////////////////////////////////////////////////////////////
//// WATERLINE CONFIG
////////////////////////////////////////////////////////////////////
// Instantiate a new instance of the ORM
var orm = new Waterline();

// Require any waterline compatible adapters here
var postgresqlAdapter = require('sails-postgresql');

// Split "user:password" on the FIRST colon only, so passwords that themselves
// contain ":" are kept intact; tolerate a URL without credentials.
var connection_credentials = connection.auth || '';
var credentials_separator = connection_credentials.indexOf(':');
var connection_user = credentials_separator === -1
    ? connection_credentials
    : connection_credentials.slice(0, credentials_separator);
var connection_password = credentials_separator === -1
    ? undefined
    : connection_credentials.slice(credentials_separator + 1);

// Build A Config Object
var config = {
    // Setup Adapters
    // Creates named adapters that have been required
    adapters: {
        'sails-postgresql': postgresqlAdapter
    },
    // Build Connections Config
    // Setup connections using the named adapter configs
    connections: {
        qtPostgresqlServer: {
            adapter: 'sails-postgresql',
            host: connection.hostname,
            port: connection.port || 5432,
            user: connection_user,
            password: connection_password,
            // pathname (not path) so a query string such as "?ssl=true" does
            // not end up in the database name; strip the leading "/".
            database: (connection.pathname || '').substring(1)
        }
    },
    defaults: {
        migrate: 'alter'
    }
};
//////////////////////////////////////////////////////////////////
// WATERLINE SERVICES
//////////////////////////////////////////////////////////////////
// The api directory sits two levels above this file.
var api_dir = path.resolve(__dirname, '../..', 'api/');

// load services: every .js file under api/services, keyed by file name
var services = require('include-all')({
    dirname: api_dir + '/services',
    filter: /(.+)\.js$/,
    excludeDirs: /^\.(git|svn)$/,
    optional: true
});

// Expose each service as a global under its file name so code written against
// sails' global services can reference it unchanged.
_.forEach(services, function (service, key) {
    sails.log.info("Loading service: " + key);
    global[key] = service;
});
//////////////////////////////////////////////////////////////////
// WATERLINE MODELS
//////////////////////////////////////////////////////////////////
// load models: every .js file under api/models, keyed by file name
var models = require('include-all')({
    dirname: api_dir + '/models',
    filter: /(.+)\.js$/,
    excludeDirs: /^\.(git|svn)$/,
    optional: true
});

// Register each model definition with the ORM and attach publish* helpers
// that push events onto the redis "dispatch" channel.
_.forEach(models, function (model, key) {
    sails.log.info("Register model: " + key);
    model.identity = key.toLowerCase();
    model.connection = 'qtPostgresqlServer';

    //add publish methods
    // NOTE: the create/update arguments were previously swapped between these
    // two helpers (publishCreate announced "updated" and vice versa).
    model.publishCreate = function (id, data) {
        redis_client.publish("dispatch", generate_model_message(model.identity, id, "create", "created", data));
    };
    model.publishUpdate = function (id, data) {
        redis_client.publish("dispatch", generate_model_message(model.identity, id, "update", "updated", data));
    };
    model.publishAdd = function (id, attribute, idAdded) {
        redis_client.publish("dispatch", generate_association_message(model.identity, id, attribute, idAdded, "add", "addedTo", "addedId"));
    };
    model.publishRemove = function (id, attribute, idRemoved) {
        redis_client.publish("dispatch", generate_association_message(model.identity, id, attribute, idRemoved, "remove", "removedFrom", "removedId"));
    };

    var waterline_model = Waterline.Collection.extend(model);
    orm.loadCollection(waterline_model);
});
//////////////////////////////////////////////////////////////////
// Initialization
//////////////////////////////////////////////////////////////////
/**
 * Wait for the global redis client to report that it is ready.
 *
 * @returns {Promise} q promise resolved with the global `redis_client` once
 *                    its "ready" event fires. The promise is never rejected
 *                    here; no "error" listener is installed by this function.
 */
function init_redis() {
    var deferred = q.defer();
    redis_client.on("ready", function () {
        sails.log.info("Redis ready");
        return deferred.resolve(redis_client);
    });
    return deferred.promise;
}
/**
 * Initialize the Waterline ORM with the module-level `config`.
 *
 * @returns {Promise} q promise resolved with the value Waterline passes to the
 *                    initialize callback, or rejected with its error.
 */
function init_waterline() {
    var deferred = q.defer();
    // Start Waterline passing adapters in
    orm.initialize(config, function (err, models) {
        if (err) {
            return deferred.reject(err);
        }
        sails.log.info("Waterline ready");
        return deferred.resolve(models);
    });
    return deferred.promise;
}
// Once both redis and Waterline are ready: expose the models the way sails
// does, start kue, and register every job handler found in ./jobs.
q.spread([init_redis(), init_waterline()], function (ready_redis_client, waterline_models) {
    sails.models = waterline_models.collections;
    sails.connections = waterline_models.connections;

    // Expose each model globally under its capitalized identity (e.g. "user" -> User).
    _.forEach(sails.models, function (model, name) {
        var global_name = name.charAt(0).toUpperCase() + name.slice(1);
        global[global_name] = model;
    });

    sails.log.info("Starting kue");
    var kue_engine = kue.createQueue({
        prefix: 'kue',
        redis: {
            port: sails.config.nconf.get('REDIS_CONNECTION:PORT'),
            host: sails.config.nconf.get('REDIS_CONNECTION:HOST'),
            auth: sails.config.nconf.get('REDIS_CONNECTION:AUTH') || null
        }
    });

    //register jobs: each file in ./jobs becomes the handler for the job type named after it.
    var jobs = require('include-all')({
        dirname: __dirname + '/jobs',
        filter: /(.+)\.js$/,
        excludeDirs: /^\.(git|svn)$/,
        optional: true
    });
    _.forEach(jobs, function (job, name) {
        sails.log.info("Registering kue handler: " + name);
        kue_engine.process(name, job);
    });

    // On SIGTERM ask kue to shut down (5000 passed as the timeout argument) before exiting.
    process.once('SIGTERM', function (sig) {
        kue_engine.shutdown(function (err) {
            sails.log.error("Shutting down kue");
            if (err) {
                // Previously the shutdown error was ignored entirely.
                sails.log.error("Kue shutdown error: " + (err.stack || err));
            }
            process.exit(err ? 1 : 0);
        }, 5000);
    });
}).catch(function (err) {
    // Without this handler a redis/Waterline/startup failure was silently
    // dropped and the process stayed alive without processing any jobs.
    sails.log.error("Worker failed to start: " + (err && err.stack ? err.stack : err));
    process.exit(1);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment