Created
August 5, 2015 04:44
-
-
Save nviennot/67a0795db7ccad973885 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# `Gemfile` should have the following: | |
# source 'https://rubygems.org' | |
# gem 'goliath' | |
# gem 'nobrainer', :github => 'nviennot/nobrainer' | |
require 'bundler' | |
Bundler.require | |
# NoBrainer setup.
# With app_name set to "goliath", the default db name will be
# "goliath_development" when running in the development environment.
# Queries go through the EventMachine driver, and debug-level log output is
# written to STDERR.
NoBrainer.configure do |config|
  stderr_logger = Logger.new(STDERR)
  stderr_logger.level = Logger::DEBUG

  config.app_name    = "goliath"
  config.environment = Goliath.env
  config.driver      = :em
  config.logger      = stderr_logger
end
# StreamFiber is a helper used to process asynchronous (streaming) responses.
# Calling stream() immediately returns the chunked-response HTTP headers to the
# client, and schedules the passed block to run in a Fiber on the next
# EventMachine tick.
# No exception may bubble up out of that fiber: it would reach the
# EventMachine loop and kill the Goliath server.
module StreamFiber
  # Runs +block+ with +env+. If the block raises, the error is reported to the
  # client as a single JSON line ({"error": "<class> -- <first message line>"})
  # and echoed on STDERR. The chunked stream is then closed exactly once,
  # whether the block succeeded or failed.
  #
  # env   - request env; must respond to chunked_stream_send and
  #         chunked_stream_close.
  # block - the work to run; it is called with +env+.
  #
  # StandardErrors raised while reporting the failure or while closing the
  # stream are swallowed: the client may already be gone, and nothing is
  # allowed to escape into the EventMachine loop.
  def guard_async_response(env, &block)
    block.call(env)
  rescue Exception => e # deliberately broad, see the module comment
    begin
      msg = {:error => "#{e.class} -- #{e.message.split("\n").first}"}.to_json
      STDERR.puts msg
      env.chunked_stream_send("#{msg}\n")
    rescue StandardError
      # Best effort only: there is nobody left to report the failure to.
    end
  ensure
    # Single close for both the success and the failure path. It is guarded
    # because an exception raised from an ensure clause would propagate out of
    # the fiber.
    begin
      env.chunked_stream_close
    rescue StandardError
      # The stream is already closed or the connection is gone.
    end
  end

  # Starts a chunked streaming response for +env+ and defers +block+ to a
  # Fiber started on the next EventMachine tick, wrapped in
  # guard_async_response.
  #
  # Returns the value of chunked_streaming_response, to be used as the
  # endpoint's response.
  def stream(env, &block)
    EM.next_tick { Fiber.new { guard_async_response(env, &block) }.resume }
    chunked_streaming_response
  end
end
# When the client disconnects, we must abort the ongoing changes() queries to
# avoid leaking resources. bind_connection_to_cursor() registers a cursor so
# that close() gets called on it when the client disconnects.
module BindCursor
  # Ties +cursor+ to the connection behind +env+.
  #
  # If the connection is already flagged as closed, the cursor is closed right
  # away; otherwise it is remembered in env['cursors'] so that on_close can
  # close it later.
  #
  # Returns +cursor+, so the call can be used inside a method chain.
  def bind_connection_to_cursor(env, cursor)
    if env['connection_closed']
      cursor.close
    else
      (env['cursors'] ||= []) << cursor
    end
    cursor
  end

  # Connection-closed hook: closes every cursor registered in env['cursors'].
  #
  # The connection is flagged as closed *before* any cursor is touched, so a
  # cursor bound from this point on is closed immediately instead of leaking.
  # Every registered cursor gets its close() call even if an earlier one
  # fails; the first StandardError encountered is re-raised once all cursors
  # have been processed.
  #
  # Returns true.
  def on_close(env)
    env['connection_closed'] = true

    first_error = nil
    (env['cursors'] || []).each do |cursor|
      begin
        cursor.close
      rescue StandardError => e
        first_error ||= e
      end
    end
    raise first_error if first_error

    true
  end
end
# -------------------------------------------------------------------------- | |
# Our application code starts here. | |
# 1) We'll use a Message model, with a required subject. | |
# 2) We'll define a Goliath endpoint which responds to /create and /changes,
# which respectively create a Message and listen for Message changes.
# See below an example with curl. | |
# | |
# The interesting code is the changes() endpoint. First, it opens a streaming | |
# response to the client. Then it performs a changes() query filtered with the | |
# passed params. This query returns a cursor that can be indefinitely iterated | |
# upon. We call bind_connection_to_cursor() to make sure that cursor.each { }
# returns immediately when the client disconnects.
# Message document stored through NoBrainer.
#   subject - String, required.
#   body    - Text, optional.
class Message
  include NoBrainer::Document

  field :subject, type: String, required: true
  field :body, type: Text
end
# Goliath endpoint exposing two routes:
#   GET  /changes - streams Message changes as newline-delimited JSON
#   POST /create  - creates a Message from the request params
class Stream < Goliath::API
  use Goliath::Rack::Params
  include StreamFiber
  include BindCursor

  # Request entry point: dispatches on the (HTTP method, path) pair and
  # raises Goliath::Validation::NotFoundError for anything else.
  def response(env)
    route = [env['REQUEST_METHOD'].downcase.to_sym, env['PATH_INFO']]

    if route == [:get, '/changes']
      changes(env)
    elsif route == [:post, '/create']
      create(env)
    else
      raise Goliath::Validation::NotFoundError
    end
  end

  # Creates a Message from env['params'] and answers 200 with its JSON
  # representation. create! raises when the Message cannot be created.
  def create(env)
    message = Message.create!(env['params'])
    [200, {}, message.to_json]
  end

  # Opens a streaming response, then runs a changes() query filtered by
  # env['params'] and forwards each change to the client as one JSON line.
  # The cursor is bound to the connection so that it gets closed when the
  # client disconnects.
  def changes(env)
    stream(env) do
      feed = Message.where(env['params']).raw.changes(:include_states => true)
      bind_connection_to_cursor(env, feed)
      feed.each do |change|
        env.chunked_stream_send("#{change.to_json}\n")
      end
    end
  end
end
# When running the server as such: | |
# $ ruby <this_file.rb> -sv | |
# | |
# You should see: | |
# [28604:INFO] 2015-08-04 23:08:43 :: Starting server on 0.0.0.0:9000 in development mode. Watch out for stones. | |
# | |
# Now we can issue a few requests on our HTTP server. | |
# | |
# -------------------------------------- | |
# | |
# Example 1: Creating an invalid message | |
# | |
# $ curl -X POST localhost:9000/create | |
# [:error, "#<Message id: \"2G7tczvZy9UdtL\"> is invalid: Subject can't be blank"] | |
# | |
# ----------------------------------------------------------- | |
# | |
# Example 2: Creating a message while listening for changes | |
# | |
# First we listen for changes: | |
# $ curl localhost:9000/changes | |
# {"state":"ready"} | |
# | |
# Then we open a new shell and run: | |
# $ curl -X POST localhost:9000/create?subject=hello | |
# {"subject":"hello","id":"2G7yTuZ549CcQF"} | |
# | |
# In the first curl session, we then see:
# {"new_val":{"id":"2G7yTuZ549CcQF","subject":"hello"},"old_val":null} | |
# | |
# On the server terminal, we should see the following: | |
# DEBUG -- : [ 4.4ms] r.table("messages").changes({"include_states" => true}) | |
# DEBUG -- : [ 5.1ms] r.table("messages").insert({"subject" => "hello", "id" => "2G7yTuZ549CcQF"}) | |
# | |
# -------------------------------------------------------- | |
# | |
# Example 3: Listening for changes on a specific subject | |
# | |
# $ curl localhost:9000/changes?subject=hi | |
# | |
# $ curl -X POST localhost:9000/create?subject=blah | |
# $ curl -X POST localhost:9000/create?subject=hi | |
# | |
# We only see the changes of the second Message, not the first one. | |
# | |
# --------------------------------------------------------- | |
# | |
# Example 4: Running many clients | |
# | |
# $ for i in `seq 10`; do curl -N localhost:9000/changes &; done | |
# | |
# We see 10 times {"state":"ready"} | |
# | |
# $ curl -X POST localhost:9000/create?subject=hello | |
# | |
# We see 10 times {"new_val":{"id":"2G815T2RT9mklP","subject":"hello"},"old_val":null} | |
# | |
# This demonstrates that our server can handle many clients simultaneously.
# | |
# --------------------------------------------------------- | |
# | |
# Example 5: Handling connection failures | |
# | |
# If we kill the rethinkdb server while a /changes call is in progress we should | |
# see the following: | |
# | |
# $ curl localhost:9000/changes | |
# {"state":"ready"} | |
# {"error":"RethinkDB::RqlDriverError -- Connection closed by server."} | |
# | |
# If we try to redo the curl command: | |
# $ curl localhost:9000/changes | |
# {"error":"RethinkDB::RqlRuntimeError -- Connection is closed."} | |
# | |
# Once we restart the rethinkdb server, we can reissue requests with no further | |
# intervention: | |
# $ curl localhost:9000/changes | |
# {"state":"ready"} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment