Skip to content

Instantly share code, notes, and snippets.

@apg
Created September 8, 2011 20:33
Show Gist options
  • Save apg/1204588 to your computer and use it in GitHub Desktop.
Save apg/1204588 to your computer and use it in GitHub Desktop.
statsd multiple flushers... on top of other unreleased stuff...
* statsd_domload_20120: multiple flush intervals a la graphite itself
---
diff --git a/util/conf/production/statsd-config.js b/util/conf/production/statsd-config.js
index d855abe..197736a 100644
--- a/util/conf/production/statsd-config.js
+++ b/util/conf/production/statsd-config.js
@@ -4,5 +4,16 @@
, port: 8125
, statusPort: 8126
, statusAddr: "0.0.0.0"
+, flushBuckets: [
+ {
+ pattern: "^pagetime\\.domload.*" // doubled backslash: a lone "\." in a string literal is just ".", which matches any character
+ , flushInterval: 60000
+ , statPrefix: "pagetime.domload"
+ }
+ , {
+ pattern: "^.*"
+ , flushInterval: 10000
+ , statPrefix: "stats"
+ }
+ ]
}
-
diff --git a/util/init/production/statsd.sh b/util/init/production/statsd.sh
index 4d99eb3..5e02985 100755
--- a/util/init/production/statsd.sh
+++ b/util/init/production/statsd.sh
@@ -34,7 +34,7 @@ case "$mode" in
nohup sh -c "exec $NODE_JS $COMMAND $CONFIG_FILE >> $LOGFILE 2>&1" > /dev/null &
echo "$!" > "$LOCKFILE"
- echo "started feedd with pid="`cat $LOCKFILE`
+ echo "started statsd with pid="`cat $LOCKFILE`
;;
'stop')
diff --git a/util/node/statsd/stats.js b/util/node/statsd/stats.js
index 2838926..dfe31bc 100644
--- a/util/node/statsd/stats.js
+++ b/util/node/statsd/stats.js
@@ -4,15 +4,42 @@ var dgram = require('dgram')
, config = require('./config')
, http = require('http');
-var counters = {};
-var timers = {};
-var debugInt, flushInt, server, sserver;
+var counters = [];
+var timers = [];
+var debugInt, flushInts = [], server, sserver;
+
+
+String.prototype.startsWith = String.prototype.startsWith || function(str) { // keep the native method when the runtime provides one
+ var l = str.length;
+ return this.substring(0, l) === str;
+};
+// endsWith: true when the tail of the string equals str (native version preferred when present).
+String.prototype.endsWith = String.prototype.endsWith || function(str) {
+ var l = str.length;
+ return this.substring(this.length - l) === str;
+};
+// removePrefix: returns the string without a leading str, or the string unchanged when it does not start with str.
+String.prototype.removePrefix = function(str) {
+ if (this.startsWith(str)) {
+ return this.substring(str.length);
+ }
+ return String(this); // hand back a primitive rather than the boxed `this`
+};
config.configFile(process.argv[2], function (config, oldConfig) {
if (! config.debug && debugInt) {
clearInterval(debugInt);
debugInt = false;
}
+
+ if (! config.flushBuckets) { // no buckets configured: fall back to a single catch-all bucket
+ config.flushBuckets = [
+ {
+ pattern: ".*"
+ , flushInterval: Number(config.flushInterval || 10000) // honor the legacy top-level flushInterval setting
+ }
+ ];
+ }
if (config.debug) {
if (debugInt !== undefined) { clearInterval(debugInt); }
@@ -20,7 +47,8 @@ config.configFile(process.argv[2], function (config, oldConfig) {
sys.log("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers));
}, config.debugInterval || 10000);
}
-
+
+ // status server
if (sserver === undefined) {
sserver = http.createServer(function (req, res) {
if (req.url.substring(0, 5) === "/ping") {
@@ -28,16 +56,25 @@ config.configFile(process.argv[2], function (config, oldConfig) {
res.end('PONG\n');
}
else {
- res.writeHead(200, {'Content-Type': 'text/plain'});
- res.end('' + counters.length + ' Counters, ' + timers.length + ' Timers\n');
+ res.writeHead(200, {'Content-Type': 'application/json'});
+ var status = {
+ 'timers': timers,
+ 'counters': counters
+ };
+ res.end(JSON.stringify(status)); // body must be real JSON to match the header; sys.inspect output is not JSON and truncates nested levels
}
});
sserver.listen(config.statusPort, config.statusAddr || "127.0.0.1");
-
}
if (server === undefined) {
+ // create the data buckets
+ for (var i = 0; i < config.flushBuckets.length; i++) {
+ counters.push({});
+ timers.push({});
+ }
+
server = dgram.createSocket('udp4', function (msg, rinfo) {
if (config.dumpMessages) { sys.log(msg.toString()); }
var bits = msg.toString().split(':');
@@ -45,6 +82,20 @@ config.configFile(process.argv[2], function (config, oldConfig) {
.replace(/\s+/g, '_')
.replace(/\//g, '-')
.replace(/[^a-zA-Z_\-0-9\.]/g, '');
+ var flushBucket = -1;
+
+ // find the appropriate bucket: the first configured pattern that matches the key wins
+ for (var i = 0; i < config.flushBuckets.length; i++) {
+ if (key.match(config.flushBuckets[i].pattern || ".*")) {
+ flushBucket = i;
+ break;
+ }
+ }
+
+ if (flushBucket < 0) {
+ sys.log("key " + key + " didn't match expected pattern\n");
+ return;
+ }
if (bits.length == 0) {
sys.log("invalid data sent: " + msg + "\n");
@@ -55,22 +106,22 @@ config.configFile(process.argv[2], function (config, oldConfig) {
var sampleRate = 1;
var fields = bits[i].split("|");
if (fields[1] && fields[1].trim() == "ms") {
- if (! timers[key]) {
- timers[key] = [];
+ if (! timers[flushBucket][key]) {
+ timers[flushBucket][key] = [];
}
- timers[key].push(Number(fields[0] || 0));
+ timers[flushBucket][key].push(Number(fields[0] || 0));
} else {
if (fields[2] && fields[2].match(/^@([\d\.]+)/)) {
sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]);
}
- if (! counters[key]) {
- counters[key] = 0;
+ if (! counters[flushBucket][key]) { // counters is now an array of per-bucket maps; testing counters[key] reset the count on every packet
+ counters[flushBucket][key] = 0;
}
if (fields[0].match(/^[+-]?[\d\.]+/)) {
- counters[key] += Number(fields[0] || 1) * (1 / sampleRate);
+ counters[flushBucket][key] += Number(fields[0] || 1) * (1 / sampleRate);
}
else {
- counters[key] = 0;
+ counters[flushBucket][key] = 0;
}
}
}
@@ -78,73 +129,90 @@ config.configFile(process.argv[2], function (config, oldConfig) {
server.bind(config.port || 8125);
- var flushInterval = Number(config.flushInterval || 10000);
+ var mkFlusher = function (flushBucket, flushInterval, statPrefix) {
+ return function() {
+ var statString = '';
+ var ts = Math.round(new Date().getTime() / 1000);
+ var numStats = 0;
+ var key, tmpKey;
- flushInt = setInterval(function () {
- var statString = '';
- var ts = Math.round(new Date().getTime() / 1000);
- var numStats = 0;
- var key;
+ for (key in counters[flushBucket]) {
+ var value = counters[flushBucket][key] / (flushInterval / 1000);
- for (key in counters) {
- var value = counters[key] / (flushInterval / 1000);
- var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n";
- statString += message;
- counters[key] = 0;
- numStats += 1;
- }
+ tmpKey = key.removePrefix(statPrefix).removePrefix('.');
- for (key in timers) {
- if (timers[key].length > 0) {
- var pctThreshold = config.percentThreshold || 90;
- var values = timers[key].sort(function (a,b) { return a-b; });
- var count = values.length;
- var min = values[0];
- var max = values[count - 1];
-
- var mean = min;
- var maxAtThreshold = max;
-
- if (count > 1) {
- var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
- var numInThreshold = count - thresholdIndex;
- values = values.slice(0, numInThreshold);
- maxAtThreshold = values[numInThreshold - 1];
-
- // average the remaining timings
- var sum = 0;
- for (var i = 0; i < numInThreshold; i++) {
- sum += values[i];
+ statString += statPrefix + '.' + tmpKey + ' ' + value + ' ' + ts + "\n";
+ counters[flushBucket][key] = 0;
+ numStats += 1;
+ }
+
+ for (key in timers[flushBucket]) {
+ if (timers[flushBucket][key].length > 0) {
+ var pctThreshold = config.percentThreshold || 90;
+ var values = timers[flushBucket][key].sort(function (a,b) { return a-b; });
+ var count = values.length;
+ var min = values[0];
+ var max = values[count - 1];
+
+ var mean = min;
+ var maxAtThreshold = max;
+
+ if (count > 1) {
+ var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count);
+ var numInThreshold = count - thresholdIndex;
+ values = values.slice(0, numInThreshold);
+ maxAtThreshold = values[numInThreshold - 1];
+
+ // average the remaining timings
+ var sum = 0;
+ for (var i = 0; i < numInThreshold; i++) {
+ sum += values[i];
+ }
+
+ mean = sum / numInThreshold;
}
- mean = sum / numInThreshold;
- }
+ timers[flushBucket][key] = [];
- timers[key] = [];
+ var message = "";
- var message = "";
- message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n";
- message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n";
- statString += message;
+ tmpKey = key.removePrefix(statPrefix).removePrefix('.');
- numStats += 1;
- }
- }
+ message += statPrefix + '.timers.' + tmpKey + '.mean ' + mean + ' ' + ts + "\n";
+ message += statPrefix + '.timers.' + tmpKey + '.upper ' + max + ' ' + ts + "\n";
+ message += statPrefix + '.timers.' + tmpKey + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n";
+ message += statPrefix + '.timers.' + tmpKey + '.lower ' + min + ' ' + ts + "\n";
+ message += statPrefix + '.timers.' + tmpKey + '.count ' + count + ' ' + ts + "\n";
- statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
-
- var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
+ statString += message;
- graphite.on('connect', function() {
- this.write(statString);
- this.end();
- });
+ numStats += 1;
+ }
+ }
- }, flushInterval);
+ statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n";
+
+ var graphite = net.createConnection(config.graphitePort, config.graphiteHost);
+ graphite.on('error', function(err) { sys.log("graphite flush failed: " + err); }); // an 'error' event with no listener is thrown and would kill the daemon
+ graphite.on('connect', function() {
+ this.write(statString);
+ this.end();
+ });
+ };
+ };
+
+ // setup flushers: one interval timer per bucket, handles kept in flushInts
+ for (var i = 0; i < config.flushBuckets.length; i++) {
+ var flushInterval = Number(config.flushBuckets[i].flushInterval || 10000); // primitive number, not a boxed Number object
+ flushInts.push(setInterval(mkFlusher(i
+ , flushInterval
+ , config.flushBuckets[i].statPrefix || "stats") // buckets without a prefix (e.g. the built-in default) keep the legacy "stats." namespace
+ , flushInterval));
+ }
}
-
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment