Created
May 7, 2019 22:40
-
-
Save earonesty/4ccf8fc9bde6feac30e5c155e54dfa5f to your computer and use it in GitHub Desktop.
Monitor class for healthz maintenance
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import logging | |
import threading | |
log = logging.getLogger(__name__) | |
class MonitorInstance: | |
def __init__(self, parent, label, func, threshold, active, metric): | |
self.parent = parent | |
self.label = label | |
self.func = func | |
self.threshold = threshold | |
self.active = active | |
self.metric = metric | |
self.__errors = None | |
def ok(self): | |
if self.__errors is None or self.__errors: | |
self.parent._ok(self) | |
self.__errors = 0 | |
if self.metric: | |
self.metric.set(0) | |
def error(self): | |
if not self.__errors: | |
self.parent._error(self) | |
if self.__errors is None: | |
self.__errors = 0 | |
self.__errors += 1 | |
if self.metric: | |
self.metric.inc() | |
def check(self): | |
try: | |
self.func() | |
self.ok() | |
except Exception as e: | |
log.error("%s error: %s", self.label, e) | |
self.error() | |
@property | |
def healthy(self): | |
return self.__errors < self.threshold | |
DEFAULT_THRESHOLD = 1 # errors to cause fault | |
DEFAULT_CHECKSECS = 5 # time in secs between checks | |
class Monitor: | |
def __init__(self, health_callback=None, check_secs=DEFAULT_CHECKSECS, use_thread=False): | |
self.active = [] # active moniors | |
self.alerts = set() # thresholds currently triggered (not healthy) | |
self.health_callback = health_callback | |
self.healthy = False # default: not healthy unless a monitor is added! | |
self.check_secs = check_secs | |
self.last_check = 0 | |
if use_thread: | |
assert self.check_secs > 0, "threads need to sleep" | |
threading.Thread(target=self._thread_loop, daemon=True).start() | |
def add(self, label, check, threshold=DEFAULT_THRESHOLD, active=False, metric=None): | |
inst = MonitorInstance(self, label, check, threshold, active, metric) | |
if active: | |
self.active.append(inst) | |
inst.check() | |
return inst | |
def _error(self, inst): | |
self.alerts.add(inst) | |
if self.healthy: | |
self._callback(False) | |
self.healthy = False | |
def _thread_loop(self): | |
while True: | |
self.check() | |
time.sleep(self.check_secs) | |
def _callback(self, value): | |
if not self.health_callback is None: | |
try: | |
self.health_callback(value) | |
except: | |
# health callback should always succeed! | |
log.exception("deadlyexes: error calling %s", self.health_callback) | |
def _ok(self, inst): | |
self.alerts.discard(inst) | |
if not self.healthy and not self.alerts: | |
self._callback(True) | |
self.healthy = True | |
def check(self, force=False): | |
if not force and (time.time() < (self.last_check + self.check_secs)): | |
return False | |
# returns true if check was done | |
checked=False | |
# convert to list prevents modifying iterators | |
for inst in list(self.alerts) + self.active: | |
try: | |
checked=True | |
inst.check() | |
except: | |
pass | |
return checked | |
import unittest | |
from unittest.mock import MagicMock | |
class TestDeadly(unittest.TestCase): | |
def test_basic(self): | |
m = Monitor() | |
ex = False | |
inst = m.add("db", lambda: 1/0 if ex else 1, 1, False) | |
assert m.healthy == True | |
assert inst.healthy == True | |
assert m.check() == False # no point in checking stuff that isn't errored | |
assert m.check(force=True) == False # no active monitors to check | |
inst.error() # passive monitor just alerted | |
assert m.healthy == False # no longer healthy | |
assert inst.healthy == False | |
assert m.check() == True # fire off a check | |
assert m.healthy == True # all is ok now | |
assert m.check() == False # not checking again right away | |
inst.error() # passive monitor just alerted | |
ex = True # checker will fail | |
assert m.check(force=True) == True # force-check will check | |
assert m.check(force=True) == True # force-check will check again | |
assert m.healthy == False # still bad | |
ex = False # checker will be ok | |
assert m.check(force=True) == True # force-check will check | |
assert m.healthy == True | |
def test_metric(self): | |
m = Monitor() | |
mockmetric = MagicMock() | |
ex = False | |
inst = m.add("db", lambda: 1/0 if ex else 1, 1, False, mockmetric) | |
inst.error() | |
inst.error() | |
assert mockmetric.inc.call_count == 2 | |
def test_active(self): | |
m = Monitor() | |
ex = False | |
m.add("db", lambda: 1/0 if ex else 1, 1, active=True) | |
assert m.healthy == True | |
assert m.check() == True | |
assert m.check(force=True) == True # actives are always checked, even when healthy | |
assert m.check(force=True) == True | |
ex = True | |
assert m.check(force=True) == True | |
assert m.healthy == False | |
ex = False | |
assert m.check(force=True) == True | |
assert m.healthy == True | |
def test_thread(self): | |
with self.assertRaises(Exception): | |
m = Monitor(use_thread=True, check_secs=0) | |
m = Monitor(use_thread=True, check_secs=0.01) | |
ex = False | |
m.add("db", lambda: 1/0 if ex else 1, 1, True) | |
assert(m.healthy) | |
ex = True | |
time.sleep(0.5) | |
assert(not m.healthy) | |
ex = False | |
time.sleep(0.5) | |
assert(m.healthy) | |
def test_callback(self): | |
event = threading.Event() | |
state = None | |
def cb(val): | |
nonlocal state | |
state = val | |
event.set() | |
m = Monitor(use_thread=True, check_secs=0.01, health_callback=cb) | |
ex = False | |
m.add("db", lambda: 1/0 if ex else 1, 1, True) | |
assert(m.healthy) | |
event.clear() | |
state = None | |
ex = True | |
event.wait() | |
assert state == False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment