Created
August 4, 2022 17:24
-
-
Save fpytloun/198be754dafc26715ce735fa01db4c41 to your computer and use it in GitHub Desktop.
Vector health check that compares each sink's input and output event counts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import logging | |
import requests | |
import sys | |
import time | |
from collections import defaultdict | |
from prometheus_client.parser import text_string_to_metric_families | |
# Switch both standard streams to line buffering so each line is flushed
# as soon as it is written.
for _stream in (sys.stdout, sys.stderr):
    _stream.reconfigure(line_buffering=True)

# Root logging configuration: timestamp with milliseconds, followed by
# level, logger name, calling function and the message.
logging.basicConfig(
    datefmt='%Y-%m-%dT%H:%M:%S',
    format='%(asctime)s.%(msecs)03d000Z %(levelname)s %(name)s::%(funcName)s: %(message)s',
)

# Logger used by every function in this script.
lg = logging.getLogger('health')
def parse_args(args=None):
    """Parse the command-line options of the health check.

    Args:
        args: List of argument strings to parse. When ``None``, argparse
            falls back to the process command line.

    Returns:
        The ``argparse.Namespace`` holding ``verbose``, ``debug``,
        ``interval``, ``threshold``, ``exclude``, ``include``, ``health``
        and ``metrics``.
    """
    # One (flags, keyword-options) entry per option, in the order they are
    # registered (which is also the order shown in --help).
    option_table = (
        (('-v', '--verbose'),
         dict(help="Enable verbose logging", action="store_true", required=False)),
        (('-d', '--debug'),
         dict(help="Enable debug logging", action="store_true", required=False)),
        (('-i', '--interval'),
         dict(type=int, help="Seconds to wait between samples", default=60)),
        (('--threshold',),
         dict(type=int,
              help="Percentage of input events that did not pass out through sink",
              default=80)),
        (('--exclude',),
         dict(nargs="+", help="List of component types to exclude",
              default=["prometheus_exporter"])),
        (('--include',),
         dict(nargs="+",
              help="Only include these component types, overrides --exclude")),
        (('--health',),
         dict(help="URL of health endpoint", default="http://localhost:8686/health")),
        (('--metrics',),
         dict(help="URL of metrics endpoint", default="http://localhost:9598/metrics")),
    )

    cli = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args(args)
def get_sink_metrics(url, exclude, include=None, timeout=10):
    """Fetch the Prometheus metrics page and collect event counters per sink.

    Args:
        url: URL of the metrics endpoint to query.
        exclude: Component types to skip. Only consulted when ``include``
            is empty.
        include: Component types to keep. When non-empty it takes
            precedence over ``exclude``.
        timeout: Seconds to wait for the endpoint before giving up, so a
            hung endpoint cannot block the check forever.

    Returns:
        A ``defaultdict(dict)`` keyed by the sample's ``component_name``
        label. Each value maps the metric family name
        (``"vector_events_in"`` / ``"vector_events_out"``) to the sample
        value.

    Raises:
        SystemExit: With status 1 when the request fails (connection
            error, timeout) or the endpoint answers with an HTTP error
            status.
    """
    lg.debug("Querying metrics endpoint at %s", url)
    try:
        response = requests.get(url, timeout=timeout)
        # requests.get() does not raise on 4xx/5xx by itself; without this
        # call an error page would be handed to the metrics parser.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Connection errors and timeouts carry no response object.
        failed_response = getattr(e, "response", None)
        detail = failed_response.text if failed_response is not None else e
        lg.error("Metrics endpoint returned error: %s", detail)
        sys.exit(1)

    wanted_families = ("vector_events_in", "vector_events_out")
    sinks = defaultdict(dict)
    for family in text_string_to_metric_families(response.text):
        if family.name not in wanted_families:
            continue
        for sample in family.samples:
            labels = sample.labels
            # Skip samples that are not sinks (or carry no kind label).
            if labels.get('component_kind') != "sink":
                continue
            component_type = labels["component_type"]
            if include:
                selected = component_type in include
            else:
                selected = component_type not in exclude
            if not selected:
                continue
            lg.debug(sample)
            sinks[labels['component_name']][family.name] = sample.value
    return sinks
def _undelivered_percent(sample_in, sample_out):
    """Return the percentage of received events that were not sent out.

    A non-positive ``sample_in`` (no traffic, or a counter that went
    backwards between the two samples) yields 0 because no meaningful ratio
    can be computed. The result is negative when more events left the sink
    than entered it during the interval.
    """
    if sample_in <= 0:
        return 0
    return (sample_in - sample_out) / sample_in * 100


def main():
    """Run the health check and exit non-zero when it fails.

    Steps:
      1. Query the health endpoint; exit 1 if it is unreachable or answers
         with an HTTP error status.
      2. Take two samples of the sink event counters, ``--interval``
         seconds apart.
      3. For each sink, compute the percentage of incoming events that did
         not pass out during the interval; exit 1 if any sink exceeds
         ``--threshold``.

    When the first sample contains no sinks, a warning is logged and the
    function returns normally.
    """
    args = parse_args()

    # --debug takes precedence over --verbose.
    if args.debug:
        lg.setLevel(logging.DEBUG)
    elif args.verbose:
        lg.setLevel(logging.INFO)
    else:
        lg.setLevel(logging.WARNING)

    lg.debug("Checking health endpoint at %s", args.health)
    try:
        # Bounded wait so a hung endpoint cannot block the check forever.
        res = requests.get(args.health, timeout=10)
        # requests.get() does not raise on 4xx/5xx by itself; without this
        # call an unhealthy response would be reported as healthy.
        res.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Connection errors and timeouts carry no response object.
        failed_response = getattr(e, "response", None)
        detail = failed_response.text if failed_response is not None else e
        lg.error("Health endpoint returned error: %s", detail)
        sys.exit(1)
    lg.info("Health endpoint returned: %s", res.text)

    sink_metrics = get_sink_metrics(args.metrics, args.exclude, args.include)
    if not sink_metrics:
        lg.warning("No sinks found to check")
        return

    lg.debug("Waiting %s seconds between samples", args.interval)
    time.sleep(args.interval)

    errors = False
    current_metrics = get_sink_metrics(args.metrics, args.exclude, args.include)
    for sink, metrics in current_metrics.items():
        # A sink missing from the first sample is compared against zero.
        baseline = sink_metrics.get(sink, {})
        sample_in = metrics.get("vector_events_in", 0) - baseline.get("vector_events_in", 0)
        sample_out = metrics.get("vector_events_out", 0) - baseline.get("vector_events_out", 0)
        sample_diff = sample_in - sample_out
        sample_percent = _undelivered_percent(sample_in, sample_out)

        lg.info("Sample for sink %s: interval=%ds, in=%d, out=%d, diff=%d (%d%%)", sink, args.interval, sample_in, sample_out, sample_diff, sample_percent)
        if sample_percent > args.threshold:
            lg.error("%d%% incoming events didn't pass out through sink %s in %d seconds which is more than %d%% threshold", sample_percent, sink, args.interval, args.threshold)
            errors = True

    if errors:
        sys.exit(1)
# Run the health check only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment