-
-
Save aaronjensen/33cc2aeb74746cac3bcb40dcefdd9c09 to your computer and use it in GitHub Desktop.
# ATTENTION: This is now supported in plug_cowboy as of 2.1.0: | |
# https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html | |
defmodule DrainStop do | |
@moduledoc """ | |
DrainStop Attempts to gracefully shutdown an endpoint when a normal shutdown | |
occurs. It first shuts down the acceptor, ensuring that no new requests can be | |
made. It then waits for all pending requests to complete. If the timeout | |
expires before this happens, it stops waiting, allowing the supervision tree | |
to continue its shutdown order. | |
DrainStop should be installed in your supervision tree *after* the | |
EndPoint it is going to drain stop. | |
DrainStop takes two options: | |
`endpoint`: The `Phoenix.Endpoint` to drain stop. Required. `timeout`: The | |
amount of time to allow for requests to finish in msec. Defaults to `5000`. | |
For example: | |
children = [ | |
supervisor(MyApp.Endpoint, []), | |
worker( | |
DrainStop, | |
[[timeout: 10_000, endpoint: MyApp.Endpoint]], | |
[shutdown: 15_000] | |
) | |
] | |
""" | |
use GenServer | |
require Logger | |
import Supervisor, only: [which_children: 1, terminate_child: 2] | |
def start_link(options) do | |
GenServer.start_link(DrainStop, options) | |
end | |
def init(options) do | |
Process.flag(:trap_exit, true) | |
endpoint = Keyword.fetch!(options, :endpoint) | |
timeout = Keyword.get(options, :timeout, 5000) | |
{:ok, {endpoint, timeout}} | |
end | |
def terminate(:shutdown, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout) | |
def terminate({:shutdown, _}, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout) | |
def terminate(:normal, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout) | |
def terminate(_, _), do: :ok | |
def drain_endpoint(endpoint, timeout) do | |
stop_listening(endpoint) | |
wait_for_requests(endpoint, timeout) | |
end | |
def wait_for_requests(endpoint, timeout) do | |
Logger.info("DrainStop starting graceful shutdown with timeout: #{timeout}") | |
timer_ref = :erlang.start_timer(timeout, self, :timeout) | |
do_wait_for_requests(endpoint, timer_ref, %{}) | |
end | |
defp do_wait_for_requests(endpoint, timer_ref, refs) do | |
get_monitor = fn pid -> | |
refs[pid] || Process.monitor(pid) | |
end | |
refs = | |
endpoint | |
|> pending_requests | |
|> Map.new(&{&1, get_monitor.(&1)}) | |
case Map.size(refs) do | |
0 -> | |
Logger.info("DrainStop Successful, no more connections") | |
:erlang.cancel_timer(timer_ref) | |
n -> | |
time_left = :erlang.read_timer(timer_ref) | |
Logger.info("DrainStop waiting #{time_left} msec for #{n} more connections to shutdown") | |
receive do | |
{:DOWN, _monitor_ref, _, _, _} -> | |
do_wait_for_requests(endpoint, timer_ref, refs) | |
{:timeout, ^timer_ref, :timeout} -> | |
Logger.error("DrainStop timeout") | |
msg -> | |
Logger.error("DrainStop unexpected msg: #{inspect msg}") | |
do_wait_for_requests(endpoint, timer_ref, refs) | |
end | |
end | |
end | |
def pending_requests(endpoint) do | |
for pid <- ranch_listener_sup_pids(endpoint), | |
{:ranch_conns_sup, sup_pid, _, _} <- which_children(pid), | |
{_, request_pid, _, _} <- which_children(sup_pid), | |
do: request_pid | |
end | |
def ranch_listener_sup_pids(endpoint) do | |
for {Phoenix.Endpoint.Server, pid, _, _} <- which_children(endpoint), | |
{{:ranch_listener_sup, _}, pid, _, _} <- which_children(pid), | |
do: pid | |
end | |
def stop_listening(endpoint) do | |
endpoint | |
|> ranch_listener_sup_pids | |
|> Enum.each(&terminate_child(&1, :ranch_acceptors_sup)) | |
end | |
end |
defmodule DrainStopTest do | |
use ExUnit.Case, async: false | |
use Phoenix.ConnTest | |
defmodule TestSupervisor do | |
@drain_stop_timeout 100 | |
def start_link do | |
import Supervisor.Spec, warn: false | |
children = [ | |
# Start the endpoint when the application starts | |
supervisor(DrainStopTest.TestEndpoint, []), | |
worker( | |
DrainStop, | |
[[timeout: @drain_stop_timeout, endpoint: DrainStopTest.TestEndpoint]], | |
[shutdown: @drain_stop_timeout * 2] | |
), | |
] | |
opts = [strategy: :one_for_one] | |
Supervisor.start_link(children, opts) | |
end | |
end | |
defmodule TestPlug do | |
def init(opts), do: opts | |
def call(conn, _) do | |
pid = Application.get_env(:drain_stop_test, :test_pid) | |
conn = Plug.Conn.fetch_query_params(conn) | |
send pid, :request_start | |
{time, _} = Integer.parse(conn.params["sleep"]) | |
:timer.sleep(time) | |
send pid, :request_end | |
conn | |
end | |
end | |
defmodule TestEndpoint do | |
use Phoenix.Endpoint, otp_app: :drain_stop_test | |
plug DrainStopTest.TestPlug | |
end | |
@endpoint DrainStopTest.TestEndpoint | |
setup do | |
Application.put_env(:drain_stop_test, :test_pid, self) | |
Application.put_env(:drain_stop_test, DrainStopTest.TestEndpoint, | |
http: [port: "4807"], url: [host: "example.com"], server: true) | |
{:ok, pid} = DrainStopTest.TestSupervisor.start_link | |
Process.flag(:trap_exit, true) | |
on_exit(fn -> | |
if Process.whereis(DrainStopTest.TestEndpoint) do | |
Supervisor.stop(DrainStopTest.TestEndpoint) | |
end | |
Process.exit(pid, :brutal_kill) | |
end) | |
{:ok, pid: pid} | |
end | |
test "waits for request to finish", %{pid: pid} do | |
Task.async(fn -> | |
HTTPoison.get("http://localhost:4807/?sleep=50") | |
end) | |
assert_receive :request_start, 1000 | |
Supervisor.stop(pid, :shutdown) | |
assert_received :request_end | |
end | |
test "truncates requests that don't finish in time", %{pid: pid} do | |
Task.async(fn -> | |
HTTPoison.get("http://localhost:4807/?sleep=500") | |
end) | |
assert_receive :request_start, 1000 | |
Supervisor.stop(pid, :shutdown) | |
refute_received :request_end | |
end | |
test "does not allow new requests" do | |
# This is harder to test without reaching in to internals... | |
DrainStop.stop_listening(DrainStopTest.TestEndpoint) | |
assert_raise(HTTPoison.Error, | |
fn -> HTTPoison.get!("http://localhost:4807/?sleep=500") end) | |
end | |
end |
Thanks for posting this 👍 very helpful for a use case that I have.
Thanks a lot for the job! But how can I use it. I added DrainStop
to children following the doc.
And run
iex -S mix phoenix.server
Erlang/OTP 19 [erts-8.0.2] [source-9503fff] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]
[info] Running MyPro.Endpoint with Cowboy using http://localhost:4000
Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Application.stop(:my_pro)
[info] stop listening
[info] DrainStop starting graceful shutdown with timeout: 10000
[info] DrainStop Successful, no more connections
:ok
[info] Application star_way_bff exited: :stopped
Perfect! it works! But how can I do it for production? Phoenix server will be started as daemon , how can I stop that application?
Is there a mechanism in Elixir can handle Unix killing signals?
You guys have a better idea?
@xingxing not sure if you're still looking for a solution. But I found https://github.com/tsutsu/signal_handler which lets Elixir applications intercept POSIX signals.
@aaronjensen are you planning to package this functionality into a library?
Anybody know if this works with Elixir 1.4.5 and Phoenix 1.3? It doesn't seem to work for me. I put a :timer.sleep/1
in a controller and when I call :init.stop()
the connection is severed.
Doesn't seem to work; Elixir 1.5.2, Phoenix 1.3. It still kills open connections that are being blocked by :timer.sleep/1
, as reported above. Is this just an artifact of using sleep? Will it work in other scenarios? How can I trust it?
For people trying to use it with Phoenix 1.3, maybe take a look at https://github.com/lyokato/the_end
This is already supported by plug_cowboy
(release 2.1.0)
https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html
That's great! I'll update the file at the top to say that this is now supported as of plug_cowboy
2.1.0. Thanks for pointing it out.
Thanks for this. I think it'll be a big help for us!