Last active
October 14, 2016 15:18
-
-
Save JackStouffer/ff2d5dbc61d0713bb24dfe1d5acaf31a to your computer and use it in GitHub Desktop.
A circuit breaker: a way to fail fast automatically, relieving pressure on an external service after a number of failures in quick succession.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
# Breaker states, as returned by CircuitBreaker.get_status().
BREAKER_OPEN = 0       # tripped: status_or_throw() raises OpenException
BREAKER_HALF_OPEN = 1  # cool-down elapsed: calls pass, but one error re-opens
BREAKER_CLOSED = 2     # normal operation: calls pass, errors are counted


class OpenException(Exception):
    """Raised by ``CircuitBreaker.status_or_throw`` while the breaker is open."""


class CircuitBreaker(object):
    """Count errors and trip open once too many arrive in quick succession.

    Callers report failures with ``increment_error`` and ask whether a call
    may proceed with ``get_status`` or ``status_or_throw``.  The breaker moves
    CLOSED -> OPEN when the error counter reaches ``open_level``, OPEN ->
    HALF_OPEN once more than ``open_rollover_time`` seconds have passed since
    it opened, and HALF_OPEN -> CLOSED once more than twice that time has
    passed.  An error reported while HALF_OPEN re-opens it immediately.

    State transitions that depend on elapsed time are evaluated lazily,
    inside ``get_status``.
    """

    def __init__(self, open_level=500, error_rollover_time=3600,
                 open_rollover_time=200, clock=None):
        """ Ctor

        open_level = n number of errors to wait for to open the breaker

        error_rollover_time = the number of seconds between errors that
        needed to reset the error counter

        open_rollover_time = the number of seconds that are needed to be error
        free before the breaker will switch from BREAKER_OPEN to
        BREAKER_HALF_OPEN; after twice this time it switches from
        BREAKER_HALF_OPEN to BREAKER_CLOSED

        clock = optional zero-argument callable returning the current time as
        a ``datetime.datetime`` (or any object whose difference is a
        ``datetime.timedelta``).  Defaults to ``datetime.datetime.now``;
        mainly useful to drive the breaker deterministically in tests.
        """
        self._error_count = 0
        self._status = BREAKER_CLOSED

        self._open_level = open_level
        self._error_rollover_time = error_rollover_time
        self._open_rollover_time = open_rollover_time
        self._clock = clock

        self._last_error_timestamp = None
        self._open_timestamp = None

    def _now(self):
        """ Return the current time from the injected clock, or the wall clock
        """
        if self._clock is not None:
            return self._clock()
        # Looked up at call time (not bound in __init__) so the default
        # behaves exactly like a direct datetime.datetime.now() call.
        return datetime.datetime.now()

    def increment_error(self):
        """ Increment the error counter, but only if this error
        is within error_rollover_time of the last error, otherwise
        restart the error count at one.

        An error while the breaker is half open re-opens it immediately;
        reaching open_level errors opens it.
        """
        # One clock read per call so every decision below sees the same time.
        now = self._now()

        if self._status == BREAKER_HALF_OPEN:
            self._status = BREAKER_OPEN
            self._open_timestamp = now

        window = datetime.timedelta(seconds=self._error_rollover_time)
        if (
            self._last_error_timestamp is None or
            now - self._last_error_timestamp <= window
        ):
            self._error_count += 1
        else:
            # The previous error is too old to count: start a fresh streak.
            self._error_count = 1

        self._last_error_timestamp = now

        if self._error_count == self._open_level:
            self._status = BREAKER_OPEN
            self._open_timestamp = now

    def get_status(self):
        """ Get the current status of the breaker, applying any time-based
        transition that has become due.

        Always returns one of BREAKER_OPEN, BREAKER_HALF_OPEN or
        BREAKER_CLOSED.
        """
        if self._status == BREAKER_CLOSED:
            return BREAKER_CLOSED

        # Single clock read: comparing against several different "now"
        # values could let a call slip between two branches.
        elapsed = self._now() - self._open_timestamp

        if self._status == BREAKER_OPEN:
            if elapsed <= datetime.timedelta(seconds=self._open_rollover_time):
                return BREAKER_OPEN
            self._status = BREAKER_HALF_OPEN
            return BREAKER_HALF_OPEN

        # Remaining state is BREAKER_HALF_OPEN.  Close only strictly after
        # twice the rollover time; at exactly the boundary stay half open
        # (mirrors the strict comparison used for OPEN -> HALF_OPEN).
        if elapsed > datetime.timedelta(seconds=self._open_rollover_time * 2):
            self._status = BREAKER_CLOSED
            self._error_count = 0
            return BREAKER_CLOSED
        return BREAKER_HALF_OPEN

    def status_or_throw(self):
        """ If the breaker is open, don't allow anything through

        Raises OpenException when get_status() reports BREAKER_OPEN;
        otherwise returns None.
        """
        if self.get_status() == BREAKER_OPEN:
            raise OpenException()

        return
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! ../env/bin/python | |
# -*- coding: utf-8 -*- | |
import time

import pytest

from circuit_breaker import CircuitBreaker, OpenException, BREAKER_HALF_OPEN, BREAKER_OPEN, BREAKER_CLOSED
class TestCircuitBreaker:
    """Behavioural tests for CircuitBreaker.

    The time-based tests use real ``time.sleep`` calls sized to the
    rollover times passed to the breaker under test.
    """

    def test_increment(self):
        """A single reported error is counted once."""
        c = CircuitBreaker()
        c.increment_error()

        assert c._error_count == 1

    def test_error_rollover(self):
        """An error arriving after the rollover window restarts the count."""
        c = CircuitBreaker(error_rollover_time=1)
        c.increment_error()
        c.increment_error()

        time.sleep(1)

        c.increment_error()
        assert c._error_count == 1

    def test_open(self):
        """The breaker stays closed until the open_level-th error."""
        c = CircuitBreaker(open_level=10)

        for _ in range(9):
            c.increment_error()
            assert c.get_status() == BREAKER_CLOSED

        c.increment_error()
        assert c.get_status() == BREAKER_OPEN

    def test_throw(self):
        """status_or_throw raises OpenException once the breaker is open."""
        c = CircuitBreaker(open_level=2)
        c.increment_error()
        c.increment_error()

        with pytest.raises(OpenException):
            c.status_or_throw()

    def test_half_open(self):
        """After open_rollover_time the open breaker becomes half open."""
        c = CircuitBreaker(open_level=3, open_rollover_time=2)

        for _ in range(3):
            c.increment_error()

        assert c.get_status() == BREAKER_OPEN
        time.sleep(2)
        assert c.get_status() == BREAKER_HALF_OPEN

    def test_reopen(self):
        """An error while half open re-opens the breaker immediately."""
        c = CircuitBreaker(open_level=3, open_rollover_time=1)

        for _ in range(3):
            c.increment_error()

        assert c.get_status() == BREAKER_OPEN
        time.sleep(1)
        assert c.get_status() == BREAKER_HALF_OPEN
        c.increment_error()
        assert c.get_status() == BREAKER_OPEN

    def test_auto_close(self):
        """A quiet half-open breaker closes, resets, and can trip again."""
        c = CircuitBreaker(open_level=3, open_rollover_time=1)

        for _ in range(3):
            c.increment_error()

        assert c.get_status() == BREAKER_OPEN
        time.sleep(1)
        assert c.get_status() == BREAKER_HALF_OPEN
        time.sleep(1)
        assert c.get_status() == BREAKER_CLOSED

        # The counter was reset on close, so it takes open_level fresh
        # errors to trip the breaker again.
        c.increment_error()
        c.increment_error()
        assert c.get_status() == BREAKER_CLOSED
        c.increment_error()
        assert c.get_status() == BREAKER_OPEN
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment