Example end-to-end web app with subscriptions, demonstrating APIs.
See rmosolgo/graphql-ruby#613.
To run:
bundle install
bundle exec rackup
open localhost:9292 (rackup's default port; run `bundle exec rackup -p 4567` instead if you want localhost:4567)
Example end-to-end web app with subscriptions, demonstrating APIs.
See rmosolgo/graphql-ruby#613.
To run:
bundle install
bundle exec rackup
open localhost:9292 (rackup's default port; run `bundle exec rackup -p 4567` instead if you want localhost:4567)
require "json"
require "securerandom"

require "graphql"
require "sinatra"
require "thin"
# Here's the application logic: | |
# - A set of stateful counters, identified by `#id`. They can be incremented. | |
# - An in-memory subscription database | |
# - A transport for sending payloads over open sockets | |
# - A queuing system for isolating subscription execution & delivery | |
module App | |
# An incrementing counter, identified by ID.
#
# `id` is the key the counter was looked up by; `value` is the running total.
Counter = Struct.new(:id, :value) do
  # Add one to the counter.
  # @return [Integer] the new value
  def increment
    self.value += 1
  end

  # Get or create a counter by id.
  #
  # Counters are memoized in a Hash held on the Struct class itself, so looking
  # up the same id twice returns the same instance. New counters start at 0.
  #
  # @param id [Object] lookup key
  # @return [Counter]
  def self.find(id)
    # `new` is sent to the receiver, so this keeps working whatever constant
    # the Struct class is assigned to.
    @counters ||= Hash.new { |h, counter_id| h[counter_id] = new(counter_id, 0) }
    @counters[id]
  end
end
# In-memory subscription store plus a transport that writes results to open
# streams as `update` events.
#
# NOTE(review): none of the Hashes below are synchronized, although `enqueue`
# runs deliveries on separate threads. Confirm that is acceptable before
# reusing this outside a demo.
class Subscriptions < GraphQL::Subscriptions::Implementation
  def initialize(schema:)
    super
    # Here's the "database":
    #
    # For each event key, who is subscribed to it?
    # @return Hash<String => Array<GraphQL::Query>>
    @subscriptions = Hash.new { |h, event_id| h[event_id] = [] }
    #
    # Given a Channel, who does it belong to?
    # @return Hash<String => GraphQL::Query>
    @queries = {}
    #
    # Given a Channel, return the IO to write to
    # @return Hash<String => IO>
    @streams = {}
  end

  # Part of the subscription API: put these subscriptions in the database.
  # The query is stored under its channel and under the key of each event.
  def subscribed(query, events)
    puts "Registering #{query.context[:channel]}"
    @queries[query.context[:channel]] = query
    events.each do |ev|
      @subscriptions[ev.key] << query
    end
  end

  # Part of the subscription API: load the query data for this channel.
  #
  # NOTE(review): raises NoMethodError when `channel` is not registered (for
  # example after `delete`); what the caller expects in that case is not
  # visible from this file.
  #
  # @return [Hash] with :query_string, :variables, :context and :operation_name
  def get_subscription(channel)
    query = @queries[channel]
    {
      query_string: query.query_string,
      variables: query.provided_variables,
      context: {},
      operation_name: query.operation_name,
    }
  end

  # Part of the subscription API: fetch subscriptions from the DB and yield
  # their channels one-by-one.
  def each_channel(event_key)
    # `fetch` bypasses the Hash's default block, so an event nobody subscribed
    # to does not leave an empty Array behind. The copy keeps this iteration
    # stable if `delete` removes entries from the stored Array meanwhile.
    queries = @subscriptions.fetch(event_key, []).dup
    queries.each do |query|
      yield(query.context[:channel])
    end
  end

  # Not used by GraphQL, but the Application needs some way to unsubscribe
  # `Schema#subscriber` delegates to this, eg `MySchema.subscriber.delete(channel)`
  #
  # Forgets the channel's query, removes it from every event's subscriber
  # list, and drops its stream.
  def delete(channel)
    @queries.delete(channel)
    @subscriptions.each_value do |queries|
      queries.reject! { |q| q.context[:channel] == channel }
    end
    close(channel)
  end

  # Optional subscription API -- could use ActiveJob etc here.
  # Runs `execute` for this channel on a new thread.
  def enqueue(channel, event_key, object)
    Thread.new {
      execute(channel, event_key, object)
    }
  end

  # Part of the subscription API: send `result` over `channel`.
  # When no stream is registered for the channel, the subscription is deleted.
  def deliver(channel, result, ctx)
    puts "Delivering to #{channel}: #{result}"
    stream = @streams[channel]
    # The client _may_ have opened this channel:
    if stream
      stream << "event: update\n"
      stream << "data: #{JSON.dump(result)}\n\n"
    else
      # Stream was closed or never opened
      delete(channel)
    end
  end

  # Used by the transport layer: remember the stream to write to for `channel`.
  def open(channel, stream)
    @streams[channel] = stream
  end

  # Not used by GraphQL, but needed by the App to unsubscribe.
  # Forgets the stream without touching the stored query.
  def close(channel)
    @streams.delete(channel)
  end
end
end | |
# Here's the GraphQL API for this application:
module API
  # Type system, written in the GraphQL schema definition language.
  Definition = <<-GRAPHQL
    type Subscription {
      counterIncremented(id: ID!): Counter
    }
    type Counter {
      id: ID!
      value: Int!
    }
    type Query {
      counter(id: ID!): Counter
    }
    type Mutation {
      incrementCounter(id: ID!): Counter
    }
  GRAPHQL

  # Resolve functions, keyed by type name and then by field name.
  # Each lambda takes three arguments; only the object and the field
  # arguments are used here.
  Resolvers = {
    "Mutation" => {
      "incrementCounter" => ->(_obj, args, _ctx) {
        incremented = App::Counter.find(args["id"])
        incremented.increment
        # Tell the subscription system about the event, passing along the
        # same arguments and the updated counter.
        API::Schema.subscriber.trigger("counterIncremented", args, incremented)
        incremented
      }
    },
    "Query" => {
      "counter" => ->(_obj, args, _ctx) { App::Counter.find(args["id"]) }
    },
    "Counter" => {
      "value" => ->(counter, _args, _ctx) { counter.value },
      "id" => ->(counter, _args, _ctx) { counter.id },
    }
  }

  # Schema, defined from the definition then updated with subscription info
  Schema = GraphQL::Schema.from_definition(Definition, default_resolve: Resolvers).redefine do
    use GraphQL::Subscriptions, implementation: App::Subscriptions
  end
end
# Serve the HTML subscription dashboard (the `index` template).
get("/") { erb(:index) }
# Send queries here, it will provide a Channel ID which the client _may_ open.
#
# The channel ID is returned in the `x-channel-id` response header and is also
# put in the query context as `:channel`.
post "/graphql" do
  content_type "application/json"
  # Channel IDs key the stored subscriptions and are all a client needs to
  # open a stream, so they must be unique and hard to guess. A small random
  # integer would let two clients collide and overwrite each other.
  channel = "socket-#{SecureRandom.hex(16)}"
  res = API::Schema.execute(
    params[:query],
    variables: params[:variables],
    context: { channel: channel }
  )
  [200, { "x-channel-id" => channel }, JSON.dump(res)]
end
# Clients may open their channels here to receive updates.
# The response is an event stream that is kept open; the stream is registered
# with the subscription implementation under the channel named in the URL.
get '/channels/:channel' do
  content_type 'text/event-stream'
  stream(:keep_open) do |out|
    channel = params[:channel]
    puts "Stream for #{channel}"
    store = API::Schema.subscriber.implementation
    store.open(channel, out)
    # Runs when the stream is finished: drop the subscription.
    out.callback do
      puts "Unsubscribing #{channel}"
      # This is forwarded to the `store`
      store.delete(channel)
    end
  end
end
__END__ | |
@@ index | |
<html> | |
<head> | |
<title>GraphQL Subscriptions Example</title> | |
<script src="https://code.jquery.com/jquery-3.2.1.js" integrity="sha256-DZAnKJ/6XZ9si04Hgrsxu/8s717jcIzLy3oi35EouyE=" crossorigin="anonymous"></script> | |
<style> | |
.dashboard { | |
display: flex; | |
} | |
table { | |
margin: 20px; | |
} | |
</style> | |
</head> | |
<body> | |
<h1>Subscriptions</h1> | |
<div class="dashboard">
  <!-- Counters: rows are appended to #counters by the page script -->
  <table>
    <thead>
      <tr>
        <th colspan="4">Counters</th>
      </tr>
      <tr>
        <th>Id</th>
        <th>Value</th>
        <th colspan="2">Actions</th>
      </tr>
    </thead>
    <tbody id="counters">
    </tbody>
  </table>
  <!-- Updates: one row per event received on an open channel -->
  <table>
    <thead>
      <tr>
        <th colspan="3">Updates</th>
      </tr>
      <tr>
        <th>Channel</th>
        <th>Counter</th>
        <th>Value</th>
      </tr>
    </thead>
    <tbody id="updates">
    </tbody>
  </table>
  <!-- Channels: one row per open channel, with an unsubscribe button -->
  <table>
    <thead>
      <tr>
        <th colspan="2">Channels</th>
      </tr>
      <tr>
        <th>Id</th>
        <th>Actions</th>
      </tr>
    </thead>
    <tbody id="channels">
    </tbody>
  </table>
</div>
<script> | |
// Open EventSource connections, keyed by channel id.
var eventSources = {};
// GraphQL documents posted to /graphql.
var subscriptionString = "subscription WatchCounter($id: ID!) { counterIncremented(id: $id) { id value } }";
var mutationString = "mutation IncrementCounter($id: ID!) { incrementCounter(id: $id) { value } }";
// Run the increment mutation for one counter, then show the value that the
// server sent back in that counter's cell.
function incrementCounter(id) {
  var payload = { query: mutationString, variables: { id: id } }
  $.post("/graphql", payload, function(response) {
    var newValue = response.data.incrementCounter.value
    $("#counter-value-" + id).text(newValue)
  })
}
// Post the subscription for one counter. The channel id comes back in the
// x-channel-id response header and is opened right away.
function subscribeToCounter(id) {
  var request = {
    type: "POST",
    url: "/graphql",
    data: {
      query: subscriptionString,
      variables: { id: id },
    },
    success: function(_data, _status, jqXHR) {
      openChannel(jqXHR.getResponseHeader("x-channel-id"))
    }
  }
  $.ajax(request)
}
// Open the event stream for a channel, remember it in eventSources, and add a
// row for it to the Channels table. Each "update" event refreshes the
// counter's cell and is logged as a row in the Updates table.
function openChannel(channelId) {
  var source = new EventSource("/channels/" + channelId)
  eventSources[channelId] = source
  source.addEventListener("update", function(e) {
    console.log("message", e)
    var counter = JSON.parse(e.data).data.counterIncremented
    var updateRow = "<tr><td>" + channelId + "</td><td>" + counter.id + "</td><td>" + counter.value + "</td></tr>"
    $("#counter-value-" + counter.id).text(counter.value)
    $("#updates").append(updateRow)
  })
  var channelRow = channelRowTemplate.replace(/%channelid%/g, channelId)
  $("#channels").append(channelRow)
}
// Close a channel's event stream, forget it, and remove its row from the
// Channels table.
function closeChannel(channelId) {
  var source = eventSources[channelId]
  source.close()
  delete eventSources[channelId]
  $("#channel-" + channelId).remove()
}
// Row templates; %counterid% / %channelid% are replaced before insertion.
// Counter ids are numeric, so they can be inlined into the handlers as-is.
var counterRowTemplate = "<tr><td>%counterid%</td><td id='counter-value-%counterid%'>...</td><td><button onclick='incrementCounter(%counterid%)'>+ increment</button></td><td><button onclick='subscribeToCounter(%counterid%)'>subscribe</button></td></tr>"
// Channel ids are strings such as "socket-1234", so the handler argument must
// be quoted; unquoted it is evaluated as the expression `socket - 1234`.
var channelRowTemplate = "<tr id='channel-%channelid%'><td>%channelid%</td><td><button onclick='closeChannel(\"%channelid%\")'>unsubscribe</button></td></tr>"
// Counters shown on the dashboard. The rows, the initial query, and the
// response handling below are all derived from this list.
var counterIds = [1, 2, 3]
counterIds.forEach(function(counterId) {
  $("#counters").append(counterRowTemplate.replace(/%counterid%/g, counterId))
})
// One aliased field per counter: "{ c1: counter(id: 1) { value } c2: ... }"
var initialLoadQuery = "{ " + counterIds.map(function(counterId) {
  return "c" + counterId + ": counter(id: " + counterId + ") { value }"
}).join(" ") + " }"
$.post("/graphql", {query: initialLoadQuery}, function(data) {
  counterIds.forEach(function(counterId) {
    $("#counter-value-" + counterId).text(data.data["c" + counterId].value)
  })
})
</script> | |
</body> | |
</html> |
# Rack entry point: load the app definition and hand the Sinatra application to Rack.
# `require_relative` resolves against this file rather than the directory the
# server happens to be started from.
require_relative "app"
run Sinatra::Application
source 'https://rubygems.org' | |
gem "graphql", github: "rmosolgo/graphql-ruby", branch: "subscriptions" | |
gem "sinatra" | |
gem "thin" |
GIT | |
remote: git://github.com/rmosolgo/graphql-ruby.git | |
revision: 2c011f6fbc50132706d14a52027f4eea9df2e103 | |
branch: subscriptions | |
specs: | |
graphql (1.6.6) | |
GEM | |
remote: https://rubygems.org/ | |
specs: | |
daemons (1.2.4) | |
eventmachine (1.2.5) | |
mustermann (1.0.0) | |
rack (2.0.3) | |
rack-protection (2.0.0) | |
rack | |
sinatra (2.0.0) | |
mustermann (~> 1.0) | |
rack (~> 2.0) | |
rack-protection (= 2.0.0) | |
tilt (~> 2.0) | |
thin (1.7.2) | |
daemons (~> 1.0, >= 1.0.9) | |
eventmachine (~> 1.0, >= 1.0.4) | |
rack (>= 1, < 3) | |
tilt (2.0.8) | |
PLATFORMS | |
ruby | |
DEPENDENCIES | |
graphql! | |
sinatra | |
thin | |
BUNDLED WITH | |
1.15.3 |