Skip to content

Instantly share code, notes, and snippets.

@timyates
Created February 21, 2015 22:44
Show Gist options
  • Save timyates/7ba0a53ab57ac836cdf0 to your computer and use it in GitHub Desktop.
Save timyates/7ba0a53ab57ac836cdf0 to your computer and use it in GitHub Desktop.
Saturday night with Java 8 and Circuit Breakers...
import rx.Observable;
import rx.functions.Func4;
import rx.subjects.BehaviorSubject;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.function.Supplier;
public class SupplierCircuitBreaker<T> implements Supplier<T> {
private final BehaviorSubject<TripStatus> tripObservable = BehaviorSubject.create();
private final String name;
private final long timeout;
private final TemporalUnit temporalUnit;
private final Supplier<T> delegate;
private Func4<String,Long,State,Exception,T> fallback;
private State state = State.CLOSED;
private Instant tripTime;
private long tripCount;
private SupplierCircuitBreaker(String name, long timeout, TemporalUnit temporalUnit, Supplier<T> delegate, Func4<String,Long,State,Exception,T> fallback) {
this.timeout = timeout;
this.temporalUnit = temporalUnit;
this.name = name;
this.delegate = delegate;
this.fallback = fallback;
this.tripCount = 0;
}
@Override
public T get() throws CircuitException {
switch(state) {
case CLOSED:
return closedPath();
case HALF_OPEN:
return halfOpenPath();
case OPEN:
if(tripTime.plus(timeout, temporalUnit).isBefore(Instant.now())) {
halfResetCircuit();
return halfOpenPath();
}
return fallback.call(name, tripCount, state, new CircuitOpenException(name));
default:
throw new IllegalStateException("Should not get here");
}
}
private T closedPath() {
try { return delegate.get(); }
catch(Exception e) { return tripCircuit(e); }
}
private T halfOpenPath() {
try {
T ret = delegate.get();
resetCircuit();
return ret;
}
catch(Exception e) { return tripCircuit(e); }
}
private T tripCircuit(Exception e) {
if(this.state == State.HALF_OPEN) {
this.tripCount++;
tripObservable.onNext(new TripStatus(name, state, tripCount, Instant.now(), e));
}
this.state = State.OPEN;
this.tripTime = Instant.now();
return fallback.call(name, tripCount, state, e);
}
private void resetCircuit() {
this.state = State.CLOSED;
this.tripCount = 0;
tripObservable.onNext(new TripStatus(name, state, tripCount, Instant.now(), null));
}
private void halfResetCircuit() {
this.state = State.HALF_OPEN;
tripObservable.onNext(new TripStatus(name, state, tripCount, Instant.now(), null));
}
public Observable<TripStatus> status() {
return tripObservable;
}
public static <T> Builder<T> builder() {
return new Builder<>();
}
public enum State { OPEN, HALF_OPEN, CLOSED }
public static class Builder<T> {
private Supplier<T> delegate = () -> null;
private String name = "UNKNOWN";
private long timeout = 1;
private TemporalUnit temporalUnit = ChronoUnit.SECONDS;
private Func4<String,Long,State,Exception,T> fallback = (n, c, s, e) -> {throw new CircuitTrippedException(n, c, s, e);};
private Builder() {}
private Builder(String name, long timeout, TemporalUnit temporalUnit, Supplier<T> delegate, Func4<String,Long,State,Exception,T> fallback) {
this.name = name;
this.timeout = timeout;
this.temporalUnit = temporalUnit;
this.delegate = delegate;
this.fallback = fallback;
}
public Builder<T> withName(String name) {
return new Builder<>(name, timeout, temporalUnit, delegate, fallback);
}
public Builder<T> withTimeout(long timeout, TemporalUnit temporalUnit) {
return new Builder<>(name, timeout, temporalUnit, delegate, fallback);
}
public Builder<T> withSupplier(Supplier<T> supplier) {
return new Builder<>(name, timeout, temporalUnit, supplier, fallback);
}
public Builder<T> withFallback(Func4<String,Long,State,Exception,T> fallback) {
return new Builder<>(name, timeout, temporalUnit, delegate, fallback);
}
public SupplierCircuitBreaker<T> build() {
return new SupplierCircuitBreaker<>(name, timeout, temporalUnit, delegate, fallback);
}
}
public static class TripStatus {
public final String name;
public final State state;
public final long tripCount;
public final Instant tripTime;
public final Exception ex;
public TripStatus(String name, State state, long tripCount, Instant tripTime, Exception ex) {
this.name = name;
this.state = state;
this.tripCount = tripCount;
this.tripTime = tripTime;
this.ex = ex;
}
}
public static class CircuitException extends RuntimeException {
CircuitException(String name) { super(name); }
}
public static class CircuitOpenException extends CircuitException {
CircuitOpenException(String name) { super(String.format("Circuit %s is currently OPEN", name)); }
}
public static class CircuitTrippedException extends CircuitException {
CircuitTrippedException(String name, Long tripCount, State state, Exception exception) {
super(String.format("Circuit %s tripped (%d times) from %s with %s", name, tripCount, state, exception));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment