Created
March 1, 2018 23:07
-
-
Save alvesl/05959b76e9ac3372694563bf6e1918a3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MyApp.Manager do | |
use GenServer | |
use Timex | |
import Ecto.Query, warn: false | |
@cache_pending_groups_expiry 60 * 5 | |
def start_link() do | |
Singleton.start_child(__MODULE__, %{supervised_workers: 0, groups: nil}, {MyApp.Manager, 1}) | |
end | |
def init(state) do | |
groups = load_groups() | |
new_state = Map.put(state, :groups, groups) | |
schedule_workers(groups) | |
schedule_reporter() # Schedule work to be performed on start | |
{:ok, new_state} | |
end | |
# CLIENT API | |
def dismiss_worker(group_id) do | |
pid = :global.whereis_name({MyApp.Manager, 1}) | |
GenServer.cast(pid, {:remove_worker, group_id}) | |
end | |
def current_workers() do | |
pid = :global.whereis_name({MyApp.Manager, 1}) | |
GenServer.call(pid, {:current_workers}) | |
end | |
def force_report() do | |
pid = :global.whereis_name({MyApp.Manager, 1}) | |
groups = load_groups() | |
GenServer.call(pid, {:force_report, groups}) | |
end | |
def new_worker(group) do | |
pid = :global.whereis_name({MyApp.Manager, 1}) | |
GenServer.call(pid, {:start_worker, group}) | |
end | |
# SERVER API | |
def handle_call({:start_worker, group}, _from, state) do | |
start_worker(group) | |
new_state = Map.put(state, :groups, state.groups ++ [group]) | |
{:reply, {:ok, group}, new_state} | |
end | |
def handle_call({:current_workers}, _from, state) do | |
{:reply, length(state.groups), state} | |
end | |
def handle_call({:force_report, groups}, _from, state) do | |
new_state = report(groups, state) | |
{:reply, :ok, new_state} | |
end | |
def handle_cast({:remove_worker, group_id}, state) do | |
new_state = Map.put(state, :groups, Enum.reject(state.groups, &(&1.id == group_id))) | |
{:noreply, new_state} | |
end | |
def handle_info(:report, state) do | |
new_state = report(load_groups(), state) | |
schedule_reporter() # Reschedule once more | |
{:noreply, new_state} | |
end | |
defp schedule_reporter() do | |
Process.send_after(self(), :report, 10 * 1000 * 60) # Every 10 min | |
end | |
defp schedule_workers(groups) do | |
Enum.each(groups, fn(group) -> | |
start_worker(group) | |
end) | |
end | |
# AUX FUNCTIONS (Some are omitted) | |
defp lookup(group_id) do | |
case Registry.lookup(Registry.MyApp, "group-#{group_id}") do | |
[{pid, _}] -> pid | |
_ -> nil | |
end | |
end | |
def start_worker(group) do | |
pid_name = {:via, Registry, {Registry.MyApp, "group-#{group.id}"}} | |
spawn(fn() -> MyApp.Worker.start_link(pid_name, group) end) | |
end | |
defp report(groups, state) do | |
# Does a bunch of stuff and then calls terminate_unhealthy | |
end | |
defp terminate_unhealthy(groups) do | |
# Terminate no longer valid groups | |
Enum.each(groups, fn(group) -> | |
pid = lookup(group.id) | |
if terminate?(pid) do | |
Process.exit(pid, :kill) | |
end | |
end) | |
end | |
defp schedule_groups(groups) when not is_nil(groups) do | |
Enum.each(groups, fn(group) -> | |
# Check if process is alive | |
pid = lookup(group.id) | |
if schedule?(pid) do | |
start_worker(group) | |
end | |
end) | |
end | |
defp schedule?(pid) when is_nil(pid), do: true | |
defp schedule?(pid), do: !Process.alive?(pid) | |
defp terminate?(pid), do: !schedule?(pid) | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment