Last active
October 1, 2020 09:11
-
-
Save hubertlepicki/8cc78b8167ef738eed9fc10b8ae1dcf3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule UI.Application do | |
use Application | |
def start(_type, _args) do | |
# List of children to be started when checks succeed | |
delayed_children = [ | |
UI.Endpoint, | |
{Absinthe.Subscription, UI.Endpoint} | |
] | |
# List all child processes to be supervised | |
children = [ | |
{Infra.DelayedStartSupervisor, name: UI.DelayedStartSupervisor}, | |
{Infra.DelayedStartWatchdog, | |
children: delayed_children, | |
for_name: UI.DelayedStartSupervisor, | |
checks: DB.ReadyChecks.all()} | |
] | |
opts = [strategy: :one_for_one, name: UI.Supervisor] | |
Supervisor.start_link(children, opts) | |
end | |
# Tell Phoenix to update the endpoint configuration | |
# whenever the application is updated. | |
def config_change(changed, _new, removed) do | |
UI.Endpoint.config_change(changed, removed) | |
:ok | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Infra.DelayedStartSupervisor do | |
use DynamicSupervisor | |
require Logger | |
def start_link(opts) do | |
DynamicSupervisor.start_link(__MODULE__, :ignore, name: opts[:name]) | |
end | |
def init(_ignore) do | |
DynamicSupervisor.init(strategy: :one_for_one) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Infra.DelayedStartWatchdog do | |
use GenServer | |
require Logger | |
@moduledoc """ | |
Use with conjunction with `Infra.DelayedStartSupervisor`. | |
In your application callback module start named `Infra.DelayedStartSupervisor` and then | |
after it, start this module and pass it `for_name` matching the supervisor name, | |
list of checks (list of functions) and list of children to be started. | |
This watchdog will start polling every 1s to see if checks are passing, and will only start | |
children in supervisor once they do. At this moment the work of this GenServer is done and | |
it stops doing any more work. | |
delayed_children = [UI.Endpoint] | |
children = [ | |
{Infra.DelayedStartSupervisor, name: UI.DelayedStartSupervisor}, | |
{Infra.DelayedStartWatchdog, | |
children: delayed_children, | |
for_name: UI.DelayedStartSupervisor, | |
checks: DB.ReadyChecks.all()} | |
] | |
opts = [strategy: :one_for_one, name: UI.Supervisor] | |
Supervisor.start_link(children, opts) | |
""" | |
def start_link(opts) do | |
GenServer.start_link( | |
__MODULE__, | |
%{checks: opts[:checks], children: opts[:children], for_name: opts[:for_name]}, | |
name: opts[:name] | |
) | |
end | |
@impl true | |
def init(state) do | |
schedule_poll(100) | |
{:ok, state} | |
end | |
@impl true | |
def handle_info(:check_ready, state) do | |
case Enum.all?(state.checks, & &1.()) do | |
true -> | |
Logger.info( | |
"#{__MODULE__} starting children for #{state.for_name} as all checks have passed" | |
) | |
state.children | |
|> Enum.each(fn child -> | |
{:ok, _pid} = DynamicSupervisor.start_child(state.for_name, child) | |
end) | |
{:noreply, state} | |
_ -> | |
Logger.warn( | |
"#{__MODULE__} delaying start of children of #{state.for_name} as some checks have failed" | |
) | |
schedule_poll() | |
{:noreply, state} | |
end | |
end | |
def handle_info(msg, state) do | |
Logger.warn("[#{__MODULE__} received unexpected message that was ignored: #{inspect(msg)}") | |
{:noreply, state} | |
end | |
# check every second for readiness | |
@poll_interval 1000 | |
defp schedule_poll(delay \\ @poll_interval) do | |
Process.send_after(self(), :check_ready, delay) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule DB.ReadyChecks do | |
require Logger | |
def all do | |
[ | |
&check_primary/0, | |
&check_replica/0 | |
] | |
end | |
defp check_primary do | |
concurrently(10, fn -> | |
try do | |
{:ok, _} = Ecto.Adapters.SQL.query(DB.Repo, "SELECT pg_sleep(0.1)") | |
true | |
rescue | |
e -> | |
Logger.error("#{__MODULE__}.check_primary check failed because of:") | |
Logger.error("#{inspect(e)}") | |
false | |
end | |
end) | |
end | |
defp check_replica do | |
concurrently(10, fn -> | |
try do | |
{:ok, _} = Ecto.Adapters.SQL.query(DB.Repo.replica(), "SELECT pg_sleep(0.1)") | |
true | |
rescue | |
e -> | |
Logger.error("#{__MODULE__}.check_replica check failed because of:") | |
Logger.error("#{inspect(e)}") | |
false | |
end | |
end) | |
end | |
defp concurrently(processes_count, func) do | |
0..processes_count | |
|> Enum.map(fn _ -> Task.async(fn -> func.() end) end) | |
|> Enum.map(&Task.await/1) | |
|> Enum.all?(& &1) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment