Skip to content

Instantly share code, notes, and snippets.

@sithu
Last active October 27, 2016 01:27
Show Gist options
  • Select an option

  • Save sithu/5c3552d37176c8ae5582861b9b7b2020 to your computer and use it in GitHub Desktop.

Select an option

Save sithu/5c3552d37176c8ae5582861b9b7b2020 to your computer and use it in GitHub Desktop.
Complete this Two-Stage (Open-and-Closed) circuit breaker implementation.
Initial state is set to _is_closed = True.
When the number of failure reaches to the limit, set the status to _is_closed = False along with _open_since time. Whenever the current time passes _open_since + _reset_timeout, the circuit breaker will let the call go through.
If the attempt is successful, reset the status to _is_closed = True and reset the failure count to zero.
To test your code, place this cbdemo.py (Links to an external site.) under the same location as cb.py. See the expected output (Links to an external site.) for a correct solution.
### cbdemo.py ####
https://gist.github.com/sithu/6aa4c0b1832f50704a5dc9f733b91a61
### Expected Output ####
https://gist.github.com/sithu/d23d354d21ca5d6273c5a6f934b7d8c3
FEEL FREE TO ADD ANY FUNCTION YOU NEED.
############ cb.py ##############
from functools import wraps
from datetime import datetime, timedelta
class CircuitBreaker(object):
def __init__(self, name=None, expected_exception=Exception, max_failure_to_open=3, reset_timeout=10):
"""
DO NOT ALTER FUNCTION ARGUMENTS AND THEIR DEFAULT VALUES!
"""
self._name = name
self._expected_exception = expected_exception
self._max_failure_to_open = max_failure_to_open
self._reset_timeout = reset_timeout
# Set the initial state
self.close()
def close(self):
"""
Your solution must work with this '_is_closed' boolean flag variable only.
DO NOT SET ANYTHING OTHER THAN True OR False, OR YOU WILL GET ZERO!
"""
self._is_closed = True
self._failure_count = 0
def open(self):
self._is_closed = False
self._opened_since = datetime.utcnow()
def __call__(self, func):
if self._name is None:
self._name = func.__name__
@wraps(func)
def with_circuitbreaker(*args, **kwargs):
return self.call(func, *args, **kwargs)
return with_circuitbreaker
def call(self, func, *args, **kwargs):
"""
Steps:
1. If the cricuitbreaker is NOT in the executable state, then
throw an Exception with this error message format -
'CircuitBreaker[%s] is OPEN until %s (%d failures, %d sec remaining)'
{name} {open_until} {failure_count} {open_remaining}
2. Call the given 'func' and handle the 'expected_exception'.
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment