Last active
March 23, 2017 13:03
-
-
Save tpitale/3915671 to your computer and use it in GitHub Desktop.
Reel Websocket Server using PG Listen/Notify for crude pubsub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'bundler/setup' | |
require 'reel' | |
require 'celluloid/io' | |
require 'pg' | |
# Mixin that gives a Celluloid actor a crude pub/sub facility built on
# PostgreSQL LISTEN/NOTIFY.
#
# The including class is expected to respond to +info+ (for example via
# Celluloid::Logger) and to define one handler method per channel; handlers
# are invoked as <tt>handler(channel, payload)</tt>.
module PGNotifications
  # Hook run on +include+: mixes Celluloid::IO into the including class so
  # the notification wait below can go through Celluloid::IO.wait_readable.
  def self.included(actor)
    actor.send(:include, Celluloid::IO)
  end

  # Registry of channel name (String) => handler method name.
  def actions
    @actions ||= {}
  end

  # Lazily opened, memoized connection to the 'ws_example' database.
  def pg_connection
    @pg_connection ||= PG.connect(dbname: 'ws_example')
  end

  # Publishes +value+ (converted with +to_s+) on +channel+.
  #
  # Uses the parameterised pg_notify() function instead of interpolating into
  # a NOTIFY statement, so payloads containing quotes cannot break the SQL.
  def notify(channel, value)
    pg_connection.exec_params('SELECT pg_notify($1, $2)', [channel.to_s, value.to_s])
  end

  # Subscribes to +channel+ and remembers +action+ as the method to call for
  # each notification received on it.
  def listen(channel, action)
    actions[channel.to_s] = action
    # LISTEN cannot take bind parameters, so the identifier is quoted instead.
    pg_connection.exec("LISTEN #{pg_connection.quote_ident(channel.to_s)}")
  end

  # Runs the notification loop until +stop_listening+ is called, dispatching
  # every notification to the handler registered for its channel.
  def start_listening
    info "Starting Listening"
    @listening = true

    wait_for_notify do |channel, pid, payload|
      info "Received notification: #{[channel, pid, payload].inspect}"

      # A notification may arrive for a channel that has no handler (for
      # example one queued before an unlisten); skip it rather than crash.
      handler = actions[channel]
      send(handler, channel, payload) if handler
    end
  end

  # Asks the notification loop to exit; it is checked once per wake-up.
  def stop_listening
    @listening = false
  end

  # Waits for the connection's socket to become readable, then yields
  # (channel, pid, payload) for each pending notification.
  def wait_for_notify
    io = pg_connection.socket_io

    while @listening
      Celluloid::IO.wait_readable(io) # blocks execution, but unblocks this actor
      pg_connection.consume_input     # fetch any input on this connection

      while (notification = pg_connection.notifies)
        yield notification[:relname], # channel
              notification[:be_pid],  # pid
              notification[:extra]    # payload
      end
    end
  end

  # Unsubscribes from +channel+ and forgets its handler.
  #
  # @listening is a plain boolean owned by start_listening/stop_listening, so
  # it is deliberately left alone here; indexing it like a Hash raised
  # NoMethodError once listening had started.
  def unlisten(channel)
    actions.delete(channel.to_s)
    pg_connection.exec("UNLISTEN #{pg_connection.quote_ident(channel.to_s)}")
  end
end
# Actor that publishes the current time on the 'time_change' channel once a
# second and a Unix timestamp on the 'keepalive' channel every ten seconds.
class TimeServer
  include Celluloid
  include PGNotifications
  include Celluloid::Logger

  # Kicks off the publishing timers asynchronously.
  def initialize
    async.run
  end

  # Sleeps until just past the next whole second, then registers both
  # recurring timers.
  def run
    started_at = Time.now.to_f
    # Remaining fraction of the current second, plus 1 ms of slack.
    delay = started_at.ceil - started_at + 0.001
    sleep(delay)

    every(1) do
      notify('time_change', Time.now.inspect)
    end
    info('Registered to send time_change')

    every(10) do
      notify('keepalive', Time.now.to_i)
    end
    info('Registered to send keepalive')
  end
end
# Per-connection actor that relays 'time_change' notifications to one
# WebSocket and logs 'keepalive' notifications.
class TimeClient
  include PGNotifications
  include Celluloid::Logger

  KEEPALIVE_LIMIT = 30 # keep it short

  # Stores the socket, starts the notification loop and subscribes to both
  # channels; all three steps are dispatched asynchronously.
  def initialize(socket)
    info("Streaming time changes to client")

    @socket = socket
    @limit = Time.now.to_i + KEEPALIVE_LIMIT

    async.start_listening
    async.listen('time_change', :notify_time_change)
    async.listen('keepalive', :report_keepalive)
  end

  # Handler for 'time_change': writes the payload to the socket. If the
  # write raises Reel::SocketError the client is treated as gone, so the
  # channel is dropped and the actor terminates.
  def notify_time_change(channel, new_time)
    begin
      @socket << new_time
    rescue Reel::SocketError
      info("Time client disconnected")
      unlisten(channel)
      terminate
    end
  end

  # Handler for 'keepalive': logs the received timestamp.
  def report_keepalive(channel, timestamp)
    info("Current keepalive timestamp: #{timestamp}")
    # unlisten(channel) if timestamp.to_i > @limit
  end
end
# HTTP server that serves the demo page at "/" and hands WebSocket
# connections for "/timeinfo" to a new TimeClient.
class WebServer < Reel::Server::HTTP
  include Celluloid::Logger

  # Starts the server on +host+:+port+, routing every connection through
  # +on_connection+.
  def initialize(host = "127.0.0.1", port = 1234)
    info "Time server example starting on #{host}:#{port}"
    super(host, port, &method(:on_connection))
  end

  # Dispatches each request on the connection to the WebSocket or plain
  # HTTP handler.
  def on_connection(connection)
    connection.each_request do |request|
      if request.websocket?
        info "Received a WebSocket connection"
        handle_websocket request.websocket
      else
        handle_request request
      end
    end
  end

  # Serves the index page for "/" and responds 404 to everything else.
  #
  # Routing uses +request.path+, the same value the 404 log line reports,
  # rather than +request.url+, so "/" is still matched when the request
  # carries a query string.
  def handle_request(request)
    return render_index(request) if request.path == "/"

    info "404 Not Found: #{request.path}"
    request.respond :not_found, "Not found"
  end

  # Creates a TimeClient for sockets opened on "/timeinfo"; closes any other
  # WebSocket request.
  def handle_websocket(socket)
    if socket.url == "/timeinfo"
      TimeClient.new(socket)
    else
      info "Received invalid WebSocket request for: #{socket.url}"
      socket.close
    end
  end

  # Responds 200 with the demo page, whose script opens a WebSocket to
  # /timeinfo and writes each received message into #current-time.
  def render_index(request)
    info "200 OK: /"
    request.respond :ok, <<-HTML
      <!doctype html>
      <html lang="en">
      <head>
        <meta charset="utf-8">
        <title>Reel WebSockets time server example</title>
        <style>
          body {
            font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
            font-weight: 300;
            text-align: center;
          }
          #content {
            width: 800px;
            margin: 0 auto;
            background: #EEEEEE;
            padding: 1em;
          }
        </style>
      </head>
      <script>
        var SocketKlass = "MozWebSocket" in window ? MozWebSocket : WebSocket;
        var ws = new SocketKlass('ws://' + window.location.host + '/timeinfo');
        ws.onmessage = function(msg){
          document.getElementById('current-time').innerHTML = msg.data;
        }
      </script>
      <body>
        <div id="content">
          <h1>Time Server Example</h1>
          <div>The time is now: <span id="current-time">...</span></div>
        </div>
      </body>
      </html>
    HTML
  end
end
# Boot both actors under supervision, registered as :time_server and :reel.
TimeServer.supervise_as(:time_server)
WebServer.supervise_as(:reel)

# Park the main thread so the process stays alive.
sleep
Revision 14eeea263bb0320c2cedaee36ec6d792e9b1e492 now supports multiple listens per actor.
I've recreated the `wait_for_notify` from the pg gem and enhanced it to work with `Celluloid::IO`.
@tpitale The unlisten breaks for me because of @listening = true
with:
E, [2015-10-06T11:28:21.122062 #3373] ERROR -- : Actor crashed!
NoMethodError: undefined method `[]=' for true:TrueClass
@tpitale For now I decided to just call `stop_listening` altogether rather than try to manage listening by channel. I have created a gem based on your work here (with attribution; let me know if you'd like a different kind of mention, or if you'd like to be a collaborator on the project).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Works now with the latest reel at this time (0.5.0).
Seems to only be limited by the available number of postgresql connections.