Created
September 8, 2011 20:33
-
-
Save apg/1204588 to your computer and use it in GitHub Desktop.
statsd multiple flushers... on top of other unreleased stuff...
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
* statsd_domload_20120: multiple flush intervals a la graphite itself | |
--- | |
diff --git a/util/conf/production/statsd-config.js b/util/conf/production/statsd-config.js | |
index d855abe..197736a 100644 | |
--- a/util/conf/production/statsd-config.js | |
+++ b/util/conf/production/statsd-config.js | |
@@ -4,5 +4,16 @@ | |
, port: 8125 | |
, statusPort: 8126 | |
, statusAddr: "0.0.0.0" | |
+, flushBuckets: [ | |
+ { | |
+ pattern: "^pagetime\.domload.*" | |
+ , flushInterval: 60000 | |
+ , statPrefix: "pagetime.domload" | |
+ } | |
+ , { | |
+ pattern: "^.*" | |
+ , flushInterval: 10000 | |
+ , statPrefix: "stats" | |
+ } | |
+ ] | |
} | |
- | |
diff --git a/util/init/production/statsd.sh b/util/init/production/statsd.sh | |
index 4d99eb3..5e02985 100755 | |
--- a/util/init/production/statsd.sh | |
+++ b/util/init/production/statsd.sh | |
@@ -34,7 +34,7 @@ case "$mode" in | |
nohup sh -c "exec $NODE_JS $COMMAND $CONFIG_FILE >> $LOGFILE 2>&1" > /dev/null & | |
echo "$!" > "$LOCKFILE" | |
- echo "started feedd with pid="`cat $LOCKFILE` | |
+ echo "started statsd with pid="`cat $LOCKFILE` | |
;; | |
'stop') | |
diff --git a/util/node/statsd/stats.js b/util/node/statsd/stats.js | |
index 2838926..dfe31bc 100644 | |
--- a/util/node/statsd/stats.js | |
+++ b/util/node/statsd/stats.js | |
@@ -4,15 +4,42 @@ var dgram = require('dgram') | |
, config = require('./config') | |
, http = require('http'); | |
-var counters = {}; | |
-var timers = {}; | |
-var debugInt, flushInt, server, sserver; | |
+var counters = []; | |
+var timers = []; | |
+var debugInt, flushInts = [], server, sserver; | |
+ | |
+ | |
+String.prototype.startsWith = function(str) { | |
+ var l = str.length; | |
+ return this.substring(0, l) == str; | |
+}; | |
+ | |
+String.prototype.endsWith = function(str) { | |
+ var l = str.length; | |
+ return this.substring(this.length - l) == str; | |
+}; | |
+ | |
+String.prototype.removePrefix = function(str) { | |
+ if (this.startsWith(str)) { | |
+ return this.substring(str.length); | |
+ } | |
+ return this; | |
+} | |
config.configFile(process.argv[2], function (config, oldConfig) { | |
if (! config.debug && debugInt) { | |
clearInterval(debugInt); | |
debugInt = false; | |
} | |
+ | |
+ if (! config.flushBuckets) { | |
+ config.flushBuckets = [ | |
+ { | |
+ pattern: ".*" | |
+ , flushInterval: 10000 | |
+ } | |
+ ]; | |
+ } | |
if (config.debug) { | |
if (debugInt !== undefined) { clearInterval(debugInt); } | |
@@ -20,7 +47,8 @@ config.configFile(process.argv[2], function (config, oldConfig) { | |
sys.log("Counters:\n" + sys.inspect(counters) + "\nTimers:\n" + sys.inspect(timers)); | |
}, config.debugInterval || 10000); | |
} | |
- | |
+ | |
+ // status server | |
if (sserver === undefined) { | |
sserver = http.createServer(function (req, res) { | |
if (req.url.substring(0, 5) === "/ping") { | |
@@ -28,16 +56,25 @@ config.configFile(process.argv[2], function (config, oldConfig) { | |
res.end('PONG\n'); | |
} | |
else { | |
- res.writeHead(200, {'Content-Type': 'text/plain'}); | |
- res.end('' + counters.length + ' Counters, ' + timers.length + ' Timers\n'); | |
+ res.writeHead(200, {'Content-Type': 'application/json'}); | |
+ var status = { | |
+ 'timers': timers, | |
+ 'counters': counters | |
+ }; | |
+ res.end(sys.inspect(status)); | |
} | |
}); | |
sserver.listen(config.statusPort, config.statusAddr || "127.0.0.1"); | |
- | |
} | |
if (server === undefined) { | |
+ // create the data buckets | |
+ for (var i = 0; i < config.flushBuckets.length; i++) { | |
+ counters.push({}); | |
+ timers.push({}); | |
+ } | |
+ | |
server = dgram.createSocket('udp4', function (msg, rinfo) { | |
if (config.dumpMessages) { sys.log(msg.toString()); } | |
var bits = msg.toString().split(':'); | |
@@ -45,6 +82,20 @@ config.configFile(process.argv[2], function (config, oldConfig) { | |
.replace(/\s+/g, '_') | |
.replace(/\//g, '-') | |
.replace(/[^a-zA-Z_\-0-9\.]/g, ''); | |
+ var flushBucket = -1; | |
+ | |
+ // find the appropriate bucket | |
+ for (var i = 0; i < config.flushBuckets.length; i++) { | |
+ if (key.match(config.flushBuckets[i].pattern || ".*")) { | |
+ flushBucket = i; | |
+ break; | |
+ } | |
+ } | |
+ | |
+ if (flushBucket < 0) { | |
+ sys.log("key " + key + "didn't match expected pattern\n"); | |
+ return; | |
+ } | |
if (bits.length == 0) { | |
sys.log("invalid data sent: " + msg + "\n"); | |
@@ -55,22 +106,22 @@ config.configFile(process.argv[2], function (config, oldConfig) { | |
var sampleRate = 1; | |
var fields = bits[i].split("|"); | |
if (fields[1] && fields[1].trim() == "ms") { | |
- if (! timers[key]) { | |
- timers[key] = []; | |
+ if (! timers[flushBucket][key]) { | |
+ timers[flushBucket][key] = []; | |
} | |
- timers[key].push(Number(fields[0] || 0)); | |
+ timers[flushBucket][key].push(Number(fields[0] || 0)); | |
} else { | |
if (fields[2] && fields[2].match(/^@([\d\.]+)/)) { | |
sampleRate = Number(fields[2].match(/^@([\d\.]+)/)[1]); | |
} | |
if (! counters[key]) { | |
- counters[key] = 0; | |
+ counters[flushBucket][key] = 0; | |
} | |
if (fields[0].match(/^[+-]?[\d\.]+/)) { | |
- counters[key] += Number(fields[0] || 1) * (1 / sampleRate); | |
+ counters[flushBucket][key] += Number(fields[0] || 1) * (1 / sampleRate); | |
} | |
else { | |
- counters[key] = 0; | |
+ counters[flushBucket][key] = 0; | |
} | |
} | |
} | |
@@ -78,73 +129,90 @@ config.configFile(process.argv[2], function (config, oldConfig) { | |
server.bind(config.port || 8125); | |
- var flushInterval = Number(config.flushInterval || 10000); | |
+ var mkFlusher = function (flushBucket, flushInterval, statPrefix) { | |
+ return function() { | |
+ var statString = ''; | |
+ var ts = Math.round(new Date().getTime() / 1000); | |
+ var numStats = 0; | |
+ var key, tmpKey; | |
- flushInt = setInterval(function () { | |
- var statString = ''; | |
- var ts = Math.round(new Date().getTime() / 1000); | |
- var numStats = 0; | |
- var key; | |
+ for (key in counters[flushBucket]) { | |
+ var value = counters[flushBucket][key] / (flushInterval / 1000); | |
- for (key in counters) { | |
- var value = counters[key] / (flushInterval / 1000); | |
- var message = 'stats.' + key + ' ' + value + ' ' + ts + "\n"; | |
- statString += message; | |
- counters[key] = 0; | |
- numStats += 1; | |
- } | |
+ tmpKey = key.removePrefix(statPrefix).removePrefix('.'); | |
- for (key in timers) { | |
- if (timers[key].length > 0) { | |
- var pctThreshold = config.percentThreshold || 90; | |
- var values = timers[key].sort(function (a,b) { return a-b; }); | |
- var count = values.length; | |
- var min = values[0]; | |
- var max = values[count - 1]; | |
- | |
- var mean = min; | |
- var maxAtThreshold = max; | |
- | |
- if (count > 1) { | |
- var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count); | |
- var numInThreshold = count - thresholdIndex; | |
- values = values.slice(0, numInThreshold); | |
- maxAtThreshold = values[numInThreshold - 1]; | |
- | |
- // average the remaining timings | |
- var sum = 0; | |
- for (var i = 0; i < numInThreshold; i++) { | |
- sum += values[i]; | |
+ statString += statPrefix + '.' + tmpKey + ' ' + value + ' ' + ts + "\n"; | |
+ counters[flushBucket][key] = 0; | |
+ numStats += 1; | |
+ } | |
+ | |
+ for (key in timers[flushBucket]) { | |
+ if (timers[flushBucket][key].length > 0) { | |
+ var pctThreshold = config.percentThreshold || 90; | |
+ var values = timers[flushBucket][key].sort(function (a,b) { return a-b; }); | |
+ var count = values.length; | |
+ var min = values[0]; | |
+ var max = values[count - 1]; | |
+ | |
+ var mean = min; | |
+ var maxAtThreshold = max; | |
+ | |
+ if (count > 1) { | |
+ var thresholdIndex = Math.round(((100 - pctThreshold) / 100) * count); | |
+ var numInThreshold = count - thresholdIndex; | |
+ values = values.slice(0, numInThreshold); | |
+ maxAtThreshold = values[numInThreshold - 1]; | |
+ | |
+ // average the remaining timings | |
+ var sum = 0; | |
+ for (var i = 0; i < numInThreshold; i++) { | |
+ sum += values[i]; | |
+ } | |
+ | |
+ mean = sum / numInThreshold; | |
} | |
- mean = sum / numInThreshold; | |
- } | |
+ timers[flushBucket][key] = []; | |
- timers[key] = []; | |
+ var message = ""; | |
- var message = ""; | |
- message += 'stats.timers.' + key + '.mean ' + mean + ' ' + ts + "\n"; | |
- message += 'stats.timers.' + key + '.upper ' + max + ' ' + ts + "\n"; | |
- message += 'stats.timers.' + key + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n"; | |
- message += 'stats.timers.' + key + '.lower ' + min + ' ' + ts + "\n"; | |
- message += 'stats.timers.' + key + '.count ' + count + ' ' + ts + "\n"; | |
- statString += message; | |
+ tmpKey = key.removePrefix(statPrefix).removePrefix('.'); | |
- numStats += 1; | |
- } | |
- } | |
+ message += statPrefix + '.timers.' + tmpKey + '.mean ' + mean + ' ' + ts + "\n"; | |
+ message += statPrefix + '.timers.' + tmpKey + '.upper ' + max + ' ' + ts + "\n"; | |
+ message += statPrefix + '.timers.' + tmpKey + '.upper_' + pctThreshold + ' ' + maxAtThreshold + ' ' + ts + "\n"; | |
+ message += statPrefix + '.timers.' + tmpKey + '.lower ' + min + ' ' + ts + "\n"; | |
+ message += statPrefix + '.timers.' + tmpKey + '.count ' + count + ' ' + ts + "\n"; | |
- statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n"; | |
- | |
- var graphite = net.createConnection(config.graphitePort, config.graphiteHost); | |
+ statString += message; | |
- graphite.on('connect', function() { | |
- this.write(statString); | |
- this.end(); | |
- }); | |
+ numStats += 1; | |
+ } | |
+ } | |
- }, flushInterval); | |
+ statString += 'statsd.numStats ' + numStats + ' ' + ts + "\n"; | |
+ | |
+ var graphite = net.createConnection(config.graphitePort, config.graphiteHost); | |
+ | |
+ graphite.on('connect', function() { | |
+ this.write(statString); | |
+ this.end(); | |
+ }); | |
+ }; | |
+ }; | |
+ | |
+ // setup flushers | |
+ for (var i = 0; i < config.flushBuckets.length; i++) { | |
+ var flushInterval = new Number(config.flushBuckets[i].flushInterval || 10000); | |
+ flushInts.push(setInterval(mkFlusher(i | |
+ , flushInterval | |
+ , config.flushBuckets[i].statPrefix) | |
+ , flushInterval)); | |
+ } | |
} | |
- | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment