Skip to content

Instantly share code, notes, and snippets.

@alq666
Created June 18, 2012 18:59
Show Gist options
  • Save alq666/2950050 to your computer and use it in GitHub Desktop.
Save alq666/2950050 to your computer and use it in GitHub Desktop.
Changes in 2.2.26
Modified checks/__init__.py
diff --git a/checks/__init__.py b/checks/__init__.py
index 492733c..46f5ed3 100644
--- a/checks/__init__.py
+++ b/checks/__init__.py
@@ -53,12 +53,17 @@ class LaconicFilter(logging.Filter):
class Check(object):
"""
(Abstract) class for all checks with the ability to:
+ * store 1 (and only 1) sample for gauges per metric/tag combination
* compute rates for counters
* only log error messages once (instead of each time they occur)
+
"""
def __init__(self, logger):
# where to store samples, indexed by metric_name
- # metric_name: [(ts, value), (ts, value)]
+ # metric_name: {("sorted", "tags"): [(ts, value), (ts, value)],
+ # tags are stored as a sorted tuple key, since lists are not hashable
+ # None: [(ts, value), (ts, value)]}
+ # untagged values are indexed by None
self._sample_store = {}
self._counters = {} # metric_name: bool
self.logger = logger
@@ -92,7 +97,7 @@ class Check(object):
ACHTUNG: Resets previous values associated with this metric.
"""
self._counters[metric] = True
- self._sample_store[metric] = []
+ self._sample_store[metric] = {}
def is_counter(self, metric):
"Is this metric a counter?"
@@ -103,7 +108,7 @@ class Check(object):
Treats the metric as a gauge, i.e. keep the data as is
ACHTUNG: Resets previous values associated with this metric.
"""
- self._sample_store[metric] = []
+ self._sample_store[metric] = {}
def is_metric(self, metric):
return metric in self._sample_store
@@ -116,8 +121,9 @@ class Check(object):
"Get all metric names"
return self._sample_store.keys()
- def save_sample(self, metric, value, timestamp=None):
- """Save a simple sample, evict old values if needed"""
+ def save_sample(self, metric, value, timestamp=None, tags=None):
+ """Save a simple sample, evict old values if needed
+ """
if timestamp is None:
timestamp = time.time()
if metric not in self._sample_store:
@@ -127,21 +133,27 @@ class Check(object):
except ValueError, ve:
raise NaN(ve)
+ # sort tags
+ if tags is not None:
+ tags.sort()
+ tags = tuple(tags)
+
# Data eviction rules
if self.is_gauge(metric):
- self._sample_store[metric] = [(timestamp, value)]
+ self._sample_store[metric][tags] = ((timestamp, value), )
elif self.is_counter(metric):
- if len(self._sample_store[metric]) == 0:
- self._sample_store[metric] = [(timestamp, value)]
+ if self._sample_store[metric].get(tags) is None:
+ self._sample_store[metric][tags] = [(timestamp, value)]
else:
- self._sample_store[metric] = self._sample_store[metric][-1:] + [(timestamp, value)]
+ self._sample_store[metric][tags] = self._sample_store[metric][tags][-1:] + [(timestamp, value)]
else:
raise CheckException("%s must be either gauge or counter, skipping sample at %s" % (metric, time.ctime(timestamp)))
if self.is_gauge(metric):
- assert len(self._sample_store[metric]) in (0, 1), self._sample_store[metric]
+ # store[metric][tags] = ((ts, val),) - only 1 value allowed
+ assert len(self._sample_store[metric][tags]) == 1, self._sample_store[metric]
elif self.is_counter(metric):
- assert len(self._sample_store[metric]) in (0, 1, 2), self._sample_store[metric]
+ assert len(self._sample_store[metric][tags]) in (1, 2), self._sample_store[metric]
@classmethod
def _rate(cls, sample1, sample2):
@@ -163,28 +175,28 @@ class Check(object):
except Exception, e:
raise NaN(e)
- def get_sample_with_timestamp(self, metric):
+ def get_sample_with_timestamp(self, metric, tags=None):
"Get (timestamp-epoch-style, value)"
# Never seen this metric
if metric not in self._sample_store:
raise UnknownValue()
# Not enough value to compute rate
- elif self.is_counter(metric) and len(self._sample_store[metric]) < 2:
+ elif self.is_counter(metric) and len(self._sample_store[metric][tags]) < 2:
raise UnknownValue()
- elif self.is_counter(metric) and len(self._sample_store[metric]) >= 2:
- return self._rate(self._sample_store[metric][-2], self._sample_store[metric][-1])
+ elif self.is_counter(metric) and len(self._sample_store[metric][tags]) >= 2:
+ return self._rate(self._sample_store[metric][tags][-2], self._sample_store[metric][tags][-1])
- elif self.is_gauge(metric) and len(self._sample_store[metric]) >= 1:
- return self._sample_store[metric][-1]
+ elif self.is_gauge(metric) and len(self._sample_store[metric][tags]) >= 1:
+ return self._sample_store[metric][tags][-1]
else:
raise UnknownValue()
- def get_sample(self, metric):
+ def get_sample(self, metric, tags=None):
"Return the last value for that metric"
- x = self.get_sample_with_timestamp(metric)
+ x = self.get_sample_with_timestamp(metric, tags)
assert type(x) == types.TupleType and len(x) == 2, x
return x[1]
@@ -209,21 +221,15 @@ class Check(object):
pass
return values
- def get_metadata(self):
- """Return a dictionary of key-value pairs with metadata
- How these metadata are interpreted and processed is not defined here
- """
- return {}
-
def get_metrics(self):
"""This is the new format to send metrics backs
"""
metrics = []
for m in self._sample_store:
try:
- ts, val = self.get_sample_with_timestamp(m)
- # FIXME alq - no metadata yet
- metrics.append((m, int(ts), val, {}))
+ for t in self._sample_store[m]:
+ ts, val = self.get_sample_with_timestamp(m, t)
+ metrics.append((m, int(ts), val, {"tags": list(t)}))
except:
pass
return metrics
Modified dogstatsd.py
diff --git a/dogstatsd.py b/dogstatsd.py
index 5a572d0..6ee7c62 100755
--- a/dogstatsd.py
+++ b/dogstatsd.py
@@ -94,39 +94,34 @@ class Histogram(Metric):
self.hostname = hostname
def sample(self, value, sample_rate):
- count = int(1 / sample_rate)
- self.max = self.max if self.max > value else value
- self.min = self.min if self.min < value else value
- self.sum += value * count
- self.count += count
- if len(self.samples) < self.sample_size:
- self.samples.append(value)
- else:
- self.samples[randrange(0, self.sample_size)] = value
+ self.count += int(1 / sample_rate)
+ self.samples.append(value)
def flush(self, ts):
if not self.count:
return []
+ self.samples.sort()
+ length = len(self.samples)
+
+ min_ = self.samples[0]
+ max_ = self.samples[-1]
+ avg = self.samples[int(round(length/2 - 1))]
+
+
metrics = [
- {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.min' % self.name, 'points' : [(ts, self.min)]},
- {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.max' % self.name, 'points' : [(ts, self.max)]},
- {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.avg' % self.name, 'points' : [(ts, self.average())]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.min' % self.name, 'points' : [(ts, min_)]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.max' % self.name, 'points' : [(ts, max_)]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.avg' % self.name, 'points' : [(ts, avg)]},
{'host':self.hostname, 'tags': self.tags, 'metric' : '%s.count' % self.name, 'points' : [(ts, self.count)]},
]
- length = len(self.samples)
- self.samples.sort()
for p in self.percentiles:
val = self.samples[int(round(p * length - 1))]
name = '%s.%spercentile' % (self.name, int(p * 100))
metrics.append({'host': self.hostname, 'tags':self.tags, 'metric': name, 'points': [(ts, val)]})
return metrics
- def average(self):
- return float(self.sum) / self.count
-
-
class MetricsAggregator(object):
"""
New tests/performance/dogstatsd_performance.py
diff --git a/tests/performance/dogstatsd_performance.py b/tests/performance/dogstatsd_performance.py
new file mode 100644
index 0000000..c34d91b
--- /dev/null
+++ b/tests/performance/dogstatsd_performance.py
@@ -0,0 +1,24 @@
+"""
+Performance tests to help profile dogstatsd. It does away with threads for easy
+profiling.
+"""
+
+from dogstatsd import MetricsAggregator
+from multiprocessing import Process
+
+
+flush_count = 10
+loops_per_flush = 10000
+metric_count = 5
+
+
+aggregator = MetricsAggregator('my.host')
+
+for _ in xrange(flush_count):
+ for i in xrange(loops_per_flush):
+ # Counters, gauges and histograms
+ for j in xrange(metric_count):
+ aggregator.submit('counter.%s:%s|c' % (j, i))
+ aggregator.submit('gauge.%s:%s|g' % (j, i))
+ aggregator.submit('histogram.%s:%s|h' % (j, i))
+ aggregator.flush()
Modified tests/test_common.py
diff --git a/tests/test_common.py b/tests/test_common.py
index 0b0c3b4..b20dee1 100644
--- a/tests/test_common.py
+++ b/tests/test_common.py
@@ -44,6 +44,25 @@ class TestCore(unittest.TestCase):
self.c.save_sample("test-counter", -2.0, 3.0)
self.assertRaises(UnknownValue, self.c.get_sample_with_timestamp, "test-counter")
+ def test_tags(self):
+ # Test metric tagging
+ now = int(time.time())
+ # Tag metrics
+ self.c.save_sample("test-counter", 1.0, 1.0, tags = ["tag1", "tag2"])
+ self.c.save_sample("test-counter", 2.0, 2.0, tags = ["tag1", "tag2"])
+ # Only 1 point recorded for this combination of tags: no rate can be computed, so it won't be sent
+ self.c.save_sample("test-counter", 3.0, 3.0, tags = ["tag1", "tag3"])
+ self.c.save_sample("test-metric", 3.0, now, tags = ["tag3", "tag4"])
+ # This is a different combination of tags
+ self.c.save_sample("test-metric", 3.0, now, tags = ["tag5", "tag3"])
+ results = self.c.get_metrics()
+ results.sort()
+ self.assertEquals(results,
+ [("test-counter", 2.0, 1.0, {"tags": ["tag1", "tag2"]}),
+ ("test-metric", now, 3.0, {"tags": ["tag3", "tag4"]}),
+ ("test-metric", now, 3.0, {"tags": ["tag3", "tag5"]}),
+ ])
+
def test_samples(self):
self.assertEquals(self.c.get_samples(), {})
self.c.save_sample("test-metric", 1.0, 0.0) # value, ts
Modified tests/test_dogstatsd.py
diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py
index 3a2e326..19e541e 100644
--- a/tests/test_dogstatsd.py
+++ b/tests/test_dogstatsd.py
@@ -146,6 +146,10 @@ class TestUnitDogStatsd(object):
assert_almost_equal(p85['points'][0][1], 85, 10)
assert_almost_equal(p95['points'][0][1], 95, 10)
assert_almost_equal(p99['points'][0][1], 99, 10)
+ assert_almost_equal(pavg['points'][0][1], 50, 2)
+ assert_almost_equal(pmax['points'][0][1], 99, 1)
+ assert_almost_equal(pmin['points'][0][1], 0, 1)
+ assert_almost_equal(pcount['points'][0][1], 4000, 0) # 100 * 20 * 2
nt.assert_equals(p75['host'], 'myhost')
def test_sampled_histogram(self):
Changes in 2.2.26
Modified checks/__init__.py
diff --git a/checks/__init__.py b/checks/__init__.py
index 492733c..46f5ed3 100644
--- a/checks/__init__.py
+++ b/checks/__init__.py
@@ -53,12 +53,17 @@ class LaconicFilter(logging.Filter):
class Check(object):
"""
(Abstract) class for all checks with the ability to:
+ * store 1 (and only 1) sample for gauges per metric/tag combination
* compute rates for counters
* only log error messages once (instead of each time they occur)
+
"""
def __init__(self, logger):
# where to store samples, indexed by metric_name
- # metric_name: [(ts, value), (ts, value)]
+ # metric_name: {("sorted", "tags"): [(ts, value), (ts, value)],
+ # tags are stored as a sorted tuple key, since lists are not hashable
+ # None: [(ts, value), (ts, value)]}
+ # untagged values are indexed by None
self._sample_store = {}
self._counters = {} # metric_name: bool
self.logger = logger
@@ -92,7 +97,7 @@ class Check(object):
ACHTUNG: Resets previous values associated with this metric.
"""
self._counters[metric] = True
- self._sample_store[metric] = []
+ self._sample_store[metric] = {}
def is_counter(self, metric):
"Is this metric a counter?"
@@ -103,7 +108,7 @@ class Check(object):
Treats the metric as a gauge, i.e. keep the data as is
ACHTUNG: Resets previous values associated with this metric.
"""
- self._sample_store[metric] = []
+ self._sample_store[metric] = {}
def is_metric(self, metric):
return metric in self._sample_store
@@ -116,8 +121,9 @@ class Check(object):
"Get all metric names"
return self._sample_store.keys()
- def save_sample(self, metric, value, timestamp=None):
- """Save a simple sample, evict old values if needed"""
+ def save_sample(self, metric, value, timestamp=None, tags=None):
+ """Save a simple sample, evict old values if needed
+ """
if timestamp is None:
timestamp = time.time()
if metric not in self._sample_store:
@@ -127,21 +133,27 @@ class Check(object):
except ValueError, ve:
raise NaN(ve)
+ # sort tags
+ if tags is not None:
+ tags.sort()
+ tags = tuple(tags)
+
# Data eviction rules
if self.is_gauge(metric):
- self._sample_store[metric] = [(timestamp, value)]
+ self._sample_store[metric][tags] = ((timestamp, value), )
elif self.is_counter(metric):
- if len(self._sample_store[metric]) == 0:
- self._sample_store[metric] = [(timestamp, value)]
+ if self._sample_store[metric].get(tags) is None:
+ self._sample_store[metric][tags] = [(timestamp, value)]
else:
- self._sample_store[metric] = self._sample_store[metric][-1:] + [(timestamp, value)]
+ self._sample_store[metric][tags] = self._sample_store[metric][tags][-1:] + [(timestamp, value)]
else:
raise CheckException("%s must be either gauge or counter, skipping sample at %s" % (metric, time.ctime(timestamp)))
if self.is_gauge(metric):
- assert len(self._sample_store[metric]) in (0, 1), self._sample_store[metric]
+ # store[metric][tags] = ((ts, val),) - only 1 value allowed
+ assert len(self._sample_store[metric][tags]) == 1, self._sample_store[metric]
elif self.is_counter(metric):
- assert len(self._sample_store[metric]) in (0, 1, 2), self._sample_store[metric]
+ assert len(self._sample_store[metric][tags]) in (1, 2), self._sample_store[metric]
@classmethod
def _rate(cls, sample1, sample2):
@@ -163,28 +175,28 @@ class Check(object):
except Exception, e:
raise NaN(e)
- def get_sample_with_timestamp(self, metric):
+ def get_sample_with_timestamp(self, metric, tags=None):
"Get (timestamp-epoch-style, value)"
# Never seen this metric
if metric not in self._sample_store:
raise UnknownValue()
# Not enough value to compute rate
- elif self.is_counter(metric) and len(self._sample_store[metric]) < 2:
+ elif self.is_counter(metric) and len(self._sample_store[metric][tags]) < 2:
raise UnknownValue()
- elif self.is_counter(metric) and len(self._sample_store[metric]) >= 2:
- return self._rate(self._sample_store[metric][-2], self._sample_store[metric][-1])
+ elif self.is_counter(metric) and len(self._sample_store[metric][tags]) >= 2:
+ return self._rate(self._sample_store[metric][tags][-2], self._sample_store[metric][tags][-1])
- elif self.is_gauge(metric) and len(self._sample_store[metric]) >= 1:
- return self._sample_store[metric][-1]
+ elif self.is_gauge(metric) and len(self._sample_store[metric][tags]) >= 1:
+ return self._sample_store[metric][tags][-1]
else:
raise UnknownValue()
- def get_sample(self, metric):
+ def get_sample(self, metric, tags=None):
"Return the last value for that metric"
- x = self.get_sample_with_timestamp(metric)
+ x = self.get_sample_with_timestamp(metric, tags)
assert type(x) == types.TupleType and len(x) == 2, x
return x[1]
@@ -209,21 +221,15 @@ class Check(object):
pass
return values
- def get_metadata(self):
- """Return a dictionary of key-value pairs with metadata
- How these metadata are interpreted and processed is not defined here
- """
- return {}
-
def get_metrics(self):
"""This is the new format to send metrics backs
"""
metrics = []
for m in self._sample_store:
try:
- ts, val = self.get_sample_with_timestamp(m)
- # FIXME alq - no metadata yet
- metrics.append((m, int(ts), val, {}))
+ for t in self._sample_store[m]:
+ ts, val = self.get_sample_with_timestamp(m, t)
+ metrics.append((m, int(ts), val, {"tags": list(t)}))
except:
pass
return metrics
Modified dogstatsd.py
diff --git a/dogstatsd.py b/dogstatsd.py
index 5a572d0..6ee7c62 100755
--- a/dogstatsd.py
+++ b/dogstatsd.py
@@ -94,39 +94,34 @@ class Histogram(Metric):
self.hostname = hostname
def sample(self, value, sample_rate):
- count = int(1 / sample_rate)
- self.max = self.max if self.max > value else value
- self.min = self.min if self.min < value else value
- self.sum += value * count
- self.count += count
- if len(self.samples) < self.sample_size:
- self.samples.append(value)
- else:
- self.samples[randrange(0, self.sample_size)] = value
+ self.count += int(1 / sample_rate)
+ self.samples.append(value)
def flush(self, ts):
if not self.count:
return []
+ self.samples.sort()
+ length = len(self.samples)
+
+ min_ = self.samples[0]
+ max_ = self.samples[-1]
+ avg = self.samples[int(round(length/2 - 1))]
+
+
metrics = [
- {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.min' % self.name, 'points' : [(ts, self.min)]},
- {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.max' % self.name, 'points' : [(ts, self.max)]},
- {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.avg' % self.name, 'points' : [(ts, self.average())]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.min' % self.name, 'points' : [(ts, min_)]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.max' % self.name, 'points' : [(ts, max_)]},
+ {'host':self.hostname, 'tags': self.tags, 'metric' : '%s.avg' % self.name, 'points' : [(ts, avg)]},
{'host':self.hostname, 'tags': self.tags, 'metric' : '%s.count' % self.name, 'points' : [(ts, self.count)]},
]
- length = len(self.samples)
- self.samples.sort()
for p in self.percentiles:
val = self.samples[int(round(p * length - 1))]
name = '%s.%spercentile' % (self.name, int(p * 100))
metrics.append({'host': self.hostname, 'tags':self.tags, 'metric': name, 'points': [(ts, val)]})
return metrics
- def average(self):
- return float(self.sum) / self.count
-
-
class MetricsAggregator(object):
"""
New tests/performance/dogstatsd_performance.py
diff --git a/tests/performance/dogstatsd_performance.py b/tests/performance/dogstatsd_performance.py
new file mode 100644
index 0000000..c34d91b
--- /dev/null
+++ b/tests/performance/dogstatsd_performance.py
@@ -0,0 +1,24 @@
+"""
+Performance tests to help profile dogstatsd. It does away with threads for easy
+profiling.
+"""
+
+from dogstatsd import MetricsAggregator
+from multiprocessing import Process
+
+
+flush_count = 10
+loops_per_flush = 10000
+metric_count = 5
+
+
+aggregator = MetricsAggregator('my.host')
+
+for _ in xrange(flush_count):
+ for i in xrange(loops_per_flush):
+ # Counters, gauges and histograms
+ for j in xrange(metric_count):
+ aggregator.submit('counter.%s:%s|c' % (j, i))
+ aggregator.submit('gauge.%s:%s|g' % (j, i))
+ aggregator.submit('histogram.%s:%s|h' % (j, i))
+ aggregator.flush()
Modified tests/test_common.py
diff --git a/tests/test_common.py b/tests/test_common.py
index 0b0c3b4..b20dee1 100644
--- a/tests/test_common.py
+++ b/tests/test_common.py
@@ -44,6 +44,25 @@ class TestCore(unittest.TestCase):
self.c.save_sample("test-counter", -2.0, 3.0)
self.assertRaises(UnknownValue, self.c.get_sample_with_timestamp, "test-counter")
+ def test_tags(self):
+ # Test metric tagging
+ now = int(time.time())
+ # Tag metrics
+ self.c.save_sample("test-counter", 1.0, 1.0, tags = ["tag1", "tag2"])
+ self.c.save_sample("test-counter", 2.0, 2.0, tags = ["tag1", "tag2"])
+ # Only 1 point recorded for this combination of tags: no rate can be computed, so it won't be sent
+ self.c.save_sample("test-counter", 3.0, 3.0, tags = ["tag1", "tag3"])
+ self.c.save_sample("test-metric", 3.0, now, tags = ["tag3", "tag4"])
+ # This is a different combination of tags
+ self.c.save_sample("test-metric", 3.0, now, tags = ["tag5", "tag3"])
+ results = self.c.get_metrics()
+ results.sort()
+ self.assertEquals(results,
+ [("test-counter", 2.0, 1.0, {"tags": ["tag1", "tag2"]}),
+ ("test-metric", now, 3.0, {"tags": ["tag3", "tag4"]}),
+ ("test-metric", now, 3.0, {"tags": ["tag3", "tag5"]}),
+ ])
+
def test_samples(self):
self.assertEquals(self.c.get_samples(), {})
self.c.save_sample("test-metric", 1.0, 0.0) # value, ts
Modified tests/test_dogstatsd.py
diff --git a/tests/test_dogstatsd.py b/tests/test_dogstatsd.py
index 3a2e326..19e541e 100644
--- a/tests/test_dogstatsd.py
+++ b/tests/test_dogstatsd.py
@@ -146,6 +146,10 @@ class TestUnitDogStatsd(object):
assert_almost_equal(p85['points'][0][1], 85, 10)
assert_almost_equal(p95['points'][0][1], 95, 10)
assert_almost_equal(p99['points'][0][1], 99, 10)
+ assert_almost_equal(pavg['points'][0][1], 50, 2)
+ assert_almost_equal(pmax['points'][0][1], 99, 1)
+ assert_almost_equal(pmin['points'][0][1], 0, 1)
+ assert_almost_equal(pcount['points'][0][1], 4000, 0) # 100 * 20 * 2
nt.assert_equals(p75['host'], 'myhost')
def test_sampled_histogram(self):
@alq666
Copy link
Author

alq666 commented Jun 18, 2012

cd /usr/share/datadog/agent
curl -L https://gist.github.com/raw/2950050/242de52427c6ab1c22c10393de3dee3c2f637f5b/tagging-checks.diff | sudo patch -p1
sudo service datadog-agent restart

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment