Created
December 5, 2018 10:33
-
-
Save mpoeter/d18885caa3b46d47f031f6d913c93208 to your computer and use it in GitHub Desktop.
Async WebSocket queries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Absinthe.Phoenix.AsyncChannel do | |
use Phoenix.Channel | |
require Logger | |
@doc false | |
def join("__absinthe__:control", _, socket) do | |
schema = socket.assigns[:__absinthe_schema__] | |
absinthe_config = Map.get(socket.assigns, :absinthe, %{}) | |
opts = | |
absinthe_config | |
|> Map.get(:opts, []) | |
|> Keyword.update(:context, %{pubsub: socket.endpoint}, fn context -> | |
Map.put(context, :pubsub, socket.endpoint) | |
end) | |
absinthe_config = | |
put_in(absinthe_config[:opts], opts) | |
|> Map.update(:schema, schema, &(&1)) | |
|> Map.put(:tasks, %{}) | |
socket = socket |> assign(:absinthe, absinthe_config) | |
{:ok, socket} | |
end | |
@doc false | |
def handle_in("doc", payload, socket) do | |
config = socket.assigns[:absinthe] | |
opts = config.opts |> Keyword.put(:variables, Map.get(payload, "variables", %{})) | |
query = Map.get(payload, "query", "") | |
Absinthe.Logger.log_run(:debug, { | |
query, | |
config.schema, | |
[], | |
opts, | |
}) | |
case Absinthe.Phase.Parse.run(query, []) do | |
{:ok, parsed_query} -> run_query(parsed_query, config.schema, socket, opts) | |
{:error, _} = err -> {:reply, err, socket} | |
end | |
|> case do | |
{:reply, reply, _} = result -> | |
log_reply(reply) | |
result | |
result -> result | |
end | |
end | |
def handle_in("unsubscribe", %{"subscriptionId" => doc_id}, socket) do | |
Phoenix.PubSub.unsubscribe(socket.pubsub_server, doc_id) | |
Absinthe.Subscription.unsubscribe(socket.endpoint, doc_id) | |
{:reply, {:ok, %{subscriptionId: doc_id}}, socket} | |
end | |
def handle_info({:async_finish, result, socket_ref}, socket) do | |
{result, socket} = translate_result(result, socket) | |
log_reply(result) | |
reply(socket_ref, result) | |
{:noreply, socket} | |
end | |
def handle_info({:DOWN, task_ref, _, _, :normal}, socket) do | |
# the task exited normally -> simply unregister it; the reply | |
# is sent as part of the async_finish message handler | |
socket = unregister_task(socket, task_ref) | |
{:noreply, socket} | |
end | |
def handle_info({:DOWN, task_ref, _, _, _reason}, socket) do | |
# the task died for some reason -> send an error reply | |
# TODO - log error | |
socket_ref = Map.fetch!(socket.assigns.absinthe.tasks, task_ref) | |
socket = unregister_task(socket, task_ref) | |
reply(socket_ref, :error) | |
{:noreply, socket} | |
end | |
defp run_query(parsed_query, schema, socket, opts) do | |
pipeline = skip_parse_pipeline(schema, opts) | |
socket_ref = Phoenix.Channel.socket_ref(socket) | |
parent = self() | |
{_pid, task_ref} = spawn_monitor(fn -> | |
result = exec_query(parsed_query, pipeline) | |
send(parent, {:async_finish, result, socket_ref}) | |
end) | |
contains_mutation = Enum.any?(parsed_query.input.definitions, &is_mutation?/1) | |
if contains_mutation do | |
# mutations are executed sequentially, so we wait for the task to finish. | |
wait_for_task(task_ref, socket_ref, socket) | |
else | |
socket = register_task(socket, task_ref, socket_ref) | |
{:noreply, socket} | |
end | |
end | |
defp is_mutation?(%Absinthe.Language.OperationDefinition{operation: op}), do: op == :mutation | |
defp is_mutation?(_), do: false | |
defp wait_for_task(task_ref, socket_ref, socket) do | |
receive do | |
{:DOWN, ^task_ref, _, _, :normal} -> | |
receive do | |
{:async_finish, result, ^socket_ref} -> | |
{result, socket} = translate_result(result, socket) | |
{:reply, result, socket} | |
after | |
0 -> | |
# the task has exited normally but we have not received a finish message | |
# -> this should not be possible! | |
# TODO - log error | |
{:reply, :error, socket} | |
end | |
{:DOWN, ^task_ref, _, _, _reason} -> | |
# the task died for some reason | |
# TODO - log error | |
{:reply, :error, socket} | |
end | |
end | |
defp translate_result(result, socket) do | |
case result do | |
{:ok, %{"subscribed" => topic}, context} -> | |
:ok = Phoenix.PubSub.subscribe(socket.pubsub_server, topic, [ | |
fastlane: {socket.transport_pid, socket.serializer, []}, | |
link: true, | |
]) | |
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context) | |
{{:ok, %{subscriptionId: topic}}, socket} | |
{:ok, %{data: _} = reply, context} -> | |
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context) | |
{{:ok, reply}, socket} | |
{:ok, %{errors: _} = reply, context} -> | |
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context) | |
{{:error, reply}, socket} | |
{:error, reply} -> | |
{reply, socket} | |
end | |
end | |
defp exec_query(doc, pipeline) do | |
case Absinthe.Pipeline.run(doc, pipeline) do | |
{:ok, %{result: result, execution: res}, _phases} -> | |
{:ok, result, res.context} | |
{:error, msg, _phases} -> | |
{:error, msg} | |
end | |
end | |
defp log_reply(reply) do | |
Logger.debug(fn -> | |
""" | |
-- Absinthe Phoenix Reply -- | |
#{inspect reply} | |
---------------------------- | |
""" | |
end) | |
end | |
defp skip_parse_pipeline(schema, options) do | |
[{Absinthe.Phase.Parse, _} | rest] = Absinthe.Pipeline.for_document(schema, options) | |
rest | |
end | |
defp register_task(socket, task_ref, socket_ref) do | |
absinthe_assigns = | |
socket.assigns | |
|> Map.get(:absinthe, %{}) | |
|> Map.update!(:tasks, &Map.put(&1, task_ref, socket_ref)) | |
Phoenix.Socket.assign(socket, :absinthe, absinthe_assigns) | |
end | |
defp unregister_task(socket, task_ref) do | |
absinthe_assigns = | |
socket.assigns | |
|> Map.get(:absinthe, %{}) | |
|> Map.update!(:tasks, &Map.delete(&1, task_ref)) | |
Phoenix.Socket.assign(socket, :absinthe, absinthe_assigns) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment