|
defmodule CircuitBreaker.Monitor do |
|
@moduledoc """ |
|
Handles monitoring and starting/stopping processes supervised |
|
under our circuit breaker. |
|
""" |
|
use GenServer |
|
|
|
require Logger |
|
|
|
alias FeatureFlags |
|
|
|
# Server state:
#   * `children`  - the child specs received through `init/1`
#   * `ref_to_id` - monitor reference => child id
#   * `id_to_ref` - child id => monitor reference
#   * `crashes`   - child id => `true` for every child recorded as crashed
defstruct children: [],
          ref_to_id: %{},
          id_to_ref: %{},
          crashes: %{}

# Supervisor under which children are started, looked up and terminated.
@dynamic_supervisor CircuitBreaker.DynamicSupervisor

# Delay in milliseconds between two `:check_children` passes.
@poll_interval 1_000
|
|
|
@doc """
Starts the monitor as a linked GenServer registered under this module's name.

`init_arg` is handed unchanged to `init/1`, which expects a keyword list
containing a `:children` entry.
"""
def start_link(init_arg) do
  registration = [name: __MODULE__]

  GenServer.start_link(__MODULE__, init_arg, registration)
end
|
|
|
# Builds the initial state from the mandatory `:children` option (raises
# `KeyError` when it is missing) and defers the first check to
# `handle_continue/2` so that `init/1` itself returns immediately.
@impl true
def init(init_arg) do
  initial_state = %__MODULE__{children: Keyword.fetch!(init_arg, :children)}

  {:ok, initial_state, {:continue, :check_children}}
end
|
|
|
# Runs the first pass right after `init/1` by reusing the `:check_children`
# clause of `handle_info/2`; its `{:noreply, state}` result is returned as is.
@impl true
def handle_continue(:check_children, state), do: handle_info(:check_children, state)
|
|
|
# Periodic pass: re-arms the timer first, then reconciles every configured
# child spec against its feature flag, crash record and running status.
@impl true
def handle_info(:check_children, %__MODULE__{children: children} = state) do
  schedule_next_loop(@poll_interval)

  state = Enum.reduce(children, state, &check_spec_state/2)

  {:noreply, state}
end

# A monitored process went down.
#
# The monitor reference is translated back to the child id. A reference that
# is no longer tracked belongs to a monitor this server already dropped, so
# the message is stale and is ignored. Previously such a message resolved to
# a `nil` id, was logged as a crash with an empty id, and left a `nil` key in
# `crashes`.
#
# For a tracked child the monitor and any earlier crash record are cleared;
# every exit reason other than `:normal` is then logged and recorded as a
# crash.
@impl true
def handle_info({:DOWN, ref, :process, _pid, reason}, %__MODULE__{ref_to_id: ref_to_id} = state) do
  case Map.get(ref_to_id, ref) do
    nil ->
      {:noreply, state}

    id ->
      state =
        state
        |> clear_monitor(id)
        |> clear_crash(id)

      state =
        case reason do
          :normal ->
            state

          crash ->
            Logger.error(
              "Process id #{id} backed by circuit breaker has crashed: #{inspect(crash)}",
              context: __MODULE__
            )

            put_crash(state, id)
        end

      {:noreply, state}
  end
end
|
|
|
# Switch for the periodic poll. `schedule_next_loop/1` reads this attribute,
# but nothing else in the module sets it; an unset attribute evaluates to
# `nil` (with a compiler warning), which meant the timer was never armed and
# only the very first pass ever ran.
# NOTE(review): defaulted to `true` here - confirm whether the value was meant
# to come from configuration (e.g. to disable polling in some environments).
@enabled? true

# Arms a timer that delivers `:check_children` to this process after
# `interval` milliseconds. Returns the timer reference, or `:ok` when polling
# is switched off.
defp schedule_next_loop(interval) do
  if @enabled? do
    Process.send_after(self(), :check_children, interval)
  else
    :ok
  end
end
|
|
|
# Reconciles a single child spec: gathers the three inputs of the decision
# table (flag state, recorded crash, running status) and lets
# `handle_states/5` pick the transition. Returns the updated state.
# The spec must carry an `:ld_key` entry, which is the key passed to
# `FeatureFlags.is_on?/2` together with `default: true`.
defp check_spec_state(%{ld_key: flag_key} = spec, state) do
  handle_states(
    FeatureFlags.is_on?(flag_key, default: true),
    has_crashed?(state, spec),
    is_running?(spec),
    state,
    spec
  )
end
|
|
|
# Decision table over (flag on?, crash recorded?, currently running?).
# Every clause returns the (possibly updated) state.
defp handle_states(flag_on?, has_crashed?, is_running?, state, spec)

# Flag on, no crash recorded, not running -> start the child.
defp handle_states(true, false, false, state, spec) do
  Logger.info("Circuit breaker flag activated, starting up child with spec #{inspect(spec)}",
    context: __MODULE__
  )

  start_child(state, spec)
end

# Flag on, crash recorded, yet still running -> forget the crash and monitor
# the process again. This should never happen, but it is tolerated if it does.
defp handle_states(true, true, true, state, spec) do
  case pid_for_spec(spec) do
    nil ->
      clear_crash(state, spec)

    pid ->
      state
      |> clear_crash(spec)
      |> monitor_pid(spec, pid)
  end
end

# Flag off but the child is still running -> stop it. The crash flag is
# irrelevant here because `stop_child/2` clears it anyway.
defp handle_states(false, _has_crashed?, true, state, spec), do: stop_child(state, spec)

# Flag off, already stopped, crash still recorded -> just drop the crash
# record. This combination should not be reachable and carries no meaning at
# runtime, so it is not logged.
defp handle_states(false, true, false, state, spec) do
  clear_crash(state, spec)
end

# Everything else is a steady state: nothing to change.
defp handle_states(_flag_on?, _has_crashed?, _is_running?, state, _spec), do: state
|
|
|
# True when a crash is currently recorded for the child identified by `spec`,
# i.e. when `crashes` holds a non-nil value under the child's id.
defp has_crashed?(%__MODULE__{crashes: crashes}, spec) do
  case Map.get(crashes, spec_to_id(spec)) do
    nil -> false
    _recorded -> true
  end
end
|
|
|
# True when the supervisor lists a child for `spec` whose pid slot holds a
# real pid and that process is alive. Any non-pid lookup result counts as
# "not running".
defp is_running?(spec) do
  candidate = pid_for_spec(spec)

  is_pid(candidate) and Process.alive?(candidate)
end
|
|
|
# Starts the child described by `spec` under the dynamic supervisor.
#
# Any stale crash record and monitor for the child are dropped first. When
# the supervisor reports a pid (freshly started or already started) the
# process is monitored; on any other outcome a warning is logged and the
# cleaned-up state is returned without a monitor.
defp start_child(state, spec) do
  fresh = state |> clear_crash(spec) |> clear_monitor(spec)

  launch = DynamicSupervisor.start_child(@dynamic_supervisor, spec)

  case launch do
    {:ok, pid} ->
      monitor_pid(fresh, spec, pid)

    {:ok, pid, _info} ->
      monitor_pid(fresh, spec, pid)

    {:error, {:already_started, pid}} ->
      monitor_pid(fresh, spec, pid)

    # Every other error means the child was not started.
    {:error, reason} ->
      Logger.warn(
        "Unable to start a child under the circuit breaker: #{inspect(reason)} with spec: #{inspect(spec)}",
        context: __MODULE__
      )

      fresh

    :ignore ->
      Logger.warn(
        "Unable to start a child under the circuit breaker: child process start function returned :ignore with spec: #{inspect(spec)}",
        context: __MODULE__
      )

      fresh
  end
end
|
|
|
# Stops the child described by `spec`, if the supervisor currently lists a
# live pid for it, and returns the state with the child's crash record and
# monitor removed.
#
# The monitor is dropped BEFORE the child is terminated. Terminating first
# left the resulting `:DOWN` message in the mailbox (the later demonitor did
# not remove it), so every deliberate stop was subsequently reported as a
# crash.
#
# Any lookup result that is not a pid leaves the state untouched: `nil` when
# no child matches, or the `:restarting` marker a supervisor may report in the
# pid slot. The latter previously raised `CaseClauseError`; the next poll
# simply retries.
defp stop_child(state, spec) do
  case pid_for_spec(spec) do
    pid when is_pid(pid) ->
      state =
        state
        |> clear_crash(spec)
        |> clear_monitor(spec)

      # Best effort: the result is intentionally ignored, as before.
      DynamicSupervisor.terminate_child(@dynamic_supervisor, pid)

      state

    _not_a_pid ->
      state
  end
end
|
|
|
# Starts monitoring `pid` and records the new monitor reference in both
# lookup directions (reference => id and id => reference).
defp monitor_pid(%__MODULE__{} = state, spec, pid) do
  id = spec_to_id(spec)
  ref = Process.monitor(pid)

  %__MODULE__{
    state
    | ref_to_id: Map.put(state.ref_to_id, ref, id),
      id_to_ref: Map.put(state.id_to_ref, id, ref)
  }
end
|
|
|
# Records a crash for the child identified by `spec` and drops whatever
# monitor is still registered for it.
defp put_crash(%__MODULE__{} = state, spec) do
  flagged = Map.put(state.crashes, spec_to_id(spec), true)

  clear_monitor(%__MODULE__{state | crashes: flagged}, spec)
end
|
|
|
# Removes the crash record (if any) of the child identified by `spec`.
defp clear_crash(%__MODULE__{} = state, spec) do
  remaining = Map.drop(state.crashes, [spec_to_id(spec)])

  %{state | crashes: remaining}
end
|
|
|
# Drops the monitor registered for the child identified by `spec`, if there
# is one, and removes it from both lookup maps.
#
# The demonitor uses `:flush` so that a `:DOWN` message which was already
# delivered for this reference is removed from the mailbox as well. Without
# it, such a message would be handled later even though the reference is no
# longer tracked.
defp clear_monitor(%__MODULE__{ref_to_id: ref_to_id, id_to_ref: id_to_ref} = state, spec) do
  id = spec_to_id(spec)

  ref_to_id =
    case Map.get(id_to_ref, id) do
      nil ->
        ref_to_id

      ref ->
        Process.demonitor(ref, [:flush])
        Map.drop(ref_to_id, [ref])
    end

  id_to_ref = Map.drop(id_to_ref, [id])

  %__MODULE__{state | id_to_ref: id_to_ref, ref_to_id: ref_to_id}
end
|
|
|
# Looks up the supervisor's entry for the child identified by `spec` and
# returns the content of its pid slot, or `nil` when no entry matches.
#
# An entry matches when its modules element is a one-element list holding the
# child's id. Entries of any other shape (for example `:dynamic` or a list of
# several modules) are skipped; previously they raised `FunctionClauseError`
# inside the anonymous function.
#
# NOTE(review): the returned value is whatever the supervisor reports in the
# pid slot, which per the DynamicSupervisor docs may be `:restarting` rather
# than a pid - callers should check with `is_pid/1`.
defp pid_for_spec(spec) do
  # Resolved once up front instead of once per listed child.
  id = spec_to_id(spec)

  maybe_child =
    @dynamic_supervisor
    |> DynamicSupervisor.which_children()
    # Search from the end of the listing so the last listed match wins;
    # the original author relied on this to prefer the most recently
    # created process when several match.
    |> Enum.reverse()
    |> Enum.find(fn
      {_, _, _, [maybe_id]} -> maybe_id == id
      _other -> false
    end)

  case maybe_child do
    {_, pid, _, _} ->
      pid

    _ ->
      nil
  end
end
|
|
|
# Resolves the identifier used to track a child. Clauses are tried in order:
#   1. a map with a `:start` tuple -> the first element of that tuple
#   2. a map with an `:id` entry   -> that id
#   3. a bare atom                 -> the atom itself
defp spec_to_id(%{start: {module, _function, _args}}) do
  module
end

defp spec_to_id(%{id: child_id}) do
  child_id
end

defp spec_to_id(child_id) when is_atom(child_id) do
  child_id
end
|
end |