Last active
March 23, 2024 11:10
-
-
Save raggi/ff7971991297e5c8a1ce to your computer and use it in GitHub Desktop.
Rack SSE Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# rack_sse.ru | |
# | |
# An example of basic real-time, single-room broadcast chat using Server Sent | |
# Events in plain old Rack. This example does NOT use hijack, or the async | |
# hacks, it just relies on a well implemented threaded Rack server (at time of | |
# writing this will therefore only work with puma!). Other servers should be | |
# fixed to support this, as it is pretty critical to how Rack *should* work on | |
# most servers. The only spec-acceptable failure in this case is not flushing | |
# the content stream on each yield (for which the rack spec has no workaround | |
# today). | |
# | |
# Run with: rackup -E none -s puma rack_sse.ru | |
# or: puma -e none rack_sse.ru | |
class SSE | |
class Event < Struct.new(:event, :data, :id, :retry) | |
def to_s | |
self.class.members.map do |m| | |
if v = send(m) | |
"#{m}: #{v}" | |
end | |
end.compact.join("\n") + "\n\n" | |
end | |
end | |
class Stream | |
def initialize | |
@q = Queue.new | |
end | |
def add event | |
raise TypeError, "expected SSE::Event" unless event.kind_of? Event | |
@q.push event | |
end | |
alias << add | |
def each | |
while event = @q.pop | |
yield event.to_s | |
end | |
end | |
def close | |
@q.push nil | |
end | |
end | |
class BroadcastList | |
def initialize | |
@list = [] | |
@mutex = Mutex.new | |
end | |
def add stream | |
raise TypeError, "expected SSE::Stream" unless stream.kind_of? Stream | |
@mutex.synchronize do | |
@list << stream | |
end | |
end | |
def remove stream | |
raise TypeError, "expected SSE::Stream" unless stream.kind_of? Stream | |
@mutex.synchronize do | |
@list.delete(stream) | |
end | |
end | |
def broadcast event | |
targets = @mutex.synchronize { @list.dup } | |
targets.each do |stream| | |
stream << event | |
end | |
end | |
end | |
end | |
HOMEPAGE = <<-HTML | |
<!doctype html> | |
<title>messenger</title> | |
<style> | |
html, body { margin: 0; padding: 0; box-sizing: border-box } | |
body { | |
background: #fefefe; | |
display: flex; | |
flex-direction: column; | |
height: 100%; | |
width: 100%; | |
position: absolute; | |
} | |
#history { | |
font: 16pt 'Courier New'; | |
flex: 1 1 auto; | |
overflow: scroll; | |
} | |
#history div { | |
padding: 0.5em; | |
} | |
#history div:nth-child(even) { | |
background: lightyellow; | |
} | |
#history div:nth-child(odd) { | |
background: white; | |
} | |
#input { | |
flex: 0 0 auto; | |
font-size: 16pt; | |
height: 2em; | |
border: 0; | |
border-top: 1px solid silver; | |
padding: 0.5em; | |
} | |
</style> | |
<div id=history></div> | |
<input id=input placeholder="send a message" autofocus> | |
<script> | |
var eventSource = new EventSource("/"); | |
eventSource.addEventListener("chat", function(e) { | |
var message = JSON.parse(e.data); | |
var container = document.createElement("div"); | |
container.innerContent = message.chat; | |
var history = document.querySelector("#history"); | |
history.appendChild(container); | |
history.scrollTop = history.scrollHeight; | |
}); | |
var input = document.querySelector("#input"); | |
input.addEventListener("change", function(e) { | |
e.preventDefault(); | |
if (e.target.value === "") { | |
return; | |
} | |
// window.fetch cannot land soon enough... | |
var xhr = new XMLHttpRequest(); | |
xhr.open("POST", "/"); | |
xhr.setRequestHeader("Content-Type", "application/json"); | |
xhr.send(JSON.stringify({chat: e.target.value})); | |
e.target.value = ""; | |
}); | |
</script> | |
HTML | |
require 'json' | |
class MessengerApp | |
def initialize | |
@broadcast_list = SSE::BroadcastList.new | |
end | |
def get env | |
case env["HTTP_ACCEPT"] | |
when %r"text/event-stream" | |
stream = SSE::Stream.new | |
@broadcast_list.add stream | |
body = Rack::BodyProxy.new(stream) { @broadcast_list.remove stream } | |
[ | |
200, | |
{ | |
"Content-Type" => "text/event-stream", | |
"Cache-Control" => "no-cache", | |
# For nginx: | |
"X-Accel-Buffering" => "no", | |
}, | |
body | |
] | |
else | |
[200, {"Content-Type" => "text/html"}, [HOMEPAGE]] | |
end | |
end | |
def post env | |
unless env['CONTENT_TYPE'] =~ %r"application/json" | |
return [400, {"Content-Type" => "text/plain"}, ["Expected application/json"]] | |
end | |
# XXX: unlimited read here, do not use unprotected! | |
message = JSON.parse(env['rack.input'].read) | |
unless message["chat"].kind_of? String | |
return [400, {"Content-Type" => "text/plain"}, ['Expected {"chat": ...}']] | |
end | |
# XXX: passing the message structure on blindly, don't continue to do this. | |
@broadcast_list.broadcast SSE::Event.new("chat", JSON.dump(message)) | |
[200, {}, []] | |
end | |
def call env | |
case env["REQUEST_METHOD"] | |
when "GET" | |
get env | |
when "POST" | |
post env | |
else | |
[404, {}, []] | |
end | |
end | |
end | |
run MessengerApp.new |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
PoCs for Cuba and Sinatra.