A simple PubSub adapter that sends messages between nodes over AMQP.
The app supervisor needs to be configured with:
{Phoenix.PubSub, name: App.PubSub, adapter: Phoenix.PubSub.AMQP, amqp: [connection: amqp_connection_id]},| defmodule Phoenix.PubSub.AMQP do | |
| use Supervisor | |
| require Logger | |
| @behaviour Phoenix.PubSub.Adapter | |
| def start_link(opts) do | |
| Supervisor.start_link(__MODULE__, opts, name: __MODULE__) | |
| end | |
| @impl Supervisor | |
| def init(opts) do | |
| # [adapter_name: App.PubSub.Adapter, name: App.PubSub, adapter: Phoenix.PubSub.AMQP, amqp: [connection: ...]] | |
| children = [ | |
| {__MODULE__.Gateway, opts} | |
| ] | |
| Supervisor.init(children, strategy: :one_for_one) | |
| end | |
| @impl Phoenix.PubSub.Adapter | |
| defdelegate broadcast(adapter_name, topic, message, dispatcher), to: __MODULE__.Gateway | |
| @impl Phoenix.PubSub.Adapter | |
| defdelegate direct_broadcast(adapter_name, node_name, topic, message, dispatcher), | |
| to: __MODULE__.Gateway | |
| @impl true | |
| def node_name(_adapter_name), do: node() | |
| defmodule Gateway do | |
| use GenServer | |
| require Logger | |
| defstruct [:connection, :ch, :name, :ident, :node_name] | |
| def start_link(opts) do | |
| GenServer.start_link(__MODULE__, opts, name: opts[:adapter_name]) | |
| end | |
| def broadcast(adapter_name, topic, message, dispatcher) do | |
| GenServer.call(adapter_name, {:broadcast, topic, message, dispatcher}) | |
| end | |
| def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do | |
| if node_name == node() do | |
| Phoenix.PubSub.local_broadcast(adapter_name, topic, message, dispatcher) | |
| else | |
| GenServer.call(adapter_name, {:direct_broadcast, node_name, topic, message, dispatcher}) | |
| end | |
| end | |
| def init(opts) do | |
| state = %__MODULE__{ | |
| connection: opts[:amqp][:connection], | |
| name: opts[:name], | |
| ident: Atom.to_string(opts[:name]), | |
| node_name: Atom.to_string(node()) | |
| } | |
| {:ok, state, {:continue, :connect}} | |
| end | |
| def handle_continue(:connect, state) do | |
| with {:ok, conn} <- AMQP.Application.get_connection(state.connection), | |
| {:ok, ch} <- AMQP.Channel.open(conn) do | |
| Process.monitor(ch.pid) | |
| setup_channel_disposal(ch) | |
| :ok = AMQP.Basic.qos(ch, prefetch_count: 25) | |
| {:noreply, %__MODULE__{state | ch: ch}, {:continue, :setup_topology}} | |
| else | |
| _ -> | |
| Process.send_after(self(), :try_connect, 100) | |
| {:noreply, state} | |
| end | |
| end | |
| def handle_continue(:setup_topology, state) do | |
| :ok = AMQP.Exchange.fanout(state.ch, exchange(state), durable: true) | |
| {:ok, %{queue: queue}} = AMQP.Queue.declare(state.ch, queue(state), durable: true) | |
| :ok = AMQP.Queue.bind(state.ch, queue, exchange(state)) | |
| {:ok, _tag} = AMQP.Basic.consume(state.ch, queue, nil) | |
| {:noreply, state} | |
| end | |
| def handle_call( | |
| {:broadcast, _topic, _message, _dispatcher}, | |
| _from, | |
| state = %__MODULE__{ch: nil} | |
| ) do | |
| {:reply, {:error, :not_connected}, state} | |
| end | |
| def handle_call({:broadcast, topic, message, _dispatcher}, _from, state) do | |
| :ok = | |
| AMQP.Basic.publish(state.ch, exchange(state), "", :erlang.term_to_binary(message), | |
| headers: [{"topic", :longstr, topic}, {"sender", :longstr, state.node_name}] | |
| ) | |
| {:reply, :ok, state} | |
| end | |
| def handle_call( | |
| {:direct_broadcast, _node_name, _topic, _message, _dispatcher}, | |
| _from, | |
| state = %__MODULE__{ch: nil} | |
| ) do | |
| {:reply, {:error, :not_connected}, state} | |
| end | |
| def handle_call({:direct_broadcast, node_name, topic, message, _dispatcher}, _from, state) do | |
| :ok = | |
| AMQP.Basic.publish(state.ch, "", queue(state, node_name), :erlang.term_to_binary(message), | |
| headers: [{"topic", :longstr, topic}, {"sender", :longstr, state.node_name}] | |
| ) | |
| {:reply, :ok, state} | |
| end | |
| def handle_info(:try_connect, state) do | |
| {:noreply, state, {:continue, :connect}} | |
| end | |
| def handle_info({:basic_consume_ok, %{consumer_tag: _}}, state) do | |
| {:noreply, state} | |
| end | |
| def handle_info( | |
| {:basic_deliver, payload, %{delivery_tag: dtag, headers: headers}}, | |
| state | |
| ) do | |
| payload = :erlang.binary_to_term(payload) | |
| topic = headers |> header("topic") | |
| sender = headers |> header("sender") | |
| if sender != Atom.to_string(node()) do | |
| Phoenix.PubSub.local_broadcast(state.name, topic, payload) | |
| end | |
| AMQP.Basic.ack(state.ch, dtag) | |
| {:noreply, state} | |
| end | |
| def handle_info( | |
| {:DOWN, _ref, :process, pid, _reason}, | |
| state = %__MODULE__{ch: %AMQP.Channel{pid: pid}} | |
| ) do | |
| {:noreply, %__MODULE__{state | ch: nil}, {:continue, :connect}} | |
| end | |
| def handle_info(msg, state) do | |
| Logger.error( | |
| "#{__MODULE__} #{Process.info(self())[:registered_name]} received unexpected message in handle_info/2: #{inspect(msg)}" | |
| ) | |
| {:noreply, state} | |
| end | |
| def terminate(reason, state) do | |
| Logger.info("exiting: #{inspect(reason)}") | |
| {:noreply, state} | |
| end | |
| defp exchange(%__MODULE__{ident: ident}), do: ident | |
| defp queue(state), do: queue(state, node()) | |
| defp queue(%__MODULE__{ident: ident}, n), do: "#{ident}.#{n}" | |
| defp header(headers, name) do | |
| case List.keyfind(headers, name, 0) do | |
| {^name, _, value} -> value | |
| nil -> nil | |
| end | |
| end | |
| defp setup_channel_disposal(ch = %AMQP.Channel{pid: ch_pid}) do | |
| gateway = self() | |
| monitoring = make_ref() | |
| spawn(fn -> | |
| Process.monitor(gateway) | |
| Process.monitor(ch_pid) | |
| send(gateway, monitoring) | |
| receive do | |
| {:DOWN, _, _, ^gateway, _} -> | |
| AMQP.Channel.close(ch) | |
| nil | |
| {:DOWN, _, _, ^ch_pid, _} -> | |
| nil | |
| end | |
| end) | |
| receive do | |
| ^monitoring -> :ok | |
| end | |
| end | |
| end | |
| end |