It's possible to do GraphQL subscriptions over plain HTTP/1.1 with Server-Sent Events!
Start the needed Supervisors...
supervisor(Streamer.Supervisor, [[pubsub: MyPubSub]])
Mount the Plug...
match "/stream/graphql", to: Streamer, init_opts: [schema: MySchema, pubsub: MyPubSub]
The plug code is the part that would need to be integrated into Absinthe.Plug.
Rough Implementation...
defmodule Streamer.Supervisor do
  @moduledoc """
  Supervises the two processes the streaming plug relies on: the pubsub module
  itself (started as a worker) and `Absinthe.Subscription` (started as a
  supervisor and handed the pubsub module).
  """
  use Supervisor

  @doc """
  Starts the supervisor.

  Expects exactly `[pubsub: module]`; the keyword list is forwarded unchanged
  to `init/1`.
  """
  def start_link([pubsub: _pubsub_mod] = opts) do
    Supervisor.start_link(__MODULE__, opts)
  end

  @doc """
  Builds the child list for the given pubsub module and supervises it with a
  `:one_for_one` strategy.
  """
  def init([{:pubsub, pubsub_mod}]) do
    # The pubsub module is started through its own zero-arity start_link.
    pubsub_worker = worker(pubsub_mod, [])
    # Absinthe.Subscription receives the pubsub module as its single argument.
    subscription_sup = supervisor(Absinthe.Subscription, [pubsub_mod])

    supervise([pubsub_worker, subscription_sup], strategy: :one_for_one)
  end
end
defmodule Streamer do
  @moduledoc """
  Plug that hands the request off to a linked `Streamer.Runner` process and
  then blocks until that runner exits.
  """
  @behaviour Plug

  @doc """
  Returns the plug options untouched.
  """
  def init(opts) do
    opts
  end

  @doc """
  Starts a runner for this request and waits for it to terminate.

  The options must be exactly `[schema: schema, pubsub: pubsub]`. The original
  `conn` is returned once the runner has exited.
  """
  def call(conn, schema: schema, pubsub: pubsub) do
    # Trap exits so the linked runner's termination arrives as a message
    # instead of taking this process down with it.
    Process.flag(:trap_exit, true)
    {:ok, runner_pid} = Streamer.Runner.start_link(schema, pubsub, conn)
    say_goodbye_when_done(conn, runner_pid)
    conn
  end

  # Blocks until the given runner exits, then writes the farewell chunk.
  #
  # NOTE(review): `conn` is the struct as it was before the runner called
  # `send_chunked/2` in its own process, so this `chunk/2` call is made on a
  # conn that this process never switched to the chunked state — confirm this
  # behaves as intended with the adapter in use.
  defp say_goodbye_when_done(conn, runner_pid) do
    receive do
      {:EXIT, ^runner_pid, _why} ->
        Plug.Conn.chunk(conn, "BYE\n\n")
    end
  end
end
defmodule Streamer.Runner do
  @moduledoc """
  Per-connection process that runs the subscription query, subscribes itself
  to the resulting topic on the pubsub, opens a chunked `text/event-stream`
  response and writes one `data:` event for every `{:broadcast, msg}` message
  it receives.
  """
  use GenServer

  @doc """
  Starts a runner linked to the caller.

  * `schema` - schema passed to `Absinthe.run/3`
  * `pubsub` - module exposing `subscribe/1`; also placed in the Absinthe context
  * `conn` - the `Plug.Conn` for the request; the query is read from
    `conn.params["query"]`
  """
  def start_link(schema, pubsub, conn) do
    # Intentionally NOT registered under a fixed name: there is one runner per
    # connection, and a shared name would make every connection after the
    # first fail with {:error, {:already_started, pid}}.
    GenServer.start_link(__MODULE__, schema: schema, pubsub: pubsub, conn: conn)
  end

  @impl true
  def init(schema: schema, pubsub: pubsub, conn: conn) do
    # Fall back to an empty context when nothing upstream stored one, rather
    # than raising on the missing key.
    base_context = get_in(conn.private, [:absinthe, :context]) || %{}
    context = Map.put(base_context, :pubsub, pubsub)

    # Anything other than a successful subscription fails the match, which
    # aborts start-up of this process.
    {:ok, %{"subscribed" => topic}} =
      Absinthe.run(conn.params["query"], schema, context: context)

    # Subscribing from init/1 registers this runner process as the receiver.
    pubsub.subscribe(topic)

    conn =
      conn
      |> Plug.Conn.put_resp_header("content-type", "text/event-stream")
      |> Plug.Conn.send_chunked(200)

    {:ok, %{conn: conn, topic: topic}}
  end

  @impl true
  def handle_info({:broadcast, msg}, %{conn: conn} = state) do
    event = "data: #{Poison.encode!(msg.result.data)}\n\n"

    case Plug.Conn.chunk(conn, event) do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        # The chunk could not be written (e.g. the client went away); stop so
        # the linked process waiting on this runner is released.
        {:stop, :normal, state}
    end
  end

  def handle_info(_other, state) do
    # Ignore unrelated messages instead of crashing the stream on a
    # FunctionClauseError.
    {:noreply, state}
  end
end
And a dummy local PubSub example...
defmodule MyPubSub do
  @moduledoc """
  Minimal single-node pubsub backed by a `Registry` named after this module.

  A subscriber registers the calling process under a topic; publishing a
  subscription result sends `{:broadcast, message}` to every process registered
  for that topic. Mutation fan-out is a no-op because this pubsub does not
  support clusters.
  """
  @behaviour Absinthe.Subscription.Pubsub

  @doc """
  Starts the registry that stores topic subscriptions.
  """
  def start_link() do
    # :duplicate keys let any number of processes register under the same
    # topic. With :unique keys only the first subscriber per topic would be
    # registered and all later ones would never receive a broadcast.
    Registry.start_link(:duplicate, __MODULE__)
  end

  @doc """
  Subscribes the calling process to `topic`. Returns `:ok`.
  """
  def subscribe(topic) do
    # Match assertively so a failed registration surfaces immediately instead
    # of being reported as a successful subscription.
    {:ok, _owner} = Registry.register(__MODULE__, topic, [])
    :ok
  end

  @doc """
  Sends `{:broadcast, message}` to every process subscribed to `topic`.

  `message` is a map with the keys `:topic`, `:event` (always
  `"subscription:data"`) and `:result` (the given `data`).
  """
  def publish_subscription(topic, data) do
    message = %{
      topic: topic,
      event: "subscription:data",
      result: data
    }

    Registry.dispatch(__MODULE__, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:broadcast, message})
    end)
  end

  @doc """
  No-op: this pubsub is local and doesn't support clusters. Returns `:ok`.
  """
  def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do
    :ok
  end
end