# HAProxy check (gist by @remh, created March 29, 2016)
# https://gist.github.com/remh/54eca44636cf8d7ff3628af26e8074d4
# stdlib
from collections import defaultdict
import re
import time

# 3rd party
import requests

# project
from checks import AgentCheck
from config import _is_affirmative
from util import headers

STATS_URL = "/;csv;norefresh"
EVENT_TYPE = SOURCE_TYPE_NAME = 'haproxy'


class Services(object):
    BACKEND = 'BACKEND'
    FRONTEND = 'FRONTEND'
    ALL = (BACKEND, FRONTEND)
    ALL_STATUSES = (
        'up', 'open', 'down', 'maint', 'nolb'
    )

    STATUSES_TO_SERVICE_CHECK = {
        'UP': AgentCheck.OK,
        'DOWN': AgentCheck.CRITICAL,
        'no check': AgentCheck.UNKNOWN,
        'MAINT': AgentCheck.OK,
    }


class HAProxy(AgentCheck):
    def __init__(self, name, init_config, agentConfig, instances=None):
        AgentCheck.__init__(self, name, init_config, agentConfig, instances)

        # Host status needs to persist across all checks
        self.host_status = defaultdict(lambda: defaultdict(lambda: None))
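        # Keyed first by instance URL, then by "<svname>:<pxname>" (the key
        # built in _process_event below), e.g.
        # self.host_status[url]['i-4562165:dogweb'] -> 'UP'
        # (host and service names here are illustrative)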
    METRICS = {
        "qcur": ("gauge", "queue.current"),
        "scur": ("gauge", "session.current"),
        "slim": ("gauge", "session.limit"),
        "spct": ("gauge", "session.pct"),  # Calculated as: (scur/slim)*100
        "stot": ("rate", "session.rate"),
        "bin": ("rate", "bytes.in_rate"),
        "bout": ("rate", "bytes.out_rate"),
        "dreq": ("rate", "denied.req_rate"),
        "dresp": ("rate", "denied.resp_rate"),
        "ereq": ("rate", "errors.req_rate"),
        "econ": ("rate", "errors.con_rate"),
        "eresp": ("rate", "errors.resp_rate"),
        "wretr": ("rate", "warnings.retr_rate"),
        "wredis": ("rate", "warnings.redis_rate"),
        "req_rate": ("gauge", "requests.rate"),    # HAProxy 1.4 and higher
        "hrsp_1xx": ("rate", "response.1xx"),      # HAProxy 1.4 and higher
        "hrsp_2xx": ("rate", "response.2xx"),      # HAProxy 1.4 and higher
        "hrsp_3xx": ("rate", "response.3xx"),      # HAProxy 1.4 and higher
        "hrsp_4xx": ("rate", "response.4xx"),      # HAProxy 1.4 and higher
        "hrsp_5xx": ("rate", "response.5xx"),      # HAProxy 1.4 and higher
        "hrsp_other": ("rate", "response.other"),  # HAProxy 1.4 and higher
        "qtime": ("gauge", "queue.time"),          # HAProxy 1.5 and higher
        "ctime": ("gauge", "connect.time"),        # HAProxy 1.5 and higher
        "rtime": ("gauge", "response.time"),       # HAProxy 1.5 and higher
        "ttime": ("gauge", "session.time"),        # HAProxy 1.5 and higher
    }
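    # Each entry above maps an HAProxy CSV column to a (metric_type, suffix)
    # pair. _process_metrics below builds the final metric name as
    # "haproxy.<frontend|backend>.<suffix>", e.g. the "scur" column of a
    # FRONTEND row is sent as the gauge "haproxy.frontend.session.current".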
    SERVICE_CHECK_NAME = 'haproxy.backend_up'

    def check(self, instance):
        url = instance.get('url')
        username = instance.get('username')
        password = instance.get('password')
        collect_aggregates_only = _is_affirmative(
            instance.get('collect_aggregates_only', True)
        )
        collect_status_metrics = _is_affirmative(
            instance.get('collect_status_metrics', False)
        )
        collect_status_metrics_by_host = _is_affirmative(
            instance.get('collect_status_metrics_by_host', False)
        )
        count_status_by_service = _is_affirmative(
            instance.get('count_status_by_service', True)
        )
        tag_service_check_by_host = _is_affirmative(
            instance.get('tag_service_check_by_host', False)
        )
        services_incl_filter = instance.get('services_include', [])
        services_excl_filter = instance.get('services_exclude', [])

        self.log.debug('Processing HAProxy data for %s' % url)

        data = self._fetch_data(url, username, password)

        process_events = instance.get('status_check', self.init_config.get('status_check', False))

        self._process_data(
            data, collect_aggregates_only, process_events,
            url=url, collect_status_metrics=collect_status_metrics,
            collect_status_metrics_by_host=collect_status_metrics_by_host,
            tag_service_check_by_host=tag_service_check_by_host,
            services_incl_filter=services_incl_filter,
            services_excl_filter=services_excl_filter,
            count_status_by_service=count_status_by_service,
        )
    def _fetch_data(self, url, username, password):
        ''' Hit the stats URL and return the response split into CSV lines '''
        # Try to fetch data from the stats URL
        auth = (username, password)
        url = "%s%s" % (url, STATS_URL)

        self.log.debug("Fetching HAProxy stats data from: %s" % url)

        r = requests.get(url, auth=auth, headers=headers(self.agentConfig), verify=False)
        r.raise_for_status()

        return r.content.splitlines()
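    # For example, with an instance url of "http://localhost:8080/admin?stats"
    # (illustrative), the request above goes to
    # "http://localhost:8080/admin?stats/;csv;norefresh", HAProxy's CSV export
    # of its stats page.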
    def _process_data(self, data, collect_aggregates_only, process_events, url=None,
                      collect_status_metrics=False, collect_status_metrics_by_host=False,
                      tag_service_check_by_host=False, services_incl_filter=None,
                      services_excl_filter=None, count_status_by_service=True):
        ''' Main data-processing loop. For each piece of useful data, we'll
        either save a metric, save an event or both. '''

        # Split the first line into an index of fields
        # The line looks like:
        # "# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,"
        fields = [f.strip() for f in data[0][2:].split(',') if f]

        self.hosts_statuses = defaultdict(int)

        back_or_front = None

        # Skip the first line, go backwards to set back_or_front
        for line in data[:0:-1]:
            if not line.strip():
                continue

            # Store each line's values in a dictionary
            data_dict = self._line_to_dict(fields, line)

            if self._is_aggregate(data_dict):
                back_or_front = data_dict['svname']

            self._update_data_dict(data_dict, back_or_front)

            self._update_hosts_statuses_if_needed(
                collect_status_metrics, collect_status_metrics_by_host,
                data_dict, self.hosts_statuses
            )

            if self._should_process(data_dict, collect_aggregates_only):
                # Send the list of data to the metric callbacks
                self._process_metrics(
                    data_dict, url,
                    services_incl_filter=services_incl_filter,
                    services_excl_filter=services_excl_filter
                )

            if process_events:
                self._process_event(
                    data_dict, url,
                    services_incl_filter=services_incl_filter,
                    services_excl_filter=services_excl_filter
                )

            self._process_service_check(
                data_dict, url,
                tag_by_host=tag_service_check_by_host,
                services_incl_filter=services_incl_filter,
                services_excl_filter=services_excl_filter
            )

        if collect_status_metrics:
            self._process_status_metric(
                self.hosts_statuses, collect_status_metrics_by_host,
                services_incl_filter=services_incl_filter,
                services_excl_filter=services_excl_filter,
                count_status_by_service=count_status_by_service
            )

            self._process_backend_hosts_metric(
                self.hosts_statuses,
                services_incl_filter=services_incl_filter,
                services_excl_filter=services_excl_filter
            )

        return data
    def _line_to_dict(self, fields, line):
        data_dict = {}
        for i, val in enumerate(line.split(',')):
            if val:
                try:
                    # Try converting to a float; on failure, keep the raw string
                    val = float(val)
                except Exception:
                    pass
                data_dict[fields[i]] = val
        return data_dict
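    # A minimal sketch of the transformation (values are illustrative):
    #   fields = ['pxname', 'svname', 'scur']
    #   line = 'dogweb,i-4562165,42'
    #   => {'pxname': 'dogweb', 'svname': 'i-4562165', 'scur': 42.0}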
    def _update_data_dict(self, data_dict, back_or_front):
        """
        Adds spct if relevant, adds service
        """
        data_dict['back_or_front'] = back_or_front
        # The percentage of used sessions based on 'scur' and 'slim'
        if 'slim' in data_dict and 'scur' in data_dict:
            try:
                data_dict['spct'] = (data_dict['scur'] / data_dict['slim']) * 100
            except (TypeError, ZeroDivisionError):
                pass
    def _is_aggregate(self, data_dict):
        return data_dict['svname'] in Services.ALL
    def _update_hosts_statuses_if_needed(self, collect_status_metrics,
                                         collect_status_metrics_by_host,
                                         data_dict, hosts_statuses):
        if data_dict['svname'] == Services.BACKEND:
            return
        if collect_status_metrics and 'status' in data_dict and 'pxname' in data_dict:
            if collect_status_metrics_by_host and 'svname' in data_dict:
                key = (data_dict['pxname'], data_dict['svname'], data_dict['status'])
            else:
                key = (data_dict['pxname'], data_dict['status'])
            hosts_statuses[key] += 1
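    # e.g. hosts_statuses[('dogweb', 'i-4562165', 'UP')] += 1 when counting
    # per host, or hosts_statuses[('dogweb', 'UP')] += 1 otherwise
    # (names illustrative, taken from the docstring of _process_metrics).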
    def _should_process(self, data_dict, collect_aggregates_only):
        """
        If collect_aggregates_only, process only the aggregates;
        otherwise process everything except Services.BACKEND.
        """
        if collect_aggregates_only:
            return self._is_aggregate(data_dict)
        elif data_dict['svname'] == Services.BACKEND:
            return False
        return True
    def _is_service_excl_filtered(self, service_name, services_incl_filter,
                                  services_excl_filter):
        if self._tag_match_patterns(service_name, services_excl_filter):
            if self._tag_match_patterns(service_name, services_incl_filter):
                return False
            return True
        return False

    def _tag_match_patterns(self, tag, filters):
        if not filters:
            return False
        for rule in filters:
            if re.search(rule, tag):
                return True
        return False
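    # Example: with services_exclude=['.*'] and services_include=['dogweb'],
    # every service is filtered out except those matching 'dogweb', since an
    # include match overrides an exclude match in _is_service_excl_filtered.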
    def _process_backend_hosts_metric(self, hosts_statuses, services_incl_filter=None,
                                      services_excl_filter=None):
        agg_statuses = defaultdict(lambda: {'available': 0, 'unavailable': 0})
        for host_status, count in hosts_statuses.iteritems():
            try:
                service, hostname, status = host_status
            except Exception:
                service, status = host_status

            if self._is_service_excl_filtered(service, services_incl_filter, services_excl_filter):
                continue

            status = status.lower()
            if 'up' in status:
                agg_statuses[service]['available'] += count
            elif 'down' in status or 'maint' in status or 'nolb' in status:
                agg_statuses[service]['unavailable'] += count
            else:
                # create the entries for this service anyway
                agg_statuses[service]

        for service in agg_statuses:
            tags = ['service:%s' % service]
            self.gauge(
                'haproxy.backend_hosts',
                agg_statuses[service]['available'],
                tags=tags + ['available:true'])
            self.gauge(
                'haproxy.backend_hosts',
                agg_statuses[service]['unavailable'],
                tags=tags + ['available:false'])
        return agg_statuses
    def _process_status_metric(self, hosts_statuses, collect_status_metrics_by_host,
                               services_incl_filter=None, services_excl_filter=None,
                               count_status_by_service=True):
        agg_statuses = defaultdict(lambda: {'available': 0, 'unavailable': 0})

        # use a counter unless we have a unique tag set to gauge
        counter = defaultdict(int)
        if count_status_by_service and collect_status_metrics_by_host:
            # `service` and `backend` tags will exist
            counter = None

        for host_status, count in hosts_statuses.iteritems():
            try:
                service, hostname, status = host_status
            except Exception:
                service, status = host_status
            status = status.lower()

            tags = []
            if count_status_by_service:
                tags.append('service:%s' % service)

            if self._is_service_excl_filtered(service, services_incl_filter, services_excl_filter):
                continue

            if collect_status_metrics_by_host:
                tags.append('backend:%s' % hostname)

            self._gauge_all_statuses(
                "haproxy.count_per_status",
                count, status, tags, counter
            )

            if 'up' in status or 'open' in status:
                agg_statuses[service]['available'] += count
            if 'down' in status or 'maint' in status or 'nolb' in status:
                agg_statuses[service]['unavailable'] += count

        if counter is not None:
            # send aggregated counts as gauges
            for key, count in counter.iteritems():
                metric_name, tags = key[0], key[1]
                self.gauge(metric_name, count, tags=tags)

        for service in agg_statuses:
            for status, count in agg_statuses[service].iteritems():
                tags = ['status:%s' % status]
                if count_status_by_service:
                    tags.append('service:%s' % service)
                self.gauge("haproxy.count_per_status", count, tags=tags)
    def _gauge_all_statuses(self, metric_name, count, status, tags, counter):
        if counter is not None:
            counter_key = tuple([metric_name, tuple(tags + ['status:%s' % status])])
            counter[counter_key] += count
        else:
            # assume we have enough context, just send a gauge
            self.gauge(metric_name, count, tags + ['status:%s' % status])

        for state in Services.ALL_STATUSES:
            if state != status:
                self.gauge(metric_name, 0, tags + ['status:%s' % state.replace(" ", "_")])
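    # Zero-filling every other known status keeps each count_per_status
    # series continuous, so a host that leaves a status drops that series
    # back to 0 instead of leaving a stale value.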
    def _process_metrics(self, data, url, services_incl_filter=None,
                         services_excl_filter=None):
        """
        Data is a dictionary related to one host
        (one line) extracted from the csv.
        It should look like:
        {'pxname':'dogweb', 'svname':'i-4562165', 'scur':'42', ...}
        """
        hostname = data['svname']
        service_name = data['pxname']
        back_or_front = data['back_or_front']
        tags = ["type:%s" % back_or_front, "instance_url:%s" % url]
        tags.append("service:%s" % service_name)

        if self._is_service_excl_filtered(service_name, services_incl_filter,
                                          services_excl_filter):
            return

        if back_or_front == Services.BACKEND:
            tags.append('backend:%s' % hostname)

        for key, value in data.items():
            if HAProxy.METRICS.get(key):
                suffix = HAProxy.METRICS[key][1]
                name = "haproxy.%s.%s" % (back_or_front.lower(), suffix)
                if HAProxy.METRICS[key][0] == 'rate':
                    self.rate(name, value, tags=tags)
                else:
                    self.gauge(name, value, tags=tags)
    def _process_event(self, data, url, services_incl_filter=None,
                       services_excl_filter=None):
        '''
        Main event processing loop. An event will be created for a service
        status change.
        Service checks on the server side can be used to provide the same functionality
        '''
        hostname = data['svname']
        service_name = data['pxname']
        key = "%s:%s" % (hostname, service_name)
        status = self.host_status[url][key]

        if self._is_service_excl_filtered(service_name, services_incl_filter,
                                          services_excl_filter):
            return

        if status is None:
            self.host_status[url][key] = data['status']
            return

        if status != data['status'] and data['status'] in ('UP', 'DOWN'):
            # If the status of a host has changed, we trigger an event
            try:
                lastchg = int(data['lastchg'])
            except Exception:
                lastchg = 0

            # Create the event object
            ev = self._create_event(
                data['status'], hostname, lastchg, service_name,
                data['back_or_front']
            )
            self.event(ev)

            # Store this host status so we can check against it later
            self.host_status[url][key] = data['status']
    def _create_event(self, status, hostname, lastchg, service_name, back_or_front):
        HAProxy_agent = self.hostname.decode('utf-8')
        if status == "DOWN":
            alert_type = "error"
            title = "%s reported %s:%s %s" % (HAProxy_agent, service_name, hostname, status)
        else:
            if status == "UP":
                alert_type = "success"
            else:
                alert_type = "info"
            title = "%s reported %s:%s back and %s" % (HAProxy_agent, service_name, hostname, status)

        tags = ["service:%s" % service_name]
        if back_or_front == Services.BACKEND:
            tags.append('backend:%s' % hostname)
        return {
            'timestamp': int(time.time() - lastchg),
            'event_type': EVENT_TYPE,
            'host': HAProxy_agent,
            'msg_title': title,
            'alert_type': alert_type,
            "source_type_name": SOURCE_TYPE_NAME,
            "event_object": hostname,
            "tags": tags
        }
    def _process_service_check(self, data, url, tag_by_host=False,
                               services_incl_filter=None, services_excl_filter=None):
        ''' Report a service check, tagged by the service and the backend.
        Statuses are defined in `STATUSES_TO_SERVICE_CHECK` mapping.
        '''
        service_name = data['pxname']
        status = data['status']
        haproxy_hostname = self.hostname.decode('utf-8')
        check_hostname = haproxy_hostname if tag_by_host else ''

        if self._is_service_excl_filtered(service_name, services_incl_filter,
                                          services_excl_filter):
            return

        if status in Services.STATUSES_TO_SERVICE_CHECK:
            service_check_tags = ["service:%s" % service_name]
            hostname = data['svname']
            if data['back_or_front'] == Services.BACKEND:
                service_check_tags.append('backend:%s' % hostname)

            status = Services.STATUSES_TO_SERVICE_CHECK[status]
            message = "%s reported %s:%s %s" % (haproxy_hostname, service_name,
                                                hostname, status)
            self.service_check(self.SERVICE_CHECK_NAME, status, message=message,
                               hostname=check_hostname, tags=service_check_tags)
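
# A sketch of an instance configuration for this check; the key names come
# from the instance.get() calls above, while the URL and credentials are
# illustrative:
#
# init_config:
#   status_check: false
#
# instances:
#   - url: http://localhost:8080/admin?stats
#     username: admin
#     password: example-password
#     collect_aggregates_only: true
#     collect_status_metrics: false
#     collect_status_metrics_by_host: false
#     count_status_by_service: true
#     tag_service_check_by_host: false
#     services_include: []
#     services_exclude: []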