Created
July 30, 2014 01:28
-
-
Save rphillips/510069bac2cf7eff9d19 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var async = require('async'); | |
var _ = require('underscore'); | |
var ld = require('ld'); | |
var dbopsEntity = require('../db/ops/entity'); | |
var dbopsAgentConnection = require('../db/ops/agent_connection'); | |
var dbopsAgent = require('../db/ops/agent'); | |
var errors = require('../util/errors'); | |
var flowCtrl = require('rackspace-shared-utils/lib/flow_control'); | |
var log = require('logmagic').local('ele.lib.util.agent'); | |
var instruments = require('rackspace-shared-utils/lib/instruments'); | |
var CHECK_TYPES = require('../db/models/check_type').CHECK_TYPES; | |
var settings = require('./settings'); | |
var aeUtils = require('../agent_endpoint/utils'); | |
var getEndpointClient = require('../agent_endpoint/endpoint_client').getEndpointClient; | |
var getHostInfoNameFromAgentType = require('../agent_endpoint/endpoint_client').getHostInfoNameFromAgentType; | |
var MAX_RETRIES = 3; | |
var DEFAULT_AGENT_TIMEOUT = 60 * 1000; // milliseconds | |
// Thrift Error Messages | |
var THRIFT_NO_CONNECTIONS_ERROR = 'No connections'; | |
/**
 * Ranks how "far" a destination datacenter is from the current one.
 *
 * The rank is simply the string distance between the two datacenter names,
 * as reported by `ld.computeDistance`. It is a stand-in until there is a
 * better way to make sure names such as dfw and dfw1 are treated alike.
 *
 * An ideal implementation would have latency information for every target
 * datacenter here, rather than using a string distance.
 *
 * @param {String} current Current datacenter to measure from.
 * @param {String} dest Destination datacenter to measure to.
 * @return {Number} Ascending-order rank (lower is better).
 */
function datacenterDistance(current, dest) {
  var rank = ld.computeDistance(current, dest);
  return rank;
}
/**
 * AgentHelper
 *
 * Looks up the connections of an agent and tries to run a caller-supplied
 * method against those connections until one attempt succeeds.
 *
 * Before calling `run`, at least one of the following must be provided
 * through the matching setter:
 *   - the agent ID
 *   - the entity ID
 *   - the entity itself
 *
 * Whichever one is available is used to locate the agent connections; when
 * none is usable an error is handed to the `run` callback.
 *
 * Note: whatever the iterator passes to its callback is passed through to
 * the completion callback.
 *
 * @param {Object} options Various options. `options.timeout` is stored on
 *     the instance; DEFAULT_AGENT_TIMEOUT is used when it is missing or falsy.
 *
 * @constructor
 */
function AgentHelper(options) {
  var opts = options || {};

  // Lookup keys start out empty and are filled in by the setters.
  this.agentId = null;
  this.entityId = null;
  this.entity = null;

  this.timeout = opts.timeout || DEFAULT_AGENT_TIMEOUT;
}
/**
 * Remember the ID of the agent this helper should operate on.
 * @param {String} id The Agent's ID.
 */
AgentHelper.prototype.setAgentId = function setAgentId(id) {
  this.agentId = id;
};
/**
 * Remember the ID of the entity whose agent should be looked up.
 * @param {String} id The Entity's ID.
 */
AgentHelper.prototype.setEntityId = function setEntityId(id) {
  this.entityId = id;
};
/**
 * Remember an already-loaded entity; `run` reads its `agent_id` field.
 * @param {Object} entity The Entity.
 */
AgentHelper.prototype.setEntity = function setEntity(entity) {
  this.entity = entity;
};
/**
 * Attempt the endpoints.
 *
 * Works in two stages:
 *   1. Every connection is probed with `getAgentConnStatus`; connections
 *      whose probe reports an error are dropped. The survivors are ordered
 *      by datacenter distance from `settings.DATACENTER`, or shuffled when
 *      no local datacenter is configured.
 *   2. The ordered connections are handed to `iter` one at a time until an
 *      attempt completes without an error.
 *
 * On success the arguments `iter` gave to its callback are forwarded
 * unchanged to `callback`. When every attempt fails, `callback` receives the
 * error of the last attempt. When no connection survives the probe,
 * `callback` receives an AgentEndpointError.
 *
 * @param {Context} ctx The context.
 * @param {Array} connections The agent connections.
 * @param {Function} iter The iterator to run (agentCtx, callback).
 * @param {Function} callback The completion callback
 *     (err, {iterator callback params}).
 */
AgentHelper.prototype._tryEndpoints = function(ctx, connections, iter, callback) {
  var self = this,
      // Arguments of the most recent iterator callback, replayed on success.
      responseArgs = [],
      // Only lower-case the local datacenter when one is configured; doing
      // it unconditionally throws before the randomized fallback can run.
      mydc = settings.DATACENTER ? settings.DATACENTER.toLowerCase() : null;

  /* Order connections: nearest datacenter first, or randomly. */
  function sortConnections(conns) {
    if (mydc) {
      return _.sortBy(conns, function(conn) {
        // A connection without a datacenter is compared as an empty name.
        return datacenterDistance(mydc, (conn.datacenter || '').toLowerCase());
      });
    }

    /* In the event our system is misconfigured, use a randomized order. */
    return _.shuffle(conns);
  }

  /* async.filter iterator: keep a connection only if its status probe succeeds. */
  function probe(conn, done) {
    var client = getEndpointClient(conn.endpoint_thrift_addr);

    // NOTE(review): the original passed an undefined FILTER_TIMEOUT here;
    // the helper's configured timeout is used instead — confirm the
    // intended probe timeout.
    client.getAgentConnStatus(ctx, conn.guid, self.timeout, function(err) {
      done(!err);
    });
  }

  /* Run the iterator once against a single connection. */
  function attempt(conn, candidates, done) {
    var thriftAddr = conn.endpoint_thrift_addr,
        agentCtx;

    log.debug('Attempting Agent Endpoint Thrift Connection', {
      thrift_addr: thriftAddr,
      ctx: ctx
    });

    agentCtx = {
      client: getEndpointClient(thriftAddr),
      connections: candidates,
      agentId: self.agentId,
      ctx: ctx,
      guid: conn.guid
    };

    iter(agentCtx, function(err) {
      if (err) {
        log.debug('Received error from agent endpont', {
          ctx: ctx,
          agentId: self.agentId,
          thriftAddr: thriftAddr,
          err: err
        });
      } else {
        log.debug('Successful response from agent endpoint', {ctx: ctx, agentId: self.agentId, thriftAddr: thriftAddr});
      }

      responseArgs = Array.prototype.slice.call(arguments);
      done(err);
    });
  }

  async.auto({
    conns: function(taskDone) {
      async.filter(connections, probe, function(live) {
        taskDone(null, sortConnections(live));
      });
    },

    aep: ['conns', function(taskDone, results) {
      var candidates = results.conns,
          index = 0;

      // Try the candidate at `index`; move on to the next one on failure
      // and finish as soon as one succeeds or the list is exhausted.
      function tryNext() {
        attempt(candidates[index], candidates, function(err) {
          index++;

          if (err && index < candidates.length) {
            tryNext();
            return;
          }

          taskDone(err);
        });
      }

      if (candidates.length === 0) {
        log.debug('Could not connect to agent', { ctx: ctx, agentId: self.agentId });
        taskDone(new errors.AgentEndpointError(null, 400, 'Could not connect to agent'));
        return;
      }

      tryNext();
    }]
  }, function(err) {
    if (err) {
      callback(err);
      return;
    }

    callback.apply(null, responseArgs);
  });
};
/**
 * Run an iter given a context.
 *
 * Resolves the agent ID (directly, from the entity, or by loading the entity
 * by ID), checks that the agent exists and has connections, and then hands
 * the connections to `_tryEndpoints` together with `iter`.
 *
 * Errors passed to `callback`:
 *   - AgentDoesNotExistError when no agent ID, entity ID or entity was set,
 *     or when the agent lookup reports ObjectDoesNotExistError.
 *   - AgentNotBound when no agent ID could be resolved.
 *   - AgentNotConnected when the agent has no connections.
 *   - Any other error reported by the database lookups or `_tryEndpoints`.
 *
 * @param {Context} ctx The context.
 * @param {Function} iter The iterator (agentCtx, callback).
 * @param {Function} callback The completion callback
 *     (err, {iterator callback params}).
 */
AgentHelper.prototype.run = function(ctx, iter, callback) {
  var self = this;

  if (!_.isString(self.agentId) &&
      !_.isString(self.entityId) &&
      !self.entity) {
    // Report asynchronously so the callback never fires before run() returns.
    process.nextTick(function() {
      callback(new errors.AgentDoesNotExistError(self.agentId));
    });
    return;
  }

  async.auto({
    getEntity: function getEntity(callback) {
      if (self.agentId) {
        callback();
        return;
      }

      if (self.entity) {
        self.agentId = self.entity.agent_id;
        callback();
        return;
      }

      if (self.entityId) {
        dbopsEntity.get(ctx, self.entityId, function(err, entity) {
          if (err) {
            callback(err);
            return;
          }

          // A missing entity leaves the agent ID unset, so validateAgentId
          // reports AgentNotBound instead of this throwing.
          self.agentId = entity ? entity.agent_id : null;
          callback();
        });
        return;
      }

      // Nothing usable was set (e.g. an empty-string ID passes the string
      // check above). Complete the task anyway so validateAgentId can
      // report AgentNotBound rather than leaving async.auto waiting forever.
      callback();
    },

    validateAgentId: ['getEntity', function validateAgentId(callback) {
      if (!self.agentId) {
        callback(new errors.AgentNotBound());
      } else {
        callback();
      }
    }],

    getAgent: ['validateAgentId', function getAgent(callback) {
      dbopsAgent.get(ctx, self.agentId, function(err, agent) {
        if (err && err instanceof errors.ObjectDoesNotExistError) {
          callback(new errors.AgentDoesNotExistError(self.agentId));
          return;
        }

        callback(err);
      });
    }],

    getConnections: ['validateAgentId', function getAllConnections(callback) {
      dbopsAgentConnection.getAll(ctx, self.agentId, callback);
    }],

    validateConnections: ['getAgent', 'getConnections', function validateConnections(callback, results) {
      // NOTE(review): index 0 assumes getAll calls back with more than one
      // result value, so async.auto stores them as an array — confirm.
      var connections = results.getConnections[0];

      // This agent is not connected if there is no list or length == 0
      if (!connections || connections.length === 0) {
        callback(new errors.AgentNotConnected(self.agentId));
        return;
      }

      callback(null, connections);
    }]
  },
  function(err, results) {
    if (err) {
      // NOTE(review): the 'agent_not_connected' event is recorded only for
      // errors that are NOT AgentNotConnected; kept as-is, confirm intent.
      if (!(err instanceof errors.AgentNotConnected)) {
        log.debug('Error validating agent before running operation.', {ctx: ctx, actual_err: err});
        instruments.recordEvent('lib.util.agent.agent_not_connected');
      }

      callback(err);
      return;
    }

    self._tryEndpoints(ctx, results.validateConnections, iter, callback);
  });
};
/** Make the AgentHelper constructor available to other modules. */
exports.AgentHelper = AgentHelper;
/** Gathers host information types by Agent ID | |
* @param {Context} ctx agent The Context. | |
* @param {String} agentId Agent ID. | |
* @param {Function} callback expects(err, result). | |
*/ | |
exports.hostInfoTypesByAgent = function(ctx, agentId, callback) { | |
async.waterfall([ | |
function queryTheEndpoint(callback) { | |
var helper; | |
function iter(agentCtx, callback) { | |
agentCtx.client.getHostInfoTypes(agentCtx.ctx, agentCtx.guid, callback); | |
} | |
helper = new AgentHelper(); | |
helper.setAgentId(agentId); | |
helper.run(ctx, iter, callback); | |
}, | |
function parseResponse(response, info, callback) { | |
var obj = {}; | |
obj.types = []; | |
_.each(response.types, function(typename) { | |
obj.types.push(getHostInfoNameFromAgentType(typename)); | |
}); | |
obj.getSerializerType = function() { | |
return 'agent_list_host_info_types'; | |
}; | |
callback(null, obj); | |
} | |
], callback); | |
}; | |
/** Gathers host information by Agent ID | |
* @param {Context} ctx agent The Context. | |
* @param {Object} hostInfoType HostInfoType object. | |
* @param {String} agentId Agent ID. | |
* @param {Function} callback expects(err, result). | |
*/ | |
exports.hostInfoByAgent = function(ctx, hostInfoType, agentId, callback) { | |
async.waterfall([ | |
function queryTheEndpoint(callback) { | |
var helper; | |
function iter(agentCtx, callback) { | |
agentCtx.client.getHostInfo(agentCtx.ctx, agentCtx.guid, hostInfoType.type, callback); | |
} | |
helper = new AgentHelper(); | |
helper.setAgentId(agentId); | |
helper.run(ctx, iter, callback); | |
}, | |
function parseResponse(response, info, callback) { | |
var metrics, serializerType = hostInfoType.serializerType, | |
obj = {}; | |
if (response.metrics && (response.metrics instanceof Array)) { | |
if (hostInfoType.isArray) { | |
metrics = aeUtils.convertMetricsToSwizObject(response.metrics, serializerType); | |
} | |
else { | |
metrics = aeUtils.convertMetricToSwizObject(response.metrics[0], serializerType); | |
} | |
obj.error = response.error; | |
obj.info = metrics; | |
obj.timestamp = response.timestamp.valueOf(); | |
obj.getSerializerType = function() { | |
return 'agent_host_info'; | |
}; | |
callback(null, obj); | |
} | |
else if (response.error) { | |
if (response.error === THRIFT_NO_CONNECTIONS_ERROR) { | |
callback(new errors.AgentNotConnected()); | |
} else { | |
callback(new errors.AgentError(response.error)); | |
} | |
} | |
else { | |
callback(new errors.AgentError('Invalid Response')); | |
} | |
} | |
], callback); | |
}; | |
/** Tell Agent it's schedule changed. | |
* @param {Context} ctx agent The Context. | |
* @param {String} agentId Agent ID. | |
* @param {Function} callback expects(err, result). | |
*/ | |
exports.sendCheckScheduleChanged = function(ctx, agentId, callback) { | |
async.waterfall([ | |
function queryTheEndpoint(callback) { | |
var helper; | |
function iter(agentCtx, callback) { | |
agentCtx.client.sendCheckScheduleChanged(agentCtx.ctx, agentCtx.guid, callback); | |
} | |
helper = new AgentHelper(); | |
helper.setAgentId(agentId); | |
helper.run(ctx, iter, callback); | |
}, | |
function parseResponse(response, info, callback) { | |
if (response.error) { | |
if (response.error === THRIFT_NO_CONNECTIONS_ERROR) { | |
callback(new errors.AgentNotConnected()); | |
return; | |
} | |
else { | |
callback(new errors.AgentError(response.error)); | |
return; | |
} | |
} | |
else { | |
callback(null, response); | |
} | |
} | |
], callback); | |
}; | |
/** Gathers Target informatin by Agent ID. | |
* @param {Context} ctx agent The Context. | |
* @param {String} checkType the check type. | |
* @param {String} agentId Agent ID. | |
* @param {Function} callback expects(err, result). | |
*/ | |
exports.checkTargets = function(ctx, checkType, agentId, callback) { | |
async.waterfall([ | |
function queryTheEndpoint(callback) { | |
var helper; | |
function iter(agentCtx, callback) { | |
agentCtx.client.getTargets(agentCtx.ctx, agentCtx.guid, checkType, callback); | |
} | |
helper = new AgentHelper(); | |
helper.setAgentId(agentId); | |
helper.run(ctx, iter, callback); | |
}, | |
function parseResponse(response, info, callback) { | |
if (!response.targets) { | |
callback(new errors.CheckDoesNotSupportTargets(checkType)); | |
return; | |
} | |
callback(null, response.targets.map(function(id) { | |
return { | |
id: id, | |
label: id, // TODO: improve these labels | |
getSerializerType: function() { | |
return 'agent_check_targets'; | |
} | |
}; | |
})); | |
} | |
], callback); | |
}; | |
/** Is this an agent check type. | |
* @param {String} type The check type. | |
* @return {Boolean} true/false. | |
*/ | |
exports.isAgentCheckType = function(type) { | |
return type.indexOf('agent.') === 0 && CHECK_TYPES[type]; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment