@alq666
Created June 11, 2012 20:15
dd-agent 2.2.24-2.2.25 diff
diff --git a/checks/common.py b/checks/common.py
index 6fa60de..70ce0c8 100644
--- a/checks/common.py
+++ b/checks/common.py
@@ -82,7 +82,7 @@ class checks:
self._io = IO()
self._load = Load(self.checksLogger)
self._memory = Memory(self.checksLogger)
- self._network = Network()
+ self._network = Network(self.checksLogger)
self._processes = Processes()
self._cpu = Cpu()
self._couchdb = CouchDb(self.checksLogger)
@@ -155,7 +155,7 @@ class checks:
@recordsize
def getNetworkTraffic(self):
- return self._network.check(self.checksLogger, self.agentConfig)
+ return self._network.check(self.agentConfig)
@recordsize
def getNginxStatus(self):
diff --git a/checks/ec2.py b/checks/ec2.py
index 3907244..0ac7412 100644
--- a/checks/ec2.py
+++ b/checks/ec2.py
@@ -8,7 +8,7 @@ from checks import Check
class EC2(Check):
"""Retrieve EC2 metadata
"""
- URL = "http://169.254.169.254/latest/meta-data/"
+ URL = "http://169.254.169.254/latest/meta-data"
TIMEOUT = 0.1 # second
def __init__(self, logger):
@@ -19,11 +19,11 @@ class EC2(Check):
"""Use the ec2 http service to introspect the instance. This adds latency if not running on EC2
"""
# >>> import urllib2
- # >>> urllib2.urlopen('http://169.254.169.254/1.0/', timeout=1).read()
+ # >>> urllib2.urlopen('http://169.254.169.254/latest/', timeout=1).read()
# 'meta-data\nuser-data'
- # >>> urllib2.urlopen('http://169.254.169.254/1.0/meta-data', timeout=1).read()
+ # >>> urllib2.urlopen('http://169.254.169.254/latest/meta-data', timeout=1).read()
# 'ami-id\nami-launch-index\nami-manifest-path\nhostname\ninstance-id\nlocal-ipv4\npublic-keys/\nreservation-id\nsecurity-groups'
- # >>> urllib2.urlopen('http://169.254.169.254/1.0/meta-data/instance-id', timeout=1).read()
+ # >>> urllib2.urlopen('http://169.254.169.254/latest/meta-data/instance-id', timeout=1).read()
# 'i-deadbeef'
metadata = {}
@@ -37,7 +37,7 @@ class EC2(Check):
except:
pass
- for k in ('instance-id', 'hostname', 'ami-id', 'local-ipv4', 'public-keys', 'reservation-id', 'security-groups'):
+ for k in ('instance-id', 'hostname', 'local-hostname', 'public-hostname', 'ami-id', 'local-ipv4', 'public-keys', 'public-ipv4', 'reservation-id', 'security-groups'):
try:
v = urllib2.urlopen(EC2.URL + "/" + unicode(k)).read().strip()
assert type(v) in (types.StringType, types.UnicodeType) and len(v) > 0, "%s is not a string" % v
@@ -45,6 +45,20 @@ class EC2(Check):
except:
pass
+ # Get fqdn, make sure that hostname only contains local part
+ try:
+ hname = metadata.get("hostname", None)
+ if hname is None:
+ hname = socket.gethostname()
+ metadata["fqdn"] = socket.getfqdn()
+ else:
+ metadata["fqdn"] = metadata["hostname"]
+
+ # Replace hostname with shortname
+ metadata["hostname"] = hname.split(".")[0]
+ except:
+ pass
+
try:
if socket_to is None:
socket_to = 3
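
For context on the richer EC2 metadata: the same keys can be pulled outside the agent with a few lines of Python 2. This is a minimal sketch assuming it runs on an EC2 instance; the key list and the 0.1s timeout mirror the change above, the rest is illustrative.

    import socket
    import urllib2

    META = "http://169.254.169.254/latest/meta-data"
    metadata = {}
    for k in ("instance-id", "hostname", "local-hostname", "public-hostname",
              "local-ipv4", "public-ipv4"):
        try:
            # same short timeout as EC2.TIMEOUT so non-EC2 hosts fail fast
            metadata[k] = urllib2.urlopen(META + "/" + k, timeout=0.1).read().strip()
        except Exception:
            pass

    # fqdn vs. short hostname, as in the change above
    hname = metadata.get("hostname")
    if hname is None:
        hname = socket.gethostname()
        metadata["fqdn"] = socket.getfqdn()
    else:
        metadata["fqdn"] = hname
    metadata["hostname"] = hname.split(".")[0]
    print metadata
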
diff --git a/checks/system.py b/checks/system.py
index 59cd2e3..7bf4d0e 100644
--- a/checks/system.py
+++ b/checks/system.py
@@ -5,7 +5,7 @@ import string
import subprocess
import sys
import time
-from checks import Check, gethostname
+from checks import Check, gethostname, UnknownValue
class Disk(Check):
@@ -339,13 +339,30 @@ class Memory(Check):
else:
return False
-class Network(object):
- def __init__(self):
+class Network(Check):
+ def __init__(self, logger):
+ Check.__init__(self, logger)
+
self.networkTrafficStore = {}
self.networkTrafficStore["last_ts"] = time.time()
self.networkTrafficStore["current_ts"] = self.networkTrafficStore["last_ts"]
-
- def check(self, logger, agentConfig):
+
+ def _parse_value(self, v):
+ if v == "-":
+ return 0
+ else:
+ try:
+ return long(v)
+ except ValueError:
+ return 0
+
+ def check(self, agentConfig):
+ """Report network traffic in bytes by interface
+
+ @rtype dict
+ @return {"en0": {"recv_bytes": 123, "trans_bytes": 234}, ...}
+ """
+ # FIXME rework linux support to use the built-in Check logic
if sys.platform == 'linux2':
try:
proc = open('/proc/net/dev', 'r')
@@ -353,7 +370,7 @@ class Network(object):
self.networkTrafficStore["current_ts"] = time.time()
except:
- logger.exception('getNetworkTraffic')
+ self.logger.exception('getNetworkTraffic')
return False
proc.close()
@@ -372,13 +389,12 @@ class Network(object):
faceData = dict(zip(cols, data.split()))
faces[face] = faceData
-
interfaces = {}
interval = self.networkTrafficStore["current_ts"] - self.networkTrafficStore["last_ts"]
- logger.debug('getNetworkTraffic: interval (s) %s' % interval)
+ self.logger.debug('getNetworkTraffic: interval (s) %s' % interval)
if interval == 0:
- logger.warn('0-sample interval, skipping network checks')
+ self.logger.warn('0-sample interval, skipping network checks')
return False
self.networkTrafficStore["last_ts"] = self.networkTrafficStore["current_ts"]
@@ -407,9 +423,89 @@ class Network(object):
return interfaces
- else:
- logger.debug('getNetworkTraffic: other platform, returning')
+ elif sys.platform == "darwin":
+ try:
+ netstat = subprocess.Popen(["netstat", "-i", "-b"],
+ stdout=subprocess.PIPE,
+ close_fds=True)
+ # Name Mtu Network Address Ipkts Ierrs Ibytes Opkts Oerrs Obytes Coll
+ # lo0 16384 <Link#1> 318258 0 428252203 318258 0 428252203 0
+ # lo0 16384 localhost fe80:1::1 318258 - 428252203 318258 - 428252203 -
+ # lo0 16384 127 localhost 318258 - 428252203 318258 - 428252203 -
+ # lo0 16384 localhost ::1 318258 - 428252203 318258 - 428252203 -
+ # gif0* 1280 <Link#2> 0 0 0 0 0 0 0
+ # stf0* 1280 <Link#3> 0 0 0 0 0 0 0
+ # en0 1500 <Link#4> 04:0c:ce:db:4e:fa 20801309 0 13835457425 15149389 0 11508790198 0
+ # en0 1500 seneca.loca fe80:4::60c:ceff: 20801309 - 13835457425 15149389 - 11508790198 -
+ # en0 1500 2001:470:1f 2001:470:1f07:11d 20801309 - 13835457425 15149389 - 11508790198 -
+ # en0 1500 2001:470:1f 2001:470:1f07:11d 20801309 - 13835457425 15149389 - 11508790198 -
+ # en0 1500 192.168.1 192.168.1.63 20801309 - 13835457425 15149389 - 11508790198 -
+ # en0 1500 2001:470:1f 2001:470:1f07:11d 20801309 - 13835457425 15149389 - 11508790198 -
+ # p2p0 2304 <Link#5> 06:0c:ce:db:4e:fa 0 0 0 0 0 0 0
+ # ham0 1404 <Link#6> 7a:79:05:4d:bf:f5 30100 0 6815204 18742 0 8494811 0
+ # ham0 1404 5 5.77.191.245 30100 - 6815204 18742 - 8494811 -
+ # ham0 1404 seneca.loca fe80:6::7879:5ff: 30100 - 6815204 18742 - 8494811 -
+ # ham0 1404 2620:9b::54 2620:9b::54d:bff5 30100 - 6815204 18742 - 8494811 -
+ out, err = netstat.communicate()
+ lines = out.split("\n")
+ headers = lines[0].split()
+ # Given the irregular structure of the table above, better to parse from the end of each line
+ # Verify headers first
+ # -7 -6 -5 -4 -3 -2 -1
+ for h in ("Ipkts", "Ierrs", "Ibytes", "Opkts", "Oerrs", "Obytes", "Coll"):
+ if h not in headers:
+ self.logger.error("%s not found in %s; cannot parse" % (h, headers))
+ return False
+ current = None
+ for l in lines[1:]:
+ x = l.split()
+ if len(x) == 0:
+ break
+ iface = x[0]
+ if iface.endswith("*"):
+ iface = iface[:-1]
+ if iface == current:
+ # skip multiple lines of same interface
+ continue
+ else:
+ current = iface
+
+ if not self.is_counter("%s.recv_bytes" % iface):
+ self.counter("%s.recv_bytes" % iface)
+ value = self._parse_value(x[-5])
+ self.save_sample("%s.recv_bytes" % iface, value)
+
+ if not self.is_counter("%s.trans_bytes" % iface):
+ self.counter("%s.trans_bytes" % iface)
+ value = self._parse_value(x[-2])
+ self.save_sample("%s.trans_bytes" % iface, value)
+
+ # now make a dictionary {"iface": {"recv_bytes": value, "trans_bytes": value}}
+ interfaces = {}
+ for m in self.get_metric_names():
+ # m should be a counter
+ if not self.is_counter(m):
+ continue
+ # metric name iface.recv|trans_bytes
+ i, n = m.split(".")
+ try:
+ sample = self.get_sample(m)
+ # will raise if no value, thus skipping what's next
+ if interfaces.get(i) is None:
+ interfaces[i] = {}
+ interfaces[i][n] = sample
+ except UnknownValue:
+ pass
+ if len(interfaces) > 0:
+ return interfaces
+ else:
+ return False
+ except:
+ self.logger.exception('getNetworkTraffic')
+ return False
+ else:
+ self.logger.debug("getNetworkTraffic: unsupported platform")
return False
class Processes(object):
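
To make the parse-from-the-end trick concrete: netstat -i -b on OS X prints a variable number of columns (the Address field can be empty on the <Link#N> lines), so the stable way to locate Ibytes and Obytes is to count from the end of the split line, which is what x[-5] and x[-2] do above. A small sketch using two lines from the sample output in the comment:

    # Two lines copied from the netstat -i -b sample above; the first has
    # 10 fields (no Address), the second has 11.
    lines = [
        "lo0 16384 <Link#1> 318258 0 428252203 318258 0 428252203 0",
        "en0 1500 <Link#4> 04:0c:ce:db:4e:fa 20801309 0 13835457425 15149389 0 11508790198 0",
    ]
    for l in lines:
        x = l.split()
        iface = x[0].rstrip("*")
        recv_bytes = long(x[-5])   # Ibytes, 5th field from the end
        trans_bytes = long(x[-2])  # Obytes, 2nd field from the end
        print iface, recv_bytes, trans_bytes
    # lo0 428252203 428252203
    # en0 13835457425 11508790198
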
diff --git a/checks/utils.py b/checks/utils.py
index 886b974..93a82df 100644
--- a/checks/utils.py
+++ b/checks/utils.py
@@ -78,7 +78,7 @@ class TailFile(object):
self._f = open(self._path,'r')
if move_end:
self._log.debug("Opening file %s" % (self._path))
- self._f.seek(1, SEEK_END)
+ self._f.seek(0, SEEK_END)
elif pos:
self._log.debug("Reopening file %s at %s" % (self._path, pos))
self._f.seek(pos)
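
The one-character change above fixes an off-by-one in TailFile: with SEEK_END the offset is relative to the end of the file, so seek(1, SEEK_END) left the cursor one byte past the old end and the first byte written afterwards was skipped on the next read. A quick sketch of the difference (the path is hypothetical, just for illustration):

    import os

    path = "/tmp/tail_example.log"      # hypothetical file
    open(path, "w").write("line1\n")

    f = open(path, "r")
    f.seek(0, os.SEEK_END)              # fixed call: cursor exactly at EOF
    open(path, "a").write("line2\n")
    print repr(f.read())                # 'line2\n' -- nothing lost
    # with f.seek(1, os.SEEK_END) the same read returns 'ine2\n'
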
diff --git a/config.py b/config.py
index aa64d09..5713799 100644
--- a/config.py
+++ b/config.py
@@ -17,7 +17,7 @@ def get_parsed_args():
dest='dd_url')
parser.add_option('-c', '--clean', action='store_true', default=False,
dest='clean')
- parser.add_option('-u', '--use-local-forwarder', action='store_true',
+ parser.add_option('-u', '--use-local-forwarder', action='store_true',
default=False,dest='use_forwarder')
try:
options, args = parser.parse_args()
@@ -85,11 +85,11 @@ def get_config(parse_args = True, cfg_path=None):
# Which API key to use
agentConfig['apiKey'] = config.get('Main', 'api_key')
-
+
# Debug mode
agentConfig['debugMode'] = config.get('Main', 'debug_mode').lower() in ("yes", "true")
- if config.has_option('Main', 'use_ec2_instance_id'):
+ if config.has_option('Main', 'use_ec2_instance_id'):
use_ec2_instance_id = config.get('Main', 'use_ec2_instance_id')
# translate yes into True, the rest into False
agentConfig['useEC2InstanceId'] = (use_ec2_instance_id.lower() == 'yes')
@@ -130,12 +130,23 @@ def get_config(parse_args = True, cfg_path=None):
else:
agentConfig['graphite_listen_port'] = None
+ # Dogstatsd config
+ dogstatsd_defaults = {
+ 'dogstatsd_port' : 8125,
+ 'dogstatsd_target' : 'http://localhost:17123',
+ 'dogstatsd_interval' : 10
+ }
+ for key, value in dogstatsd_defaults.iteritems():
+ if config.has_option('Main', key):
+ agentConfig[key] = config.get('Main', key)
+ else:
+ agentConfig[key] = value
# Optional config
# FIXME not the prettiest code ever...
if config.has_option('Main', 'use_mount'):
agentConfig['use_mount'] = config.get('Main', 'use_mount').lower() in ("yes", "true", "1")
-
+
if config.has_option('Main', 'apache_status_url'):
agentConfig['apacheStatusUrl'] = config.get('Main', 'apache_status_url')
@@ -244,7 +255,7 @@ def get_config(parse_args = True, cfg_path=None):
agentConfig["memcache_server"] = config.get("Main", "memcache_server")
if config.has_option("Main", "memcache_port"):
agentConfig["memcache_port"] = config.get("Main", "memcache_port")
-
+
# Dogstream config
if config.has_option("Main", "dogstream_log"):
# Older version, single log support
@@ -253,10 +264,10 @@ def get_config(parse_args = True, cfg_path=None):
agentConfig["dogstreams"] = ':'.join([log_path, config.get("Main", "dogstream_line_parser")])
else:
agentConfig["dogstreams"] = log_path
-
+
elif config.has_option("Main", "dogstreams"):
agentConfig["dogstreams"] = config.get("Main", "dogstreams")
-
+
if config.has_option("Main", "nagios_perf_cfg"):
agentConfig["nagiosPerfCfg"] = config.get("Main", "nagios_perf_cfg")
diff --git a/datadog.conf.example b/datadog.conf.example
index 58124db..96c5190 100644
--- a/datadog.conf.example
+++ b/datadog.conf.example
@@ -1,16 +1,16 @@
[Main]
-# The host of the Datadog intake server to send agent data to
+# The host of the Datadog intake server to send agent data to
dd_url: https://app.datadoghq.com
-# The Datadog api key to associate your agent's data with your organization.
-# Can be found here:
+# The Datadog api key to associate your agent's data with your organization.
+# Can be found here:
# https://app.datadoghq.com/account/settings
-api_key:
+api_key:
-# Boolean to enable debug_mode, which outputs massive amounts of log messages
-# to the /tmp/ directory.
-debug_mode: no
+# Boolean to enable debug_mode, which outputs massive amounts of log messages
+# to the /tmp/ directory.
+debug_mode: no
# Force the hostname to whatever you want.
#hostname: mymachine.mydomain
@@ -29,6 +29,23 @@ use_mount: no
# graphite_listen_port: 17124
# ========================================================================== #
+# dogstatsd configuration #
+# ========================================================================== #
+
+# Make sure your client is sending to the same port.
+
+# dogstatsd_port : 8125
+
+# By default dogstatsd will post aggregate metrics to the agent (which handles
+# errors/timeouts/retries/etc). To send directly to the datadog api, set this
+# to https://app.datadoghq.com.
+
+# dogstatsd_target : http://localhost:17123
+
+## The dogstatsd flush period.
+# dogstatsd_interval : 10
+
+# ========================================================================== #
# Service-specific configuration #
# ========================================================================== #
@@ -80,8 +97,8 @@ use_mount: no
# PostgreSQL port
#postgresql_port:
-# PostgreSQL user. It needs to connect to the "postgres" database but does not
-# require any privileges, so you should consider creating a separate,
+# PostgreSQL user. It needs to connect to the "postgres" database but does not
+# require any privileges, so you should consider creating a separate,
# unprivileged user
#postgresql_user:
@@ -103,8 +120,8 @@ use_mount: no
# -------------------------------------------------------------------------- #
# Url to RabbitMQ's status page. Must have rabbitmq-status plugin installed.
-# See http://www.lshift.net/blog/2009/11/30/introducing-rabbitmq-status-plugin
-# for details.
+# See http://www.lshift.net/blog/2009/11/30/introducing-rabbitmq-status-plugin
+# for details.
#rabbitmq_status_url: http://www.example.com:55672/json
#rabbitmq_user: guest
#rabbitmq_pass: guest
@@ -170,7 +187,7 @@ use_mount: no
# JMX server:port to connect to
# jvm_jmx_server: localhost:8090
-#jvm_jmx_server:
+#jvm_jmx_server:
# JMX user to log in with, if needed
#jvm_jmx_user: john
@@ -222,20 +239,20 @@ use_mount: no
#solr_jmx_pass: foobar
# -------------------------------------------------------------------------- #
-# Rails
+# Rails
# -------------------------------------------------------------------------- #
#rails_logs: /var/log/myrailsapp.log
# -------------------------------------------------------------------------- #
-# Memcache
+# Memcache
# -------------------------------------------------------------------------- #
#memcache_server: localhost
#memcache_port: 11211
# -------------------------------------------------------------------------- #
-# Dogstream (log file parser)
+# Dogstream (log file parser)
# -------------------------------------------------------------------------- #
# Comma-separated list of logs to parse and optionally custom parsers to use.
@@ -244,10 +261,10 @@ use_mount: no
# dogstreams: /path/to/log1:parsers:custom_parser, /path/to/log2, /path/to/log3, ...
#
# Each entry is a path to a log file and optionally a Python module/function pair
-# separated by colons.
+# separated by colons.
#
-# Custom parsers should take a 2 parameters, a logger object and
-# a string parameter of the current line to parse. It should return a tuple of
+# Custom parsers should take a 2 parameters, a logger object and
+# a string parameter of the current line to parse. It should return a tuple of
# the form:
# (metric (str), timestamp (unix timestamp), value (float), attributes (dict))
# where attributes should at least contain the key 'metric_type', specifying
diff --git a/dogstatsd.py b/dogstatsd.py
new file mode 100755
index 0000000..5a572d0
--- /dev/null
+++ b/dogstatsd.py
@@ -0,0 +1,334 @@
+#!/usr/bin/python
+'''
+A Python Statsd implementation with some datadog special sauce.
+'''
+
+# stdlib
+import httplib as http_client
+import logging
+import optparse
+from random import randrange
+import re
+import socket
+import sys
+import time
+import threading
+from urllib import urlencode
+
+# project
+from config import get_config
+from util import json
+
+logger = logging.getLogger('dogstatsd')
+
+class Metric(object):
+ """
+ A base metric class that accepts points, slices them into time intervals
+ and performs roll-ups within those intervals.
+ """
+
+ def sample(self, value, sample_rate):
+ """ Add a point to the given metric. """
+ raise NotImplementedError()
+
+ def flush(self, timestamp):
+ """ Flush all metrics up to the given timestamp. """
+ raise NotImplementedError()
+
+
+class Gauge(Metric):
+ """ A metric that tracks a value at particular points in time. """
+
+ def __init__(self, name, tags, hostname):
+ self.name = name
+ self.value = None
+ self.tags = tags
+ self.hostname = hostname
+
+ def sample(self, value, sample_rate):
+ self.value = value
+
+ def flush(self, timestamp):
+ return [{
+ 'metric' : self.name,
+ 'points' : [(timestamp, self.value)],
+ 'tags' : self.tags,
+ 'host' : self.hostname
+ }]
+
+
+class Counter(Metric):
+ """ A metric that tracks a counter value. """
+
+ def __init__(self, name, tags, hostname):
+ self.name = name
+ self.value = 0
+ self.tags = tags
+ self.hostname = hostname
+
+ def sample(self, value, sample_rate):
+ self.value += value * int(1 / sample_rate)
+
+ def flush(self, timestamp):
+ return [{
+ 'metric' : self.name,
+ 'points' : [(timestamp, self.value)],
+ 'tags' : self.tags,
+ 'host' : self.hostname
+ }]
+
+
+class Histogram(Metric):
+ """ A metric to track the distribution of a set of values. """
+
+ def __init__(self, name, tags, hostname):
+ self.name = name
+ self.max = float("-inf")
+ self.min = float("inf")
+ self.sum = 0
+ self.count = 0
+ self.sample_size = 1000
+ self.samples = []
+ self.percentiles = [0.75, 0.85, 0.95, 0.99]
+ self.tags = tags
+ self.hostname = hostname
+
+ def sample(self, value, sample_rate):
+ count = int(1 / sample_rate)
+ self.max = self.max if self.max > value else value
+ self.min = self.min if self.min < value else value
+ self.sum += value * count
+ self.count += count
+ if len(self.samples) < self.sample_size:
+ self.samples.append(value)
+ else:
+ self.samples[randrange(0, self.sample_size)] = value
+
+ def flush(self, ts):
+ if not self.count:
+ return []
+
+ metrics = [
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.min' % self.name, 'points' : [(ts, self.min)]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.max' % self.name, 'points' : [(ts, self.max)]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.avg' % self.name, 'points' : [(ts, self.average())]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.count' % self.name, 'points' : [(ts, self.count)]},
+ ]
+
+ length = len(self.samples)
+ self.samples.sort()
+ for p in self.percentiles:
+ val = self.samples[int(round(p * length - 1))]
+ name = '%s.%spercentile' % (self.name, int(p * 100))
+ metrics.append({'host': self.hostname, 'tags':self.tags, 'metric': name, 'points': [(ts, val)]})
+ return metrics
+
+ def average(self):
+ return float(self.sum) / self.count
+
+
+
+class MetricsAggregator(object):
+ """
+ A metric aggregator class.
+ """
+
+ def __init__(self, hostname):
+ self.metrics = {}
+ self.count = 0
+ self.metric_type_to_class = {
+ 'g': Gauge,
+ 'c': Counter,
+ 'h': Histogram,
+ 'ms' : Histogram
+ }
+ self.hostname = hostname
+
+ def submit(self, packet):
+ self.count += 1
+ # We can have colons in tags, so split once.
+ name_and_metadata = packet.split(':', 1)
+
+ if len(name_and_metadata) != 2:
+ raise Exception('Unparseable packet: %s' % packet)
+
+ name = name_and_metadata[0]
+ metadata = name_and_metadata[1].split('|')
+
+ if len(metadata) < 2:
+ raise Exception('Unparseable packet: %s' % packet)
+
+ # Parse the optional values - sample rate & tags.
+ sample_rate = 1
+ tags = None
+ for m in metadata[2:]:
+ # Parse the sample rate
+ if m[0] == '@':
+ sample_rate = float(m[1:])
+ assert 0 <= sample_rate <= 1
+ elif m[0] == '#':
+ tags = tuple(sorted(m[1:].split(',')))
+
+ context = (name, tags)
+
+ if context not in self.metrics:
+ metric_class = self.metric_type_to_class[metadata[1]]
+ self.metrics[context] = metric_class(name, tags, self.hostname)
+ self.metrics[context].sample(float(metadata[0]), sample_rate)
+
+
+ def flush(self, timestamp=None):
+ timestamp = timestamp or time.time()
+ metrics = []
+ for context, metric in self.metrics.items():
+ metrics += metric.flush(timestamp)
+ del self.metrics[context]
+ logger.info("received %s payloads since last flush" % self.count)
+ self.count = 0
+ return metrics
+
+
+
+class Reporter(threading.Thread):
+ """
+ The reporter periodically sends the aggregated metrics to the
+ server.
+ """
+
+ def __init__(self, interval, metrics_aggregator, api_host, api_key=None):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.interval = int(interval)
+ self.finished = threading.Event()
+ self.metrics_aggregator = metrics_aggregator
+ self.flush_count = 0
+
+ self.api_key = api_key
+ self.api_host = api_host
+
+ self.http_conn_cls = http_client.HTTPSConnection
+
+ match = re.match('^(https?)://(.*)', api_host)
+
+ if match:
+ self.api_host = match.group(2)
+ if match.group(1) == 'http':
+ self.http_conn_cls = http_client.HTTPConnection
+
+ def end(self):
+ self.finished.set()
+
+ def run(self):
+ logger.info("Reporting to %s every %ss" % (self.api_host, self.interval))
+ while True:
+ if self.finished.is_set():
+ break
+ self.finished.wait(self.interval)
+ self.flush()
+
+ def flush(self):
+ try:
+ self.flush_count += 1
+ metrics = self.metrics_aggregator.flush()
+ count = len(metrics)
+ if not count:
+ logger.info("Flush #{0}: No metrics to flush.".format(self.flush_count))
+ return
+ logger.info("Flush #{0}: flushing {1} metrics".format(self.flush_count, count))
+ self.submit(metrics)
+ except:
+ logger.exception("Error flushing metrics")
+
+ def submit(self, metrics):
+
+ # HACK - Copy and pasted from dogapi, because it's a bit of a pain to distribute python
+ # dependencies with the agent.
+ conn = self.http_conn_cls(self.api_host)
+ body = json.dumps({"series" : metrics})
+ headers = {'Content-Type':'application/json'}
+ method = 'POST'
+
+ params = {}
+ if self.api_key:
+ params['api_key'] = self.api_key
+ url = '/api/v1/series?%s' % urlencode(params)
+
+ start_time = time.time()
+ conn.request(method, url, body, headers)
+
+ #FIXME: add timeout handling code here
+
+ response = conn.getresponse()
+ duration = round((time.time() - start_time) * 1000.0, 4)
+ logger.info("%s %s %s%s (%sms)" % (
+ response.status, method, self.api_host, url, duration))
+
+class Server(object):
+ """
+ A statsd udp server.
+ """
+
+ def __init__(self, metrics_aggregator, host, port):
+ self.host = host
+ self.port = int(port)
+ self.address = (self.host, self.port)
+
+ self.metrics_aggregator = metrics_aggregator
+
+ self.buffer_size = 1024
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.socket.bind(self.address)
+
+ def start(self):
+ """ Run the server. """
+ logger.info('Starting dogstatsd server on %s' % str(self.address))
+
+ # Inline variables to speed up look-ups.
+ buffer_size = self.buffer_size
+ aggregator_submit = self.metrics_aggregator.submit
+ socket_recv = self.socket.recv
+
+ while True:
+ try:
+ aggregator_submit(socket_recv(buffer_size))
+ except (KeyboardInterrupt, SystemExit):
+ break
+ except:
+ logger.exception('Error receiving datagram')
+
+
+def main():
+
+ c = get_config(parse_args=False)
+
+ port = c['dogstatsd_port']
+ target = c['dogstatsd_target']
+ interval = c['dogstatsd_interval']
+ api_key = c['apiKey']
+ host = 'localhost'
+
+ if c.get('debugMode'):
+ logging.basicConfig(filename="/tmp/dogstatsd.log", filemode='w', level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ logging.info("Debug logging to /tmp/dogstatsd.log")
+
+ hostname = socket.gethostname()
+
+ # Create the aggregator (which is the point of communication between the
+ # server and reporting threads.
+ aggregator = MetricsAggregator(hostname)
+
+ # Start the reporting thread.
+ reporter = Reporter(interval, aggregator, target, api_key)
+ reporter.start()
+
+ # Start the server.
+ server_host = ''
+ server = Server(aggregator, server_host, port)
+ server.start()
+
+ # If we're here, we're done.
+ logger.info("Shutting down ...")
+
+
+if __name__ == '__main__':
+ main()
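
Since the wire format is only implied by MetricsAggregator.submit(), here is a minimal client-side sketch: dogstatsd accepts plain UDP datagrams of the form name:value|type, with optional |@sample_rate and |#tag1,tag2 fields. The metric names and tags below are made up; the port is the dogstatsd_port default.

    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    addr = ("localhost", 8125)                       # default dogstatsd_port

    sock.sendto("page.views:1|c", addr)              # counter
    sock.sendto("users.online:42|g", addr)           # gauge
    sock.sendto("query.time:0.25|h|@0.5", addr)      # sampled histogram
    sock.sendto("render.time:250|ms|#env:prod,region:us", addr)  # timer with tags
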
diff --git a/packaging/datadog-agent-base/deb/datadog-agent.init b/packaging/datadog-agent-base/deb/datadog-agent.init
index 7dc0fa0..74c3e8e 100755
--- a/packaging/datadog-agent-base/deb/datadog-agent.init
+++ b/packaging/datadog-agent-base/deb/datadog-agent.init
@@ -54,8 +54,15 @@ case "$1" in
echo -n "Stopping $DESC: "
if [ -f $USE_SUPERVISOR ]; then
- echo -n "(using supervisorctl) "
- supervisorctl stop datadog-agent:*
+ # Prevent errors if not under actual supervision
+ supervised=$(supervisorctl avail | grep datadog-agent | wc -l)
+ if [ $supervised -gt 1 ]; then
+ echo -n "(using supervisorctl) "
+ supervisorctl stop datadog-agent:*
+ else
+ # Should have been
+ echo -n "(warning: datadog-agent supervisor config is missing) "
+ fi
else
su $AGENTUSER -c "$AGENTPATH stop init"
fi
diff --git a/packaging/datadog-agent/Makefile b/packaging/datadog-agent/Makefile
index a350baa..cc849bd 100644
--- a/packaging/datadog-agent/Makefile
+++ b/packaging/datadog-agent/Makefile
@@ -27,7 +27,9 @@ install: dirs
mkdir -p $(BUILD)/usr/bin
cp ../../ddagent.py $(BUILD)/usr/share/datadog/agent
cp ../../transaction.py $(BUILD)/usr/share/datadog/agent
+ cp ../../dogstatsd.py $(BUILD)/usr/share/datadog/agent
ln -sf ../share/datadog/agent/ddagent.py $(BUILD)/usr/bin/dd-forwarder
+ ln -sf ../share/datadog/agent/dogstatsd.py $(BUILD)/usr/bin/dogstatsd
install_deb: install
mkdir -p $(BUILD)/etc/supervisor/conf.d
diff --git a/packaging/datadog-agent/deb/supervisor.conf b/packaging/datadog-agent/deb/supervisor.conf
index 68d1fff..6ba6429 100644
--- a/packaging/datadog-agent/deb/supervisor.conf
+++ b/packaging/datadog-agent/deb/supervisor.conf
@@ -2,7 +2,7 @@
command=/usr/bin/dd-agent foreground --use-local-forwarder
stdout_logfile=NONE
stderr_logfile=NONE
-priority=999
+priority=999
startsecs=2
user=dd-agent
@@ -10,9 +10,18 @@ user=dd-agent
command=/usr/bin/dd-forwarder
redirect_stderr=true
stdout_logfile=/var/log/ddforwarder.log
-startsecs=3
+startsecs=3
priority=998
user=dd-agent
+[program:dogstatsd]
+command=/usr/bin/dogstatsd
+redirect_stderr=true
+stdout_logfile=/var/log/dogstatsd.log
+startsecs=2
+priority=999
+user=dd-agent
+
+
[group:datadog-agent]
-programs=forwarder,collector
+programs=forwarder,collector,dogstatsd
diff --git a/packaging/datadog-agent/source/setup_agent.sh b/packaging/datadog-agent/source/setup_agent.sh
index 75366a9..291ff20 100644
--- a/packaging/datadog-agent/source/setup_agent.sh
+++ b/packaging/datadog-agent/source/setup_agent.sh
@@ -1,11 +1,14 @@
#!/bin/sh
-if [ $# -ne 1 ]; then
+if [ $# -eq 1 ]; then
+ apikey=$1
+elif [ -n "$DD_API_KEY" ]; then
+ apikey=$DD_API_KEY
+else
echo "Usage: $0 <api_key>"
exit 1
fi
-apikey=$1
unamestr=`uname`
# create home base for the agent
@@ -24,7 +27,7 @@ pip install tornado
mkdir -p $dd_base/agent
curl -L -o $dd_base/agent.tar.gz https://github.com/DataDog/dd-agent/tarball/master
tar -xz -C $dd_base/agent --strip-components 1 -f $dd_base/agent.tar.gz
-sed "s/api_key:.*/api_key: $1/" $dd_base/agent/datadog.conf.example > $dd_base/agent/datadog.conf
+sed "s/api_key:.*/api_key: $apikey/" $dd_base/agent/datadog.conf.example > $dd_base/agent/datadog.conf
mkdir -p $dd_base/bin
cp $dd_base/agent/packaging/datadog-agent/source/agent $dd_base/bin/agent
chmod +x $dd_base/bin/agent
diff --git a/tests/test_datadog.py b/tests/test_datadog.py
index 6daeaed..3a65e71 100644
--- a/tests/test_datadog.py
+++ b/tests/test_datadog.py
@@ -607,5 +607,5 @@ class TestNagiosPerfData(TailTestCase):
if __name__ == '__main__':
- logging.basicConfig(format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
+ # logging.basicConfig(format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s")
unittest.main()
diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py
new file mode 100644
index 0000000..3a2e326
--- /dev/null
+++ b/tests/test_dogstatsd.py
@@ -0,0 +1,186 @@
+
+import random
+import time
+
+import nose.tools as nt
+
+from dogstatsd import MetricsAggregator
+
+
+class TestUnitDogStatsd(object):
+
+ @staticmethod
+ def sort_metrics(metrics):
+ def sort_by(m):
+ return (m['metric'], ','.join(m['tags'] or []))
+ return sorted(metrics, key=sort_by)
+
+ def test_tags(self):
+ stats = MetricsAggregator('myhost')
+ stats.submit('gauge:1|c')
+ stats.submit('gauge:2|c|@1')
+ stats.submit('gauge:4|c|#tag1,tag2')
+ stats.submit('gauge:8|c|#tag2,tag1') # Should be the same as above
+ stats.submit('gauge:16|c|#tag3,tag4')
+
+ metrics = self.sort_metrics(stats.flush())
+
+ assert len(metrics) == 3
+ first, second, third = metrics
+
+ nt.assert_equal(first['metric'], 'gauge')
+ nt.assert_equal(first['tags'], None)
+ nt.assert_equal(first['points'][0][1], 3)
+ nt.assert_equal(first['host'], 'myhost')
+
+ nt.assert_equal(second['metric'], 'gauge')
+ nt.assert_equal(second['tags'], ('tag1', 'tag2'))
+ nt.assert_equal(second['points'][0][1], 12)
+ nt.assert_equal(second['host'], 'myhost')
+
+ nt.assert_equal(third['metric'], 'gauge')
+ nt.assert_equal(third['tags'], ('tag3', 'tag4'))
+ nt.assert_equal(third['points'][0][1], 16)
+ nt.assert_equal(third['host'], 'myhost')
+
+
+ def test_counter(self):
+ stats = MetricsAggregator('myhost')
+
+ # Track some counters.
+ stats.submit('my.first.counter:1|c')
+ stats.submit('my.first.counter:5|c')
+ stats.submit('my.second.counter:1|c')
+ stats.submit('my.third.counter:3|c')
+
+ # Ensure they roll up nicely.
+ metrics = self.sort_metrics(stats.flush())
+ assert len(metrics) == 3
+
+ first, second, third = metrics
+ nt.assert_equals(first['metric'], 'my.first.counter')
+ nt.assert_equals(first['points'][0][1], 6)
+ nt.assert_equals(first['host'], 'myhost')
+
+ nt.assert_equals(second['metric'], 'my.second.counter')
+ nt.assert_equals(second['points'][0][1], 1)
+
+ nt.assert_equals(third['metric'], 'my.third.counter')
+ nt.assert_equals(third['points'][0][1], 3)
+
+ # Ensure they're gone now.
+ assert not len(stats.flush())
+
+ def test_sampled_counter(self):
+
+ # Submit a sampled counter.
+ stats = MetricsAggregator('myhost')
+ stats.submit('sampled.counter:1|c|@0.5')
+ metrics = stats.flush()
+ assert len(metrics) == 1
+ m = metrics[0]
+ assert m['metric'] == 'sampled.counter'
+ nt.assert_equal(m['points'][0][1], 2)
+
+ def test_gauge(self):
+ stats = MetricsAggregator('myhost')
+
+ # Track some counters.
+ stats.submit('my.first.gauge:1|g')
+ stats.submit('my.first.gauge:5|g')
+ stats.submit('my.second.gauge:1.5|g')
+
+ # Ensure they roll up nicely.
+ metrics = self.sort_metrics(stats.flush())
+ assert len(metrics) == 2
+
+ first, second = metrics
+
+ nt.assert_equals(first['metric'], 'my.first.gauge')
+ nt.assert_equals(first['points'][0][1], 5)
+ nt.assert_equals(first['host'], 'myhost')
+
+ nt.assert_equals(second['metric'], 'my.second.gauge')
+ nt.assert_equals(second['points'][0][1], 1.5)
+
+
+ # Ensure they shall be flushed no more.
+ metrics = stats.flush()
+ assert not len(metrics)
+
+ def test_gauge_sample_rate(self):
+ stats = MetricsAggregator('myhost')
+
+ # Submit a sampled gauge metric.
+ stats.submit('sampled.gauge:10|g|@0.1')
+
+ # Assert that it's treated normally.
+ metrics = stats.flush()
+ nt.assert_equal(len(metrics), 1)
+ m = metrics[0]
+ nt.assert_equal(m['metric'], 'sampled.gauge')
+ nt.assert_equal(m['points'][0][1], 10)
+
+ def test_histogram(self):
+ stats = MetricsAggregator('myhost')
+
+ # Sample all numbers between 1-100 many times. This
+ # means our percentiles should be relatively close to themselves.
+ percentiles = range(100)
+ random.shuffle(percentiles) # in place
+ for i in percentiles:
+ for j in xrange(20):
+ for type_ in ['h', 'ms']:
+ m = 'my.p:%s|%s' % (i, type_)
+ stats.submit(m)
+
+ metrics = self.sort_metrics(stats.flush())
+
+ def assert_almost_equal(i, j, e=1):
+ # Floating point math?
+ assert abs(i - j) <= e, "%s %s %s" % (i, j, e)
+ nt.assert_equal(len(metrics), 8)
+ p75, p85, p95, p99, pavg, pcount, pmax, pmin = self.sort_metrics(metrics)
+ nt.assert_equal(p75['metric'], 'my.p.75percentile')
+ assert_almost_equal(p75['points'][0][1], 75, 10)
+ assert_almost_equal(p85['points'][0][1], 85, 10)
+ assert_almost_equal(p95['points'][0][1], 95, 10)
+ assert_almost_equal(p99['points'][0][1], 99, 10)
+ nt.assert_equals(p75['host'], 'myhost')
+
+ def test_sampled_histogram(self):
+ # Submit a sampled histogram.
+ stats = MetricsAggregator('myhost')
+ stats.submit('sampled.hist:5|h|@0.5')
+
+ # Assert we scale up properly.
+ metrics = self.sort_metrics(stats.flush())
+ p75, p85, p95, p99, pavg, pcount, pmin, pmax = self.sort_metrics(metrics)
+
+ nt.assert_equal(pcount['points'][0][1], 2)
+ for p in [p75, p85, p99, pavg, pmin, pmax]:
+ nt.assert_equal(p['points'][0][1], 5)
+
+
+ def test_bad_packets_throw_errors(self):
+ packets = [
+ 'missing.value.and.type',
+ 'missing.type:2',
+ 'missing.value|c',
+ '2|c',
+ 'unknown.type:2|z',
+ 'string.value:abc|c',
+ 'string.sample.rate:0|c|@abc',
+ ]
+
+ stats = MetricsAggregator('myhost')
+ for packet in packets:
+ try:
+ stats.submit(packet)
+ except:
+ assert True
+ else:
+ assert False, 'invalid : %s' % packet
+
+
+
diff --git a/tests/test_ec2.py b/tests/test_ec2.py
index b1d74ea..9068fba 100644
--- a/tests/test_ec2.py
+++ b/tests/test_ec2.py
@@ -10,17 +10,23 @@ class TestEC2(unittest.TestCase):
self._ec2 = EC2(logging.getLogger("tests"))
def test_metadata(self):
+ # Test gathering metadata from ec2
start = time.time()
d = self._ec2.get_metadata()
end = time.time()
assert type(d) == types.DictType
- # Either we're on ec2 or we're not (7 attributes expected)
- assert len(d) == 0 or len(d) >= 7
- if len(d) > 0:
- assert "hostname" in d
- assert "instance-id" in d
- # either way, it should have not taken more than 1s to get an answer
- assert end - start <= 1.0, "It took %s seconds to get ec2 metadata" % (end-start)
+ # fqdn must be set on ec2 and elsewhere
+ assert "hostname" in d, d
+ assert "fqdn" in d
+ if d["fqdn"] != "localhost": # travis-ci
+ assert d["fqdn"].startswith(d["hostname"]), d
+ # Either we're on ec2 or we're not (at least 7 attributes expected)
+ assert len(d) == 2 or len(d) >= 7, d
+ if len(d) > 2:
+ assert "instance-id" in d, d
+ assert d["instance-id"].startswith("i-"), d
+ assert d["hostname"].startswith("i-") or d["hostname"].startswith("domU-"), d
+ assert end - start <= 1.1, "It took %s seconds to get ec2 metadata" % (end-start)
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_ganglia.py b/tests/test_ganglia.py
index e41ab6a..c96d295 100644
--- a/tests/test_ganglia.py
+++ b/tests/test_ganglia.py
@@ -1,5 +1,4 @@
import logging
-logging.basicConfig()
import unittest
import subprocess
try:
@@ -24,7 +23,7 @@ class TestGanglia(unittest.TestCase):
time.sleep(1)
pfile = tempfile.NamedTemporaryFile()
- g = Ganglia(logging.getLogger('tests'))
+ g = Ganglia(logging.getLogger(__file__))
# Running the profiler
# profile.runctx("g.check({'ganglia_host': 'localhost', 'ganglia_port': 8651})", {}, {"g": g}, pfile.name)
# p = pstats.Stats(pfile.name)
diff --git a/tests/test_mcache.py b/tests/test_mcache.py
index a6ba00c..b33e274 100644
--- a/tests/test_mcache.py
+++ b/tests/test_mcache.py
@@ -1,13 +1,12 @@
import unittest
import logging
-logging.basicConfig(level=logging.DEBUG)
from subprocess import Popen, PIPE
import multiprocessing
from checks.db.mcache import *
class TestMemCache(unittest.TestCase):
def setUp(self):
- self.c = Memcache(logging.getLogger())
+ self.c = Memcache(logging.getLogger(__file__))
def _countConnections(self, port):
pid = multiprocessing.current_process().pid
diff --git a/tests/test_mongo.py b/tests/test_mongo.py
index d9c21b2..6cea1c9 100644
--- a/tests/test_mongo.py
+++ b/tests/test_mongo.py
@@ -1,6 +1,5 @@
import unittest
import logging
-logging.basicConfig()
import subprocess
from tempfile import mkdtemp
import time
@@ -29,7 +28,7 @@ class TestMongo(unittest.TestCase):
break
def setUp(self):
- self.c = MongoDb(logging.getLogger())
+ self.c = MongoDb(logging.getLogger(__file__))
# Start 2 instances of Mongo in a replica set
dir1 = mkdtemp()
dir2 = mkdtemp()
diff --git a/tests/test_system.py b/tests/test_system.py
index ccb8639..87018ab 100644
--- a/tests/test_system.py
+++ b/tests/test_system.py
@@ -2,8 +2,7 @@ import unittest
import logging
import sys
-logging.basicConfig()
-logger = logging.getLogger()
+logger = logging.getLogger(__file__)
from checks.system import *
@@ -150,6 +149,16 @@ sda 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
self.assertTrue(key in results['sda'], 'key %r not in results["sda"]' % key)
self.assertEqual(results['sda'][key], '0.00')
+ def testNetwork(self):
+ global logger
+ checker = Network(logger)
+ # First call yields nothing
+ self.assertEquals(False, checker.check({}))
+ # Second call yields values
+ if sys.platform == "darwin":
+ v = checker.check({})
+ assert "lo0" in v
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_transaction.py b/tests/test_transaction.py
index 00b020d..537e718 100644
--- a/tests/test_transaction.py
+++ b/tests/test_transaction.py
@@ -5,12 +5,7 @@ import time
from transaction import Transaction, TransactionManager
from ddagent import MAX_WAIT_FOR_REPLAY, MAX_QUEUE_SIZE, THROTTLING_DELAY
-#import logging
-#logging.basicConfig(level=logging.INFO)
-
class memTransaction(Transaction):
-
-
def __init__(self, size, manager):
Transaction.__init__(self)
self._trManager = manager
diff --git a/tests/test_web.py b/tests/test_web.py
index f539835..9db5cdd 100644
--- a/tests/test_web.py
+++ b/tests/test_web.py
@@ -1,7 +1,6 @@
import unittest
import logging
-logging.basicConfig()
-logger = logging.getLogger()
+logger = logging.getLogger(__file__)
from checks.web import *
alq666 commented Jun 11, 2012

Highlights

  • Support for OS X network metrics
  • dogstatsd present but turned off
  • Richer EC2 metadata reported to Datadog
