Skip to content

Instantly share code, notes, and snippets.

@seanlinsley
Last active August 26, 2020 01:13
Show Gist options
  • Select an option

  • Save seanlinsley/775e9d934128b62ace8c to your computer and use it in GitHub Desktop.

Select an option

Save seanlinsley/775e9d934128b62ace8c to your computer and use it in GitHub Desktop.
Websockets with Postgres listen/notify and the Tubesock gem
require 'pg_notify'
# Websocket chat endpoint. Each connection is hijacked via Tubesock and bridged
# to a notifier shared by the whole class, so a message sent on one socket
# reaches every socket subscribed with the same params[:id].
class ChatController < ApplicationController
  include Tubesock::Hijack

  # Single notifier for all requests, bound to the 'chat' channel.
  @@notifier = PGNotify.new 'chat'

  # Takes over the connection and wires the three websocket callbacks.
  def chat
    hijack do |websocket|
      # Incoming text is prefixed with the sender's name and broadcast
      # through the notifier, tagged with this request's id.
      websocket.onmessage do |message|
        if current_user
          sender = current_user.display_name
        else
          sender = "Anonymous"
        end
        @@notifier.notify id: params[:id], message: "#{sender}: #{message}"
      end

      # On open, register this socket; only payloads whose id matches this
      # request's params[:id] are forwarded to it.
      websocket.onopen do
        @@notifier.subscribe websocket do |payload|
          websocket.send_data payload[:message] if payload[:id] == params[:id]
        end
      end

      # On close, drop the socket's subscription.
      websocket.onclose do
        @@notifier.unsubscribe websocket
      end
    end
  end
end
# Fans PostgreSQL LISTEN/NOTIFY traffic on one channel out to in-process
# subscribers.
#
# Payloads are sent as JSON by #notify and parsed back (with symbol keys)
# before being handed to each subscriber block. A single background thread,
# started on the first #subscribe, holds a pooled connection and blocks on
# wait_for_notify.
#
# The channel name is interpolated into SQL unquoted, so it must be a trusted
# identifier and never user input.
class PGNotify
  # @param channel [String] name of the Postgres notification channel
  def initialize(channel)
    @channel = channel
    @subscribers = {}
    @mutex = Mutex.new
    @bgthread = false
  end

  # Registers +block+ to be called with every parsed payload, keyed by
  # +object+ (re-subscribing the same object replaces its block), and makes
  # sure the listener thread is running.
  #
  # @param object [Object] key used later to #unsubscribe
  # @yieldparam payload [Hash] parsed JSON payload with symbol keys
  def subscribe(object, &block)
    # The listener thread reads this hash concurrently, so guard the write.
    @mutex.synchronize { @subscribers[object] = block }
    thread_with_connection { |conn| setup_listener(conn) }
  end

  # Removes the subscription registered under +object+.
  #
  # @return [Proc, nil] the removed block, or nil if there was none
  def unsubscribe(object)
    @mutex.synchronize { @subscribers.delete(object) }
  end

  # Broadcasts +data+ (serialised with #to_json) on the channel.
  #
  # @param data [#to_json] payload to send
  def notify(data)
    connection = ActiveRecord::Base.connection
    # Let the adapter quote the literal instead of hand-escaping apostrophes.
    connection.execute "notify #{@channel}, #{connection.quote(data.to_json)}"
  end

  private

  # Issues LISTEN on +connection+ and dispatches notifications until the loop
  # is interrupted; always attempts UNLISTEN on the way out.
  def setup_listener(connection)
    connection.async_exec "listen #{@channel}"
    loop do
      connection.wait_for_notify do |_channel, _pid, payload|
        dispatch JSON.parse(payload, symbolize_names: true)
      end
    end
  ensure
    connection.async_exec "unlisten #{@channel}"
  end

  # Calls every current subscriber with +message+.
  def dispatch(message)
    # Iterate a snapshot taken under the lock: subscribers may be added or
    # removed (by other threads, or by a callback itself) while we deliver,
    # and mutating a Hash during #each raises.
    callbacks = @mutex.synchronize { @subscribers.values }
    callbacks.each { |callback| callback.call message }
  end

  # Starts the background thread unless one is already marked as running, and
  # yields the pooled connection's raw connection to the given block inside it.
  #
  # @return [Thread, nil] the new thread, or nil when one was already running
  def thread_with_connection
    @mutex.synchronize do
      return if @bgthread

      @bgthread = true
      Thread.new do
        begin
          ActiveRecord::Base.connection_pool.with_connection do |connection|
            yield connection.raw_connection
          end
        ensure
          # Reset even when checking out the connection itself failed, so a
          # later #subscribe can start a fresh listener.
          @mutex.synchronize { @bgthread = false }
        end
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment