Last active
March 3, 2022 13:10
-
-
Save jbaiter/003cc925904a9dce7b2d05973bb28c99 to your computer and use it in GitHub Desktop.
Rewriting proxy to reduce cardinality of Prometheus node-exporter counter metrics.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.9
"""Rewriting proxy to reduce cardinality of Prometheus node-exporter counter metrics.

This small proxy service is intended to run alongside the Prometheus
node-exporter, with Prometheus fetching the metrics from the proxy instead of
from the exporter directly. The proxy will then apply user-defined rewriting
rules to the metrics' labels, currently either by wholesale removal of certain
labels or by rewriting certain label values. If, after rewriting, there are
several time series with identical names and labels, they are summed to form a
single counter metric.

As an example, consider the notoriously high-cardinality (and by default
disabled) `mountstats` collector, which includes an `export` and `protocol` label
for each mounted NFS share. If we were only interested in aggregated metrics by
the address of the remote NFS server and mount all of our shares with the same
protocol, we could simply remove those two labels to significantly cut down on
the cardinality of the exported metrics, before they even reach Prometheus.
Here's a sample rewriting config that does just this:

```json
{
    # Remove `export` and `protocol` labels from all NFS mountstats metrics,
    # they're too fine-grained for us
    "metric_regex": "node_mountstats_nfs_.+",
    "action": "remove",
    "label_regex": "(export|protocol)"
}
```

Similarly, we could decide that we do want to keep certain labels, but are only
interested in a subset of its possible values and only interested in an
aggregated value for the rest of the values. This could again be the case for
the aforementioned `mountstats` exporter, which has a whole bunch of metrics
that expose an `operation` label alongside the usual labels. Let's say we're
only interested in detailed metrics for the `READ`, `WRITE`, `ACCESS` and
`CREATE` operations and want to group the rest of the operations under an
aggregated `OTHER` operation, here's what a config for this could look like
(using a negative lookahead (`(?!...)`) in the regular expression):

```json
{
    # Reduce granularity of NFS Operations metrics to track only the most
    # important operations separately
    "metric_regex": "node_mountstats_nfs_operations_.+",
    "action": "replace",
    "label_regex": "operation",
    "value_regex": "^(?!(READ|WRITE|ACCESS|CREATE)$).+",
    "replacement": "OTHER",
}
```

The two above examples are included as the default configuration, which will be
used in absence of a user-defined configuration (passed as the path to a JSON
file via the `--config` command line argument).
"""
# MIT License
#
# Copyright (c) 2022 Johannes Baiter <[email protected]>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations

import argparse
import hashlib
import itertools
import json
import re
import urllib.request
from dataclasses import dataclass
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from typing import Iterator, List, Literal, Mapping, Optional
from urllib.parse import urlparse
VERSION = "0.1" | |
METRIC_PAT = re.compile( | |
r"^(?P<name>[a-zA-Z_:][a-zA-Z0-9_]*)(?:{(?P<labels>.*?)})? (?P<value>.+)$" | |
) | |
DEFAULT_METRICS_CONFIG = [ | |
{ | |
# Remove `export` and `protocol` labels from all NFS mountstats metrics, they're too fine-grained for us | |
"metric_regex": "node_mountstats_nfs_.+", | |
"action": "remove", | |
"label_regex": "(export|protocol)", | |
}, | |
{ | |
# Reduce granularity of NFS Operations metrics to track only the most important operations separately | |
"metric_regex": "node_mountstats_nfs_operations_.+", | |
"action": "replace", | |
"label_regex": "operation", | |
"value_regex": "^(?!(READ|WRITE|ACCESS|CREATE)$).+", | |
"replacement": "OTHER", | |
}, | |
] | |
@dataclass
class RewriteConfig:
    """Configuration for a single rewrite rule."""

    # Rule applies only to metrics whose name matches this pattern.
    metric_regex: re.Pattern
    # "remove" drops matching labels entirely; "replace" rewrites their values.
    action: Literal["remove", "replace"]
    # Names of the labels this rule operates on.
    label_regex: re.Pattern
    # For "replace" rules: label values matching this pattern get substituted.
    # None for "remove" rules.
    value_regex: Optional[re.Pattern]
    # For "replace" rules: replacement text passed to re.sub. None for "remove".
    replacement: Optional[str]
@dataclass
class PrometheusMetric:
    """A parsed Prometheus metric sample (one value of one time series)."""

    # Metric name, e.g. "node_cpu_seconds_total".
    name: str
    # Sample value (always stored as float; `parse` rejects non-float values).
    value: float
    # Label name -> label value mapping; empty for unlabelled metrics.
    labels: Mapping[str, str]
    # HELP/TYPE comment text from the exposition, if known.
    help: Optional[str]
    type: Optional[str]

    @classmethod
    def parse(
        cls, metric_line: str, help: Optional[str] = None, type: Optional[str] = None
    ) -> Optional[PrometheusMetric]:
        """Parse a PrometheusMetric from a line in the Prometheus text format.

        Returns None for lines that don't match the sample syntax or whose
        value is not parseable as a float.
        """
        match = METRIC_PAT.match(metric_line)
        if not match:
            return None
        name = match.group("name")
        label_str = match.group("labels")
        if not label_str:
            # Either no label section at all ("up 1") or an empty one
            # ("up{} 1"); the latter used to crash the unpacking below.
            labels = {}
        else:
            # Split each pair on the FIRST "=" only, so label values that
            # themselves contain "=" survive. NOTE: values containing "," or
            # escaped quotes are still not handled — fine for node-exporter
            # output, which doesn't produce them.
            labels = {
                k: v.strip('"')
                for k, v in (pair.split("=", 1) for pair in label_str.split(","))
            }
        try:
            value = float(match.group("value"))
            return PrometheusMetric(name, value, labels, help, type)
        except ValueError:
            return None

    def serialize(self, with_comments=True) -> str:
        """Serialize to a chunk of Prometheus text format.

        With `with_comments`, HELP/TYPE comment lines are prepended when the
        corresponding fields are set.
        """
        out: List[str] = []
        if with_comments:
            if self.help:
                out.append(f"# HELP {self.name} {self.help}")
            if self.type:
                out.append(f"# TYPE {self.name} {self.type}")
        # Render whole-number values without a trailing ".0".
        val = int(self.value) if self.value.is_integer() else self.value
        if self.labels:
            label_str = ",".join(f'{k}="{v}"' for k, v in self.labels.items())
            out.append(f"{self.name}{{{label_str}}} {val}")
        else:
            out.append(f"{self.name} {val}")
        return "\n".join(out)

    def matches(self, other: PrometheusMetric) -> bool:
        """Check if this metric matches another metric, i.e. whether the metric name and the labels are identical"""
        return self.name == other.name and self.labels == other.labels

    @property
    def series_id(self) -> str:
        """An identifier that uniquely identifies a time series (hash of name, labels and their values)."""
        h = hashlib.sha256()
        h.update(self.name.encode("utf-8"))
        # Sort so the hash is independent of label insertion order.
        for k, v in sorted(self.labels.items()):
            h.update(k.encode("utf-8"))
            h.update(v.encode("utf-8"))
        return h.hexdigest()
def fetch_metrics(url: str) -> Iterator[PrometheusMetric]:
    """Fetch unfiltered metrics from the upstream exporter.

    Yields one PrometheusMetric per sample line in the response. The text of
    the most recent "# HELP"/"# TYPE" comments is attached to samples of the
    metric family those comments described; unparseable lines are skipped.
    """
    with urllib.request.urlopen(url) as resp:
        current_metric: Optional[str] = None
        current_help: Optional[str] = None
        current_type: Optional[str] = None
        for raw_line in resp:
            line = raw_line.strip().decode("utf8")
            if line.startswith("# HELP"):
                # "# HELP <name> <text>" — <text> may be missing entirely.
                parts = line.split(" ", 3)
                current_metric = parts[2]
                current_help = parts[3] if len(parts) > 3 else ""
            elif line.startswith("# TYPE"):
                parts = line.split(" ", 3)
                current_metric = parts[2]
                current_type = parts[3] if len(parts) > 3 else ""
            elif line:
                metric = PrometheusMetric.parse(line, current_help, current_type)
                if metric is None:
                    # parse() returns None for malformed lines; the old code
                    # dereferenced metric.name before checking and crashed.
                    continue
                if metric.name != current_metric:
                    # This sample doesn't belong to the family the HELP/TYPE
                    # comments described, so don't attach them to it.
                    metric.help = None
                    metric.type = None
                    current_metric = None
                    current_help = None
                    current_type = None
                yield metric
def rewrite_labels(
    metrics: List[PrometheusMetric], rewrite_cfgs: List[RewriteConfig]
) -> List[PrometheusMetric]:
    """Rewrite Prometheus labels in counter metrics according to the configuration.

    After rewriting, series that ended up with identical name + labels are
    summed into a single sample. Mutates the labels of the passed-in metrics
    in place.

    NOTE: the name grouping below uses itertools.groupby, which only groups
    *adjacent* items — it relies on the exporter emitting all samples of a
    metric family consecutively, which node-exporter does.
    """
    metrics_by_name = {
        name: list(grp)
        for name, grp in itertools.groupby(metrics, key=lambda m: m.name)
    }
    for cfg in rewrite_cfgs:
        for metric_name, named_metrics in metrics_by_name.items():
            if not cfg.metric_regex.match(metric_name):
                continue
            metric_type = next(
                (m.type for m in named_metrics if m.type is not None),
                None,
            )
            # We only support rewriting/removing labels for counters, since
            # only these can be safely aggregated. When the exposition carried
            # no TYPE hint, fall back to the `_total` naming convention.
            # (The previous condition used `or`, which made the fallback
            # unreachable and skipped every untyped metric.)
            is_counter = metric_type == "counter" or (
                metric_type is None and metric_name.endswith("_total")
            )
            if not is_counter:
                continue
            if cfg.action == "remove":
                for metric in named_metrics:
                    metric.labels = {
                        label: value
                        for label, value in metric.labels.items()
                        if not cfg.label_regex.match(label)
                    }
            elif cfg.action == "replace":
                for metric in named_metrics:
                    # Only rewrite the values of labels targeted by
                    # label_regex; the old code substituted into EVERY label's
                    # value, ignoring the rule's label selector.
                    metric.labels = {
                        label: (
                            cfg.value_regex.sub(cfg.replacement, value)
                            if cfg.label_regex.match(label)
                            else value
                        )
                        for label, value in metric.labels.items()
                    }
    # Merge series that became identical after rewriting. A dict keyed on the
    # series hash also merges duplicates that are NOT adjacent (e.g. the
    # sequence READ, OTHER, WRITE, OTHER), which groupby would emit twice.
    out_metrics: List[PrometheusMetric] = []
    first_seen = {}
    for metric in metrics:
        series_id = metric.series_id
        merged = first_seen.get(series_id)
        if merged is None:
            first_seen[series_id] = metric
            out_metrics.append(metric)
        else:
            # Counters with identical name+labels are summed into one sample.
            merged.value += metric.value
    return out_metrics
def run_proxy(
    nodeexporter_url: str, host: str, port: int, cfg: List[RewriteConfig], quiet: bool
) -> None:
    """Serve rewritten upstream metrics over HTTP until the process is killed.

    The proxy answers GET requests on the same path as the upstream exporter
    (anything else gets a 404), fetching and rewriting the metrics on every
    scrape.
    """
    metrics_path = urlparse(nodeexporter_url).path

    class Handler(BaseHTTPRequestHandler):
        server_version = f"prometheus-exporter-rewrite-proxy/{VERSION}"

        def do_GET(self):
            if self.path != metrics_path:
                self.send_error(404)
                return
            rewritten = rewrite_labels(list(fetch_metrics(nodeexporter_url)), cfg)
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
            self.end_headers()
            last_name = None
            for metric in rewritten:
                # Emit the HELP/TYPE comments only for the first sample of
                # each metric family.
                body = metric.serialize(with_comments=last_name != metric.name)
                self.wfile.write(body.encode("utf-8"))
                self.wfile.write(b"\n")
                last_name = metric.name

        def log_request(self, code: int | str = ..., size: int | str = ...) -> None:
            # --quiet suppresses the default per-request stderr logging.
            if not quiet:
                super().log_request(code, size)

    ThreadingHTTPServer((host, port), Handler).serve_forever()
if __name__ == "__main__": | |
parser = argparse.ArgumentParser( | |
description=__doc__, | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
) | |
parser.add_argument( | |
"--node-exporter-url", | |
required=True, | |
help="URL of the node_exporter", | |
) | |
parser.add_argument( | |
"--host", default="0.0.0.0", help="Host to bind to (default='0.0.0.0')" | |
) | |
parser.add_argument( | |
"--port", default=9100, type=int, help="Port to bind to (default=9100" | |
) | |
parser.add_argument( | |
"--config", | |
type=str, | |
required=False, | |
help="Path to JSON config file with an array of custom rewriting rules, will replace defaults", | |
) | |
parser.add_argument( | |
"--quiet", | |
action="store_true", | |
help="Don't log requests to stderr", | |
) | |
args = parser.parse_args() | |
if args.config: | |
with open(args.config) as fp: | |
cfg_dicts = json.load(fp) | |
else: | |
cfg_dicts = DEFAULT_METRICS_CONFIG | |
for cfg in cfg_dicts: | |
# TODO: Validate config? | |
cfg["metric_regex"] = re.compile(cfg["metric_regex"]) | |
cfg["label_regex"] = re.compile(cfg["label_regex"]) | |
if cfg["action"] == "replace": | |
cfg["value_regex"] = re.compile(cfg["value_regex"]) | |
else: | |
cfg["replacement"] = None | |
cfg["value_regex"] = None | |
run_proxy( | |
args.node_exporter_url, | |
args.host, | |
args.port, | |
[RewriteConfig(**cfg) for cfg in cfg_dicts], | |
args.quiet, | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment