/*
 * Gist codemoran/6492102 -- created September 9, 2013 06:24.
 *
 * Graphdat metrics collector: listens for JSON messages over UDP, batches them up,
 * and sends them to Graphdat using the batch Metrics API.
 * API reference: http://developer.graphdat.com/v1/post/measurements
 *
 * Notice carried over from the GitHub page: this file may contain bidirectional
 * Unicode text that can be interpreted or compiled differently from how it is
 * displayed. To review it, open the file in an editor that reveals hidden Unicode
 * characters.
 */
var _dgram = require('dgram');
var _https = require('https');
// How often (in milliseconds) the UDP server flushes the message queue and sends it to Graphdat.
// To keep the graphs flowing in real time, flush every 3s.
var FLUSH_TIME = 3000;

// The UDP port the server listens on.
var SERVER_PORT = 8900;

// Credentials used to contact Graphdat.
// Email is what you sign in with; the API token is found under Account -> Settings.
var CONFIG_EMAIL = '[YOUR_EMAIL]';
var CONFIG_APITOKEN = '[YOUR_API_TOKEN]';

// Which output should be logged; helpful when debugging issues.
var DEBUG_LOG_INVALID_MESSAGES = true;
var DEBUG_LOG_VALID_MESSAGES = false;
var DEBUG_LOG_SENT_MESSAGE_QUEUE = false;
var DEBUG_LOG_STATUS = true;

// The pending queue. `hash` indexes the queued entries by timestamp -> metric -> source
// so repeated samples within the same second can be merged; `messageQueue` holds the
// entries in the order they will be sent. Both are replaced on every flush.
var hash = {};
var messageQueue = [];

// The logger used for all output.
var logger = console;

// Counters reported in the status line and reset after every flush.
var invalidMessages = 0;
var sentMessages = 0;
var validMessages = 0;
// Format a Date as 'YYYY-MM-DD HH:MM:SS' (UTC) for log prefixes: take the ISO-8601
// string, replace the 'T' separator with a space, and drop the fractional seconds
// and everything after them.
function formatDate(dateToFormat)
{
    return dateToFormat.toISOString().replace(/T/, ' ').replace(/\..+/, '');
}
// Current time as whole seconds since the UNIX epoch (Date.now() is in milliseconds).
function getTime()
{
    return Math.floor(Date.now() / 1000);
}
/* ===
Validate a parsed message and normalise it to object form.

A Graphdat message has the following format as an object
{
    "source": "myserver",
    "metric": "PAGE_FAULTS",
    "measure": 2,
    "timestamp": 1377043134
}
Or the following format as an array
[
    "myserver",
    "PAGE_FAULTS",
    2,
    1377043134
]
The timestamp in both examples is optional, but is a good idea
as items will graph where you expect them that way.
Timestamps are in UNIX time format, so Date.now() / 1000

message: the parsed JSON payload (object or array form).
cb:      called synchronously, either as cb(errorString) when the message is
         rejected, or as cb(null, obj) where obj has `source`, `metric`,
         `measure` and a whole-second numeric `timestamp`. When the message
         was passed in object form, obj is that same object (its timestamp is
         normalised in place).
=== */
function validate(message, cb)
{
    var obj;
    if (Array.isArray(message))
    {
        obj = {
            source: message[0],
            metric: message[1],
            measure: message[2],
            timestamp: message[3]
        };
    }
    else if (message !== null && typeof message === 'object')
    {
        obj = message;
    }
    else
    {
        return cb('message is not in the correct format');
    }

    // Compare against null/undefined instead of using `in`: the array form always
    // defines all four keys, so a short array would otherwise slip through with
    // undefined fields.
    if (obj.source === undefined || obj.source === null)
        return cb('message is missing the `source` property');
    if (obj.metric === undefined || obj.metric === null)
        return cb('message is missing the `metric` property');
    if (obj.measure === undefined || obj.measure === null)
        return cb('message is missing the `measure` property');

    // The timestamp is optional: fall back to "now" when it is absent or not a
    // usable number, rather than letting NaN through.
    var timestamp = Number(obj.timestamp);
    if (obj.timestamp === undefined || obj.timestamp === null || !isFinite(timestamp))
        timestamp = getTime();

    // A value this large is JS time (milliseconds), not UNIX time (seconds)
    if (timestamp > 2000000000)
        timestamp = timestamp / 1000;

    // floor the seconds as we have per second metrics
    obj.timestamp = Math.floor(timestamp);
    return cb(null, obj);
}
/* ===
Add a validated message to the pending queue, merging samples that share the same
second, metric and source.

Graphdat provides 1 second intervals, so if you are sending data at sub second intervals,
you are paying for data that you cannot see. If we get two values in the same second,
take the GREATER of the two values.
We have a bit of leakage on the intervals, but it saves from sending 750 metrics/s to
sending only 1 or 2 at the same time period

message: an object with `source`, `metric`, `measure` and `timestamp`, as produced
         by validate(). The queued entry is the array form
         [source, metric, measure, timestamp].
=== */
function addMessageToQueue(message)
{
    hash = hash || {};
    var hasOwn = Object.prototype.hasOwnProperty;

    // Metric and source names come straight off the wire, so the lookup tables are
    // created without a prototype: names such as `constructor` or `toString` must
    // not resolve to inherited Object.prototype members.
    if (!hasOwn.call(hash, message.timestamp))
        hash[message.timestamp] = Object.create(null);
    var metricsAtTime = hash[message.timestamp];

    if (!metricsAtTime[message.metric])
        metricsAtTime[message.metric] = Object.create(null);
    var sourcesForMetric = metricsAtTime[message.metric];

    var existing = sourcesForMetric[message.source];
    if (!existing)
    {
        // First sample for this second/metric/source: queue it. The same array is
        // kept in the index so later samples can update it in place.
        var entry = [message.source, message.metric, message.measure, message.timestamp];
        sourcesForMetric[message.source] = entry;
        messageQueue.push(entry);
    }
    else if (existing[2] < message.measure)
    {
        // Same second/metric/source seen again: keep the greater measure.
        existing[2] = message.measure;
    }
}
/* ===
To process a message, convert it to JSON, validate it has the required fields
and then add it to a queue to be sent to Graphdat

msg:   the raw datagram payload; it is decoded with msg.toString().
rinfo: sender information supplied by the UDP socket (currently unused).

Updates the invalidMessages / validMessages counters.
NOTE: because this function is declared at the top level under the name `process`,
it shadows Node's global `process` object inside this file.
=== */
function process(msg, rinfo)
{
    // convert the message to JSON, if its bad, throw it away
    var text;
    var packet;
    try
    {
        text = msg.toString();
        packet = JSON.parse(text);
    }
    catch(ex)
    {
        // Log the decoded text rather than the raw buffer so the payload is readable.
        if (DEBUG_LOG_INVALID_MESSAGES)
            logger.error('%s: ERR Message is not JSON:\n%s', formatDate(new Date()), text !== undefined ? text : msg);
        invalidMessages++;
        return;
    }

    // validate the message has the required fields and set the timestamps if they are missing
    validate(packet, function(err, validMessage)
    {
        if (err)
        {
            // Include the reason validation failed so the log says why the message was dropped.
            if (DEBUG_LOG_INVALID_MESSAGES)
                logger.error('%s: ERR Invalid Message (%s):\n%j', formatDate(new Date()), err, packet);
            invalidMessages++;
            return;
        }
        if (DEBUG_LOG_VALID_MESSAGES)
            logger.log('%s: Added Message:\n%j', formatDate(new Date()), packet);
        addMessageToQueue(validMessage);
        validMessages++;
    });
}
/* ===
Graphdat can accept metrics as a single metric or as a bulk format, by default
we send metrics to the bulk API in the following format
curl https://api.graphdat.com/v1/measurements \
    -u <email>:<api-token> \
    -X POST \
    -H "Content-Type: application/json" \
    -d '
    [
        [
            "myserver",
            "PAGE_FAULTS",
            2,
            1377043134
        ],
        [
            "myserver",
            "CACHE_MEMORY_USED",
            0.7,
            1377043134
        ]
    ]'

messageList: array of [source, metric, measure, timestamp] entries to send.
callback:    called exactly once, with null on success or with an error when the
             request fails or Graphdat answers with a non-2xx status.
=== */
function sendToGraphdat(messageList, callback)
{
    var bodyString = JSON.stringify(messageList);

    // Several failure paths can fire for one request (e.g. a socket error after the
    // response ended); make sure the caller only ever hears about the first outcome.
    var finished = false;
    function done(err)
    {
        if (finished)
            return;
        finished = true;
        callback(err || null);
    }

    var options = {
        method: 'POST',
        host: 'api.graphdat.com',
        port: 443,
        path: '/v1/measurements',
        auth: CONFIG_EMAIL + ':' + CONFIG_APITOKEN,
        headers: {
            // Content-Length is counted in bytes, not UTF-16 code units; the two
            // differ as soon as a source or metric name contains non-ASCII text.
            'Content-Length': Buffer.byteLength(bodyString),
            'Content-Type': 'application/json'
        }
    };
    try
    {
        var req = _https.request(options, function(resp)
        {
            resp.on('end', function ()
            {
                if (resp.statusCode < 200 || resp.statusCode >= 300)
                    return done(new Error('Graphdat responded with HTTP status ' + resp.statusCode));
                done(null);
            });
            resp.on('error', done);
            // The body is not needed, but the response has to be consumed or the
            // 'end' event is never emitted.
            resp.resume();
        });
        req.on('error', done);
        req.write(bodyString);
        req.end();
    }
    catch(ex)
    {
        done(ex);
    }
}
/* ===
Flush the message queue and send the messages to Graphdat on an interval and
reset the metrics

Each call swaps out the current queue (and its lookup index), sends it, logs the
counters for the interval, zeroes them, and then schedules the next call after
FLUSH_TIME milliseconds. Messages from a failed send are logged as an error and
are not retried.
=== */
function flushMessages()
{
    // Log the counters for this interval, reset them and arm the next flush.
    function finishInterval(statusFormat)
    {
        if (DEBUG_LOG_STATUS)
        {
            logger.log(statusFormat, formatDate(new Date()), invalidMessages, validMessages, sentMessages);
        }
        invalidMessages = 0;
        sentMessages = 0;
        validMessages = 0;
        setTimeout(flushMessages, FLUSH_TIME);
    }

    if (messageQueue.length === 0)
    {
        finishInterval('%s: Queue Flushed: %d invalid messages, %d valid messages, %d sent messages');
        return;
    }

    // Take ownership of the current batch so messages arriving while the request
    // is in flight start a fresh queue.
    var list = messageQueue;
    messageQueue = [];
    hash = {};

    // Guard against the send callback firing more than once for a single request:
    // a second invocation would otherwise start a second, parallel flush loop.
    var completed = false;
    sendToGraphdat(list, function(err)
    {
        if (completed)
            return;
        completed = true;

        if (err)
        {
            logger.error('%s: ERR sending message: %s', formatDate(new Date()), err);
        }
        else
        {
            sentMessages += list.length;
            // %j already serialises its argument; pass the list itself so the
            // output is not JSON-encoded twice.
            if (DEBUG_LOG_SENT_MESSAGE_QUEUE)
                logger.log('%s: Sent Messages:\n%j', formatDate(new Date()), list);
        }
        finishInterval('%s: Queue Flushed: %d invalid messages, %d valid messages, %d messages sent to Graphdat');
    });
}
// Start the UDP server: `process` is registered as the handler for every datagram
// received on SERVER_PORT.
var udpServer = _dgram.createSocket('udp4', process);
udpServer.bind(SERVER_PORT);

// Start the message queue: flushMessages re-schedules itself every FLUSH_TIME ms.
flushMessages();
// (GitHub page footer carried over from the scrape: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")