Created
February 21, 2015 22:44
-
-
Save timyates/7ba0a53ab57ac836cdf0 to your computer and use it in GitHub Desktop.
Saturday night with Java 8 and Circuit Breakers...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.functions.Func4; | |
import rx.subjects.BehaviorSubject; | |
import java.time.Instant; | |
import java.time.temporal.ChronoUnit; | |
import java.time.temporal.TemporalUnit; | |
import java.util.function.Supplier; | |
public class SupplierCircuitBreaker<T> implements Supplier<T> { | |
private final BehaviorSubject<TripStatus> tripObservable = BehaviorSubject.create(); | |
private final String name; | |
private final long timeout; | |
private final TemporalUnit temporalUnit; | |
private final Supplier<T> delegate; | |
private Func4<String,Long,State,Exception,T> fallback; | |
private State state = State.CLOSED; | |
private Instant tripTime; | |
private long tripCount; | |
private SupplierCircuitBreaker(String name, long timeout, TemporalUnit temporalUnit, Supplier<T> delegate, Func4<String,Long,State,Exception,T> fallback) { | |
this.timeout = timeout; | |
this.temporalUnit = temporalUnit; | |
this.name = name; | |
this.delegate = delegate; | |
this.fallback = fallback; | |
this.tripCount = 0; | |
} | |
@Override | |
public T get() throws CircuitException { | |
switch(state) { | |
case CLOSED: | |
return closedPath(); | |
case HALF_OPEN: | |
return halfOpenPath(); | |
case OPEN: | |
if(tripTime.plus(timeout, temporalUnit).isBefore(Instant.now())) { | |
halfResetCircuit(); | |
return halfOpenPath(); | |
} | |
return fallback.call(name, tripCount, state, new CircuitOpenException(name)); | |
default: | |
throw new IllegalStateException("Should not get here"); | |
} | |
} | |
private T closedPath() { | |
try { return delegate.get(); } | |
catch(Exception e) { return tripCircuit(e); } | |
} | |
private T halfOpenPath() { | |
try { | |
T ret = delegate.get(); | |
resetCircuit(); | |
return ret; | |
} | |
catch(Exception e) { return tripCircuit(e); } | |
} | |
private T tripCircuit(Exception e) { | |
if(this.state == State.HALF_OPEN) { | |
this.tripCount++; | |
tripObservable.onNext(new TripStatus(name, state, tripCount, Instant.now(), e)); | |
} | |
this.state = State.OPEN; | |
this.tripTime = Instant.now(); | |
return fallback.call(name, tripCount, state, e); | |
} | |
private void resetCircuit() { | |
this.state = State.CLOSED; | |
this.tripCount = 0; | |
tripObservable.onNext(new TripStatus(name, state, tripCount, Instant.now(), null)); | |
} | |
private void halfResetCircuit() { | |
this.state = State.HALF_OPEN; | |
tripObservable.onNext(new TripStatus(name, state, tripCount, Instant.now(), null)); | |
} | |
public Observable<TripStatus> status() { | |
return tripObservable; | |
} | |
public static <T> Builder<T> builder() { | |
return new Builder<>(); | |
} | |
public enum State { OPEN, HALF_OPEN, CLOSED } | |
public static class Builder<T> { | |
private Supplier<T> delegate = () -> null; | |
private String name = "UNKNOWN"; | |
private long timeout = 1; | |
private TemporalUnit temporalUnit = ChronoUnit.SECONDS; | |
private Func4<String,Long,State,Exception,T> fallback = (n, c, s, e) -> {throw new CircuitTrippedException(n, c, s, e);}; | |
private Builder() {} | |
private Builder(String name, long timeout, TemporalUnit temporalUnit, Supplier<T> delegate, Func4<String,Long,State,Exception,T> fallback) { | |
this.name = name; | |
this.timeout = timeout; | |
this.temporalUnit = temporalUnit; | |
this.delegate = delegate; | |
this.fallback = fallback; | |
} | |
public Builder<T> withName(String name) { | |
return new Builder<>(name, timeout, temporalUnit, delegate, fallback); | |
} | |
public Builder<T> withTimeout(long timeout, TemporalUnit temporalUnit) { | |
return new Builder<>(name, timeout, temporalUnit, delegate, fallback); | |
} | |
public Builder<T> withSupplier(Supplier<T> supplier) { | |
return new Builder<>(name, timeout, temporalUnit, supplier, fallback); | |
} | |
public Builder<T> withFallback(Func4<String,Long,State,Exception,T> fallback) { | |
return new Builder<>(name, timeout, temporalUnit, delegate, fallback); | |
} | |
public SupplierCircuitBreaker<T> build() { | |
return new SupplierCircuitBreaker<>(name, timeout, temporalUnit, delegate, fallback); | |
} | |
} | |
public static class TripStatus { | |
public final String name; | |
public final State state; | |
public final long tripCount; | |
public final Instant tripTime; | |
public final Exception ex; | |
public TripStatus(String name, State state, long tripCount, Instant tripTime, Exception ex) { | |
this.name = name; | |
this.state = state; | |
this.tripCount = tripCount; | |
this.tripTime = tripTime; | |
this.ex = ex; | |
} | |
} | |
public static class CircuitException extends RuntimeException { | |
CircuitException(String name) { super(name); } | |
} | |
public static class CircuitOpenException extends CircuitException { | |
CircuitOpenException(String name) { super(String.format("Circuit %s is currently OPEN", name)); } | |
} | |
public static class CircuitTrippedException extends CircuitException { | |
CircuitTrippedException(String name, Long tripCount, State state, Exception exception) { | |
super(String.format("Circuit %s tripped (%d times) from %s with %s", name, tripCount, state, exception)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment