Created
August 26, 2020 00:18
-
-
Save rranelli/ea75fc3500252ec03999c0b6626251e6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
alias ControlPlane.CircuitBreaker | |
defmodule CircuitBreaker do
  @moduledoc """
  Supervisor that runs a single worker process behind a circuit breaker.

  A circuit is described by a `%CircuitBreaker{}` struct. Its state is stored
  through `CircuitBreaker.State` under the struct's `:circuit_ref` (which
  defaults to the worker module). This module writes the states `:normal` and
  `:tripped`; `protected_start_link/1` additionally accepts `:restarting`.

  The supervisor starts two children with a `:one_for_all` strategy: the
  worker (through `protected_start_link/1`) and a `CircuitBreaker.Restarter`.
  """

  use Supervisor

  defstruct [
    :circuit_ref,
    :worker_module,
    ets: :circuit_breaker_states,
    restart_period_ms: 500,
    worker_args: []
  ]

  @type t :: %__MODULE__{
          circuit_ref: term(),
          worker_module: module() | nil,
          ets: atom(),
          restart_period_ms: non_neg_integer(),
          worker_args: list()
        }

  @doc """
  Starts the circuit breaker supervisor for `worker_args`.

  `worker_args` is either a worker module or a `{module, args}` tuple, where
  `args` is the argument list applied to the worker's `start_link`. The
  supervisor is registered under the circuit's `:circuit_ref`.

  Raises `ArgumentError` when no worker module can be derived from the input.
  """
  @spec start_link(module() | tuple()) :: Supervisor.on_start()
  def start_link(worker_args) when is_atom(worker_args) or is_tuple(worker_args) do
    circuit_breaker = new(worker_args)

    # `nil` and `false` are atoms, so they pass the guard above but are not
    # usable as a worker module.
    if !circuit_breaker.worker_module do
      raise ArgumentError,
            "parameter `worker` must be a module or a {module, args} tuple. got: #{inspect(worker_args)}"
    end

    Supervisor.start_link(__MODULE__, circuit_breaker, name: circuit_breaker.circuit_ref)
  end

  @impl true
  def init(circuit_breaker) do
    # Registering stores `:normal` for this circuit before any child starts,
    # so the first `protected_start_link/1` call is allowed through.
    CircuitBreaker.State.register(circuit_breaker)

    children = [
      worker_spec(circuit_breaker),
      {CircuitBreaker.Restarter, circuit_breaker}
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end

  # Child spec for the wrapped worker. The child id is always `:worker`, which
  # is what `restart/1` and `break!/1` use to address it.
  defp worker_spec(circuit_breaker) do
    %{
      id: :worker,
      start: {__MODULE__, :protected_start_link, [circuit_breaker]},
      restart: :permanent
    }
  end

  # Start function used by the worker child spec. The worker is only started
  # while the circuit is `:normal` or `:restarting`; in any other state the
  # function returns `:ignore`. A successful start marks the circuit `:normal`;
  # any other result from the worker's `start_link` is returned unchanged.
  @doc false
  def protected_start_link(
        %{worker_module: worker_module, worker_args: worker_args} = circuit_breaker
      ) do
    if state(circuit_breaker) in [:normal, :restarting] do
      case apply(worker_module, :start_link, worker_args) do
        {:ok, pid} ->
          set_state(circuit_breaker, :normal)
          {:ok, pid}

        otherwise ->
          otherwise
      end
    else
      :ignore
    end
  end

  @doc """
  Builds a `%CircuitBreaker{}` for a worker.

  Accepts a worker module or a `{module, args}` tuple. `overrides` replaces
  struct fields (for example `:restart_period_ms` or `:circuit_ref`); when a
  tuple is given, its module and args take precedence over the
  `:worker_module` / `:worker_args` overrides.

  Raises `KeyError` when `overrides` contains a key that is not a field of the
  struct.
  """
  @spec new(module() | tuple(), Enumerable.t()) :: t()
  def new(worker_args, overrides \\ [])

  def new({worker_module, worker_args}, overrides) do
    overrides =
      overrides
      |> Keyword.put(:worker_module, worker_module)
      |> Keyword.put(:worker_args, worker_args)

    new(worker_module, overrides)
  end

  def new(worker_module, overrides) do
    # `struct!/2` rejects unknown keys, so a misspelled override fails loudly
    # instead of silently adding a stray key to the struct.
    struct!(%__MODULE__{worker_module: worker_module, circuit_ref: worker_module}, overrides)
  end

  @doc """
  Restarts the worker child of the circuit and marks the circuit `:normal`.

  Accepts a worker module (a default circuit is built with `new/1`) or a
  circuit struct. Returns `:ok` on success, or `{:error, error}` as returned by
  `Supervisor.restart_child/2`, in which case the stored state is left
  untouched.
  """
  @spec restart(module() | t()) :: :ok | {:error, term()}
  def restart(worker) when is_atom(worker), do: restart(new(worker))

  def restart(circuit_breaker = %{circuit_ref: circuit_ref}) do
    case Supervisor.restart_child(circuit_ref, :worker) do
      {:ok, _pid} ->
        :ok = set_state(circuit_breaker, :normal)

      {:ok, _pid, _info} ->
        :ok = set_state(circuit_breaker, :normal)

      {:error, error} ->
        {:error, error}
    end
  end

  @doc """
  Trips the circuit: terminates the worker child and marks the circuit
  `:tripped`.

  Accepts a worker module or a circuit struct. Raises `MatchError` when the
  worker child cannot be terminated.
  """
  @spec break!(module() | t()) :: :ok
  def break!(worker) when is_atom(worker), do: break!(new(worker))

  def break!(circuit_breaker = %{circuit_ref: circuit_ref}) do
    :ok = Supervisor.terminate_child(circuit_ref, :worker)
    :ok = set_state(circuit_breaker, :tripped)
  end

  @doc false
  defdelegate set_state(circuit_breaker, new_state), to: CircuitBreaker.State

  defdelegate state(circuit_breaker), to: CircuitBreaker.State, as: :get_state

  @doc "Returns `true` when the circuit's state is `:normal`."
  @spec available?(module() | t()) :: boolean()
  def available?(worker) when is_atom(worker), do: available?(new(worker))
  def available?(circuit_breaker), do: :normal == state(circuit_breaker)

  @doc "Returns `true` when the circuit's state is `:restarting`."
  @spec restarting?(module() | t()) :: boolean()
  def restarting?(worker) when is_atom(worker), do: restarting?(new(worker))
  def restarting?(circuit_breaker), do: :restarting == state(circuit_breaker)

  @doc "Returns `true` when the circuit's state is `:tripped`."
  @spec unavailable?(module() | t()) :: boolean()
  def unavailable?(worker) when is_atom(worker), do: unavailable?(new(worker))
  def unavailable?(circuit_breaker), do: :tripped == state(circuit_breaker)
end
defmodule CircuitBreaker.Restarter do
  @moduledoc """
  Periodically tries to restart the worker of a tripped circuit.

  The process state is the circuit description it was started with. Every
  `:restart_period_ms` milliseconds it checks the circuit; when the circuit is
  `:tripped`, the state is switched to `:restarting` and
  `CircuitBreaker.restart/1` is called. When this process terminates, the
  circuit is marked `:tripped`.
  """

  use GenServer

  @doc """
  Starts the restarter for `args` (the circuit description), passing `opts`
  on to `GenServer.start_link/3`.
  """
  def start_link(args, opts \\ []),
    do: GenServer.start_link(__MODULE__, args, opts)

  @impl true
  def init(circuit_breaker) do
    # NOTE(review): exits are trapped presumably so that `terminate/2` runs
    # when the supervisor shuts this process down - confirm.
    Process.flag(:trap_exit, true)
    schedule_restart(circuit_breaker)
    {:ok, circuit_breaker}
  end

  @impl true
  def handle_info(:try_restart, circuit_breaker) do
    if CircuitBreaker.unavailable?(circuit_breaker) do
      attempt_restart(circuit_breaker)
    end

    schedule_restart(circuit_breaker)
    {:noreply, circuit_breaker}
  end

  @impl true
  def terminate(_reason, circuit_breaker) do
    :ok = CircuitBreaker.set_state(circuit_breaker, :tripped)
    {:ok, circuit_breaker}
  end

  # Flips the circuit to `:restarting` and asks for the worker to be restarted.
  # If the restart fails, the circuit is put back to `:tripped`: only a
  # `:tripped` circuit is picked up by the next `:try_restart` tick, so leaving
  # it in `:restarting` would stop every further attempt.
  defp attempt_restart(circuit_breaker) do
    :ok = CircuitBreaker.set_state(circuit_breaker, :restarting)

    case CircuitBreaker.restart(circuit_breaker) do
      :ok -> :ok
      {:error, _reason} -> :ok = CircuitBreaker.set_state(circuit_breaker, :tripped)
    end
  end

  # Sends `:try_restart` to this process after the configured period.
  defp schedule_restart(%{restart_period_ms: restart_period_ms}) do
    Process.send_after(self(), :try_restart, restart_period_ms)
  end
end
defmodule CircuitBreaker.State do
  @moduledoc false

  # Keeps circuit states in a public, named ETS table, keyed by the circuit's
  # `:circuit_ref`. The GenServer creates the table in `init/1`; reads and
  # writes go straight to ETS from the calling process.

  use GenServer

  # Table name used when the start arguments carry no `:ets` entry.
  @default_table :circuit_breaker_states

  # Starts the process; it is registered under this module's name unless
  # `opts` already carries a `:name`.
  def start_link(args, opts \\ []) do
    GenServer.start_link(__MODULE__, args, Keyword.put_new(opts, :name, __MODULE__))
  end

  @impl true
  def init(args) do
    table = args[:ets] || @default_table
    :ets.new(table, [:set, :public, :named_table])

    {:ok, %{}}
  end

  # Records the circuit with the initial state `:normal`.
  def register(circuit_breaker) do
    :ok = set_state(circuit_breaker, :normal)
  end

  # Returns the state stored for the circuit, or `{:error, :not_found}` when
  # the lookup does not yield exactly one `{circuit_ref, state}` entry.
  def get_state(%{ets: table, circuit_ref: ref}) do
    case :ets.lookup(table, ref) do
      [{^ref, current}] -> current
      _no_single_entry -> {:error, :not_found}
    end
  end

  # Stores `new_state` for the circuit, replacing any previous entry.
  def set_state(%{ets: table, circuit_ref: ref}, new_state) do
    true = :ets.insert(table, {ref, new_state})
    :ok
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment