Rack SSE Example
# rack_sse.ru
#
# An example of basic real-time, single-room broadcast chat using Server-Sent
# Events in plain old Rack. This example does NOT use hijack or the async
# hacks; it relies only on a well-implemented threaded Rack server (at the
# time of writing, that means it works only with puma!). Other servers should
# be fixed to support this, as it is pretty critical to how Rack *should* work
# on most servers. The only spec-acceptable failure in this case is not
# flushing the content stream on each yield (for which the Rack spec has no
# workaround today).
#
# Run with: rackup -E none -s puma rack_sse.ru
# or: puma -e none rack_sse.ru
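class SSE
  # A single Server-Sent Event. Members that are nil are left out of the
  # serialized form.
  class Event < Struct.new(:event, :data, :id, :retry)
    # Serializes to the text/event-stream wire format: one "field: value"
    # line per non-nil member, terminated by a blank line.
    def to_s
      self.class.members.map do |m|
        if v = send(m)
          "#{m}: #{v}"
        end
      end.compact.join("\n") + "\n\n"
    end
  end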
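
  # A Rack response body backed by a thread-safe queue.
  class Stream
    def initialize
      @q = Queue.new
    end

    def add event
      raise TypeError, "expected SSE::Event" unless event.kind_of? Event
      @q.push event
    end
    alias << add

    # Blocks on the queue and yields each event as it arrives; a nil
    # (pushed by #close) ends the stream.
    def each
      while event = @q.pop
        yield event.to_s
      end
    end

    def close
      @q.push nil
    end
  end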

  # A mutex-guarded list of the streams that receive each broadcast.
  class BroadcastList
    def initialize
      @list = []
      @mutex = Mutex.new
    end

    def add stream
      raise TypeError, "expected SSE::Stream" unless stream.kind_of? Stream
      @mutex.synchronize do
        @list << stream
      end
    end

    def remove stream
      raise TypeError, "expected SSE::Stream" unless stream.kind_of? Stream
      @mutex.synchronize do
        @list.delete(stream)
      end
    end

    # Copies the list under the lock, then pushes outside it, so the lock is
    # held only for the duration of the copy.
    def broadcast event
      targets = @mutex.synchronize { @list.dup }
      targets.each do |stream|
        stream << event
      end
    end
  end
end

HOMEPAGE = <<-HTML
<!doctype html>
<title>messenger</title>
<style>
  html, body { margin: 0; padding: 0; box-sizing: border-box }
  body {
    background: #fefefe;
    display: flex;
    flex-direction: column;
    height: 100%;
    width: 100%;
    position: absolute;
  }
  #history {
    font: 16pt 'Courier New';
    flex: 1 1 auto;
    overflow: scroll;
  }
  #history div {
    padding: 0.5em;
  }
  #history div:nth-child(even) {
    background: lightyellow;
  }
  #history div:nth-child(odd) {
    background: white;
  }
  #input {
    flex: 0 0 auto;
    font-size: 16pt;
    height: 2em;
    border: 0;
    border-top: 1px solid silver;
    padding: 0.5em;
  }
</style>
<div id=history></div>
<input id=input placeholder="send a message" autofocus>
<script>
  var eventSource = new EventSource("/");
  eventSource.addEventListener("chat", function(e) {
    var message = JSON.parse(e.data);
    var container = document.createElement("div");
    container.textContent = message.chat;
    var history = document.querySelector("#history");
    history.appendChild(container);
    history.scrollTop = history.scrollHeight;
  });
  var input = document.querySelector("#input");
  input.addEventListener("change", function(e) {
    e.preventDefault();
    if (e.target.value === "") {
      return;
    }
    // window.fetch cannot land soon enough...
    var xhr = new XMLHttpRequest();
    xhr.open("POST", "/");
    xhr.setRequestHeader("Content-Type", "application/json");
    xhr.send(JSON.stringify({chat: e.target.value}));
    e.target.value = "";
  });
</script>
HTML

require 'json'

class MessengerApp
  def initialize
    @broadcast_list = SSE::BroadcastList.new
  end

  def get env
    case env["HTTP_ACCEPT"]
    when %r"text/event-stream"
      stream = SSE::Stream.new
      @broadcast_list.add stream
      body = Rack::BodyProxy.new(stream) { @broadcast_list.remove stream }
      [
        200,
        {
          "Content-Type" => "text/event-stream",
          "Cache-Control" => "no-cache",
          # For nginx:
          "X-Accel-Buffering" => "no",
        },
        body
      ]
    else
      [200, {"Content-Type" => "text/html"}, [HOMEPAGE]]
    end
  end
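
  def post env
    unless env['CONTENT_TYPE'] =~ %r"application/json"
      return [400, {"Content-Type" => "text/plain"}, ["Expected application/json"]]
    end
    # XXX: unlimited read here, do not use unprotected!
    begin
      message = JSON.parse(env['rack.input'].read)
    rescue JSON::ParserError
      # Answer malformed bodies with a 400 instead of an unhandled exception.
      return [400, {"Content-Type" => "text/plain"}, ["Malformed JSON"]]
    end
    # Valid JSON need not be an object, so check the type before indexing.
    unless message.kind_of?(Hash) && message["chat"].kind_of?(String)
      return [400, {"Content-Type" => "text/plain"}, ['Expected {"chat": ...}']]
    end
    # XXX: passing the message structure on blindly, don't continue to do this.
    @broadcast_list.broadcast SSE::Event.new("chat", JSON.dump(message))
    [200, {}, []]
  end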

  def call env
    case env["REQUEST_METHOD"]
    when "GET"
      get env
    when "POST"
      post env
    else
      [404, {}, []]
    end
  end
end

run MessengerApp.new
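
A note on the wire format: SSE::Event#to_s above writes a single "data: ..." line, which is safe here because JSON.dump escapes newlines inside strings. If the payload could contain raw newlines, each line would need its own "data:" field (the browser joins them back with "\n"). The following is a minimal sketch of that framing; `format_sse` and its `retry_ms` parameter are hypothetical names introduced for illustration and are not part of the file above.

```ruby
# Hypothetical helper (an assumption, not part of rack_sse.ru): frames one
# SSE message, emitting a separate "data:" line per line of the payload.
def format_sse(data, event: nil, id: nil, retry_ms: nil)
  lines = []
  lines << "event: #{event}" if event
  lines << "id: #{id}" if id
  lines << "retry: #{retry_ms}" if retry_ms
  # A limit of -1 makes split keep trailing empty lines of the payload.
  data.to_s.split("\n", -1).each { |line| lines << "data: #{line}" }
  # A blank line terminates the message.
  lines.join("\n") + "\n\n"
end

print format_sse("first line\nsecond line", event: "chat")
```

With a helper along these lines, the "chat" listener in the page would receive both lines in e.data, separated by a newline.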
@tbuehlmann

PoCs for Cuba and Sinatra.