Created
May 2, 2016 22:49
-
-
Save rranelli/1cc88f0d74e512178396d3755f3808a0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule LuklaWeb.AsyncQueue do
  @moduledoc """
  GenServer that fronts `Exq` job enqueueing and tolerates Exq/Redis being
  unavailable.

  The server state is a map `%{pid: pid | nil, alive: boolean}`:

    * `alive: false` - Exq is not running; every enqueue request is answered
      with `{:error, :econnrefused}` and a periodic `:reconnect` message tries
      to start Exq again.
    * `alive: true` - Exq was started and linked; `pid` is the process
      returned by `Exq.start_link/0`.

  Requests are sent as `GenServer.call(AsyncQueue, {:enqueue, worker_module, job_id})`.
  """

  use GenServer

  alias LuklaWeb.Repo
  alias LuklaWeb.AsyncQueue.Job

  @reconnect_interval_millis 5000
  @failed_state %{pid: nil, alive: false}

  @doc """
  Starts the queue server.

  NOTE(review): the process is registered under the bare `AsyncQueue` atom
  (i.e. `Elixir.AsyncQueue`), not `LuklaWeb.AsyncQueue`. The name is kept
  as-is because callers presumably address the server by it.
  """
  def start_link() do
    GenServer.start_link(__MODULE__, [], name: AsyncQueue)
  end

  @doc """
  We defer the connection because we don't want to block the application start
  if Redis is down.

  Also, a `:reconnect` message is sent to the GenServer every
  `@reconnect_interval_millis` milliseconds.
  """
  def init(_args \\ []) do
    # Exits of the linked Exq process must arrive as `{:EXIT, pid, reason}`
    # messages instead of taking this server down with them.
    Process.flag(:trap_exit, true)

    # First connection attempt happens right after init returns.
    send(self(), :reconnect)
    :timer.send_interval(@reconnect_interval_millis, self(), :reconnect)

    {:ok, @failed_state}
  end

  @doc """
  Handles `{:enqueue, worker_module, job_id}` requests.

  Replies `{:error, :econnrefused}` while Exq is down. Otherwise the job is
  pushed to the `"default"` queue and the reply is `{:ok, result}` or
  `{:error, reason}`.

  In the rare case an enqueue is called while Exq is marked alive but the
  Redis connection is lost in-between, we catch the exit signal and update
  with a failed state.
  """
  def handle_call({:enqueue, _, _}, _from, s = %{alive: false}) do
    {:reply, {:error, :econnrefused}, s}
  end

  def handle_call({:enqueue, worker_module, job_id}, _from, s = %{alive: true}) do
    try do
      case Exq.enqueue(Exq, "default", worker_module, [job_id]) do
        {:ok, result} -> {:reply, {:ok, result}, s}
        {:error, reason} -> {:reply, {:error, reason}, s}
        # Any unexpected return shape is surfaced to the caller as an error.
        otherwise -> {:reply, {:error, otherwise}, s}
      end
    catch
      # The Exq process vanished between the liveness check and the call.
      :exit, {:noproc, _} -> {:reply, {:error, :econnrefused}, @failed_state}
    end
  end

  @doc """
  We trap Exq exit and issue a reconnect.

  On reconnect, we try to start Exq for... EVER
  """
  def handle_info(:reconnect, s = %{alive: true}), do: {:noreply, s}

  def handle_info(:reconnect, %{alive: false}) do
    case Exq.start_link() do
      {:ok, pid} -> {:noreply, %{alive: true, pid: pid}}
      # Start refused (e.g. Redis unreachable): stay down and retry on the
      # next `:reconnect` tick.
      _error -> {:noreply, @failed_state}
    end
  rescue
    # Best effort: an exception while starting Exq must not kill this server.
    _error -> {:noreply, @failed_state}
  end

  # Whatever the exit reason, once the tracked Exq process is gone the state
  # must be reset; otherwise `alive: true` would turn every `:reconnect` into
  # a no-op. The repeated `pid` binding restricts this clause to the process
  # we started; it can never match the failed state, whose pid is `nil`.
  def handle_info({:EXIT, pid, _reason}, %{pid: pid}), do: {:noreply, @failed_state}

  # Everything else (including EXIT signals from unrelated processes) is ignored.
  def handle_info(_msg, s), do: {:noreply, s}
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment