Created
November 15, 2021 14:16
-
-
Save benwilson512/104fe1d4b35c11a259dd788ad00e011b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TODO: Extract into its own library
defmodule AbsintheSocket do
  @moduledoc """
  Websocket client for an Absinthe GraphQL endpoint, built on
  `Phoenix.Channels.GenSocketClient`.

  The socket joins the `"__absinthe__:control"` topic and pushes every document
  submitted through `run/3` as a `"doc"` event. When the server answers with a
  `"subscriptionId"`, the calling process is recorded as the subscriber, and
  later `"subscription:data"` messages for that id are sent to it as
  `%AbsintheSocket.Message{type: :subscription_data}`.

  ## State

    * `:pids` - subscriber pid => list of its active subscription ids
    * `:channel_connected` - `true` while the control topic is joined
    * `:active_subscriptions` - subscription id => `%{pid: pid, payload: payload}`
    * `:inflight` - push ref => operation that was pushed and awaits its reply
    * `:pending` - operations waiting for the control topic, newest first

  An operation is a map `%{reply_to: from | nil, pid: pid, payload: map}`.
  `reply_to: nil` marks a re-subscription that no caller is waiting on.
  """

  require Logger

  alias Phoenix.Channels.GenSocketClient

  @behaviour GenSocketClient

  @control_topic "__absinthe__:control"

  # Access-style functions so `get_in/2`, `pop_in/2` and friends work on the state struct.
  defdelegate fetch(term, key), to: Map
  defdelegate get(term, key, default), to: Map
  defdelegate get_and_update(term, key, fun), to: Map
  defdelegate pop(term, key), to: Map

  defstruct pids: %{},
            channel_connected: false,
            active_subscriptions: %{},
            inflight: %{},
            pending: []

  ## Client API

  @doc """
  Sends `query` to the server and waits for the reply.

  `opts` is any enumerable of key/value pairs; it is turned into a map, `:query`
  is added, and the result is pushed as the payload of a `"doc"` event.

  Returns `{:ok, subscription_id}` when the server replies with a subscription
  id, `{:ok, response}` for any other successful reply, and `{:error, reason}`
  when the server replies with an error, the push fails, or the connection or
  control channel goes down while the request is in flight.
  """
  @spec run(GenServer.server(), String.t(), Enumerable.t()) :: {:ok, term()} | {:error, term()}
  def run(socket, query, opts \\ []) do
    payload =
      opts
      |> Map.new()
      |> Map.put(:query, query)

    GenSocketClient.call(socket, {:run, payload})
  end

  @doc """
  Unsubscribes the calling process from all of its subscriptions on `socket`.
  """
  @spec clear_subscriptions(GenServer.server()) :: :ok
  def clear_subscriptions(socket) do
    GenSocketClient.call(socket, :clear_subscriptions)
  end

  @doc """
  Starts the socket process.

  `opts` must provide `:url` and may provide `:params` (an enumerable of
  connection query parameters, defaulting to none).
  """
  @spec start_link(Access.t()) :: GenServer.on_start()
  def start_link(opts) do
    GenSocketClient.start_link(
      __MODULE__,
      GenSocketClient.Transport.WebSocketClient,
      opts
    )
  end

  ## GenSocketClient callbacks

  @impl GenSocketClient
  def init(opts) do
    # `:params` is optional; `Enum.to_list(nil)` would raise.
    params = Enum.to_list(opts[:params] || [])
    {:connect, opts[:url], params, %__MODULE__{}}
  end

  @impl GenSocketClient
  def handle_call({:run, payload}, {pid, _tag} = from, transport, state) do
    op = %{reply_to: from, pid: pid, payload: payload}

    # The caller is answered later, from handle_reply/5 (or on failure).
    state = push_messages(%{state | pending: [op | state.pending]}, transport)

    {:noreply, state}
  end

  def handle_call(:clear_subscriptions, {pid, _tag}, transport, state) do
    {sub_ids, pids} = Map.pop(state.pids, pid, [])

    Enum.each(sub_ids, fn sub_id ->
      Logger.debug(fn -> "Unsubscribing from #{sub_id}" end)

      # Best effort: the result of the push is deliberately ignored.
      GenSocketClient.push(transport, @control_topic, "unsubscribe", %{
        "subscriptionId" => sub_id
      })
    end)

    # Re-subscriptions queued for this process (after a disconnect) must not be
    # replayed once the process has asked to drop its subscriptions.
    pending = Enum.reject(state.pending, &match?(%{pid: ^pid, reply_to: nil}, &1))

    state = %{
      state
      | active_subscriptions: Map.drop(state.active_subscriptions, sub_ids),
        pids: pids,
        pending: pending
    }

    {:reply, :ok, state}
  end

  @impl GenSocketClient
  def handle_connected(transport, state) do
    GenSocketClient.join(transport, @control_topic)
    {:ok, state}
  end

  @impl GenSocketClient
  def handle_disconnected(reason, state) do
    Logger.warn("disconnected: #{inspect(reason)}")
    Process.send_after(self(), :connect, :timer.seconds(1))

    {:ok, reset_channel(state, :disconnected)}
  end

  @impl GenSocketClient
  def handle_joined(@control_topic, _payload, transport, state) do
    state = push_messages(%{state | channel_connected: true}, transport)
    {:ok, state}
  end

  @impl GenSocketClient
  def handle_join_error(@control_topic, :already_joined, _transport, state) do
    {:ok, state}
  end

  def handle_join_error(topic, payload, _transport, state) do
    Logger.error("join error on the topic #{topic}: #{inspect(payload)}")
    {:ok, state}
  end

  @impl GenSocketClient
  def handle_channel_closed(@control_topic = topic, payload, _transport, state) do
    schedule_rejoin(topic, payload)

    # Nothing can be pushed until the topic is joined again, so stop pushing and
    # queue the subscriptions for replay, exactly as on a disconnect.
    # NOTE(review): assumes the server drops subscriptions together with the
    # control channel - confirm against the Absinthe.Phoenix version in use.
    {:ok, reset_channel(state, :channel_closed)}
  end

  def handle_channel_closed(topic, payload, _transport, state) do
    schedule_rejoin(topic, payload)
    {:ok, state}
  end

  @impl GenSocketClient
  def handle_message(topic, "subscription:data", %{"result" => result}, _transport, state) do
    case Map.fetch(state.active_subscriptions, topic) do
      {:ok, %{pid: pid}} ->
        message = %__MODULE__.Message{
          id: topic,
          payload: result,
          type: :subscription_data
        }

        send(pid, message)

      _ ->
        Logger.error("Subscription data for unmatched topic #{topic}")
    end

    {:ok, state}
  end

  # Any other event is logged instead of crashing the socket with a
  # FunctionClauseError.
  def handle_message(topic, event, payload, _transport, state) do
    Logger.warn(
      "#{__MODULE__} Unhandled message #{inspect(event)} on the topic #{topic}: " <>
        inspect(payload)
    )

    {:ok, state}
  end

  @impl GenSocketClient
  def handle_reply(@control_topic, ref, reply, _transport, state) do
    case Map.pop(state.inflight, ref) do
      {nil, _inflight} ->
        # Reply to a push that is not tracked in `inflight` (for example an
        # "unsubscribe"); there is nobody to notify.
        {:ok, state}

      {op, inflight} ->
        {:ok, complete_op(%{state | inflight: inflight}, op, reply)}
    end
  end

  def handle_reply(topic, _ref, payload, _transport, state) do
    Logger.warn("#{__MODULE__} Unhandled reply on the topic #{topic}: #{inspect(payload)}")
    {:ok, state}
  end

  @impl GenSocketClient
  def handle_info(:connect, _transport, state) do
    Logger.info("attempting to reconnect")
    {:connect, state}
  end

  def handle_info({:join, topic}, transport, state) do
    Logger.debug("joining the topic #{topic}")

    case GenSocketClient.join(transport, topic) do
      {:error, :already_joined} ->
        :ok

      {:error, reason} ->
        Logger.error("error joining the topic #{topic}: #{inspect(reason)}")
        Process.send_after(self(), {:join, topic}, :timer.seconds(1))

      {:ok, _ref} ->
        :ok
    end

    {:ok, state}
  end

  def handle_info(message, _transport, state) do
    Logger.warn("#{__MODULE__} Unhandled message #{inspect(message)}")
    {:ok, state}
  end

  ## State helpers

  @doc """
  Pushes every pending operation to the control topic, oldest first.

  Does nothing unless `channel_connected` is `true`. Successfully pushed
  operations move to `:inflight`, keyed by their push ref. When a push fails,
  a waiting caller receives `{:error, reason}`; a re-subscription (no caller)
  stays in `:pending` for the next attempt.
  """
  def push_messages(%{channel_connected: true} = state, transport) do
    Logger.debug(fn ->
      "Pushing #{length(state.pending)} messages"
    end)

    {inflight, retry} =
      state.pending
      # `pending` is newest-first; send in the order the requests were made.
      |> Enum.reverse()
      |> Enum.reduce({state.inflight, []}, fn %{payload: payload} = op, {inflight, retry} ->
        Logger.debug(fn ->
          """
          #{__MODULE__} socket #{inspect(self())} pushing #{inspect(payload)}
          """
        end)

        case GenSocketClient.push(transport, @control_topic, "doc", payload) do
          {:ok, ref} ->
            {Map.put(inflight, ref, op), retry}

          {:error, reason} ->
            Logger.error("#{__MODULE__} failed to push #{inspect(payload)}: #{inspect(reason)}")
            {inflight, retry_or_fail(op, reason, retry)}
        end
      end)

    %{state | inflight: inflight, pending: retry}
  end

  def push_messages(state, _transport) do
    state
  end

  @doc """
  Moves every active subscription back to `:pending` so it is replayed on the
  next `push_messages/2`.

  `:active_subscriptions` and `:pids` are both emptied, because the recorded
  ids are replaced by whatever ids the replayed subscriptions are given.
  """
  def enqueue_subscriptions(state) do
    resubscriptions =
      for {_sub_id, %{payload: payload, pid: pid}} <- state.active_subscriptions do
        %{payload: payload, pid: pid, reply_to: nil}
      end

    %{
      state
      | active_subscriptions: %{},
        pids: %{},
        pending: resubscriptions ++ state.pending
    }
  end

  ## Private helpers

  # Marks the control channel as unusable and queues everything that has to be
  # replayed once it is joined again.
  defp reset_channel(state, reason) do
    %{state | channel_connected: false}
    |> abort_inflight(reason)
    |> enqueue_subscriptions()
  end

  # Replies for in-flight pushes will never arrive once the channel is gone:
  # waiting callers get `{:error, reason}`, re-subscriptions are queued again.
  defp abort_inflight(state, reason) do
    pending =
      Enum.reduce(state.inflight, state.pending, fn {_ref, op}, pending ->
        retry_or_fail(op, reason, pending)
      end)

    %{state | inflight: %{}, pending: pending}
  end

  # Keeps a caller-less operation in `queue`; otherwise tells the caller it failed.
  defp retry_or_fail(%{reply_to: nil} = op, _reason, queue), do: [op | queue]

  defp retry_or_fail(%{reply_to: from}, reason, queue) do
    GenSocketClient.reply(from, {:error, reason})
    queue
  end

  # Applies the server reply for `op`, which was just removed from `inflight`.
  defp complete_op(state, op, %{
         "status" => "ok",
         "response" => %{"subscriptionId" => sub_id}
       }) do
    %{pid: pid, payload: payload} = op
    answer(op, {:ok, sub_id})

    %{
      state
      | active_subscriptions:
          Map.put(state.active_subscriptions, sub_id, %{pid: pid, payload: payload}),
        pids: Map.update(state.pids, pid, [sub_id], &[sub_id | &1])
    }
  end

  defp complete_op(state, op, %{"status" => "ok", "response" => response}) do
    answer(op, {:ok, response})
    state
  end

  defp complete_op(state, op, reply) do
    Logger.error("#{__MODULE__} error reply for #{inspect(op.payload)}: #{inspect(reply)}")

    reason =
      case reply do
        %{"response" => response} -> response
        other -> other
      end

    answer(op, {:error, reason})
    state
  end

  # Re-subscriptions have no caller to answer.
  defp answer(%{reply_to: nil}, _result), do: :ok
  defp answer(%{reply_to: from}, result), do: GenSocketClient.reply(from, result)

  defp schedule_rejoin(topic, payload) do
    Logger.error("disconnected from the topic #{topic}: #{inspect(payload)}")
    Process.send_after(self(), {:join, topic}, :timer.seconds(1))
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment