Skip to content

Instantly share code, notes, and snippets.

@solisoft
Last active October 6, 2017 10:07
Show Gist options
  • Save solisoft/47a5a38e7363c1be619b08c714fec318 to your computer and use it in GitHub Desktop.
Save solisoft/47a5a38e7363c1be619b08c714fec318 to your computer and use it in GitHub Desktop.
require "kemal"
require "jwt"
require "json"
require "yaml"
require "arangocr"
require "redis"
# Load .env file
env_data = YAML.parse File.read(".env.yml") rescue Hash(String, String).new
env_data.each { |k, v| ENV[k.to_s] = v.to_s }
# Create ArangoDB Client (Not used for now)
arango = Arango::Client.new(ENV["ARANGO_HOST"], ENV["ARANGO_USER"], ENV["ARANGO_PASSWORD"], ENV["ARANGO_DATABASE"])
database = arango.database
# Connect to Redis and create subscribed channels
redis = Redis.new(ENV["REDIS_HOST"], ENV["REDIS_PORT"])
sub = Redis.new(ENV["REDIS_HOST"], ENV["REDIS_PORT"])
sub_users = Redis.new(ENV["REDIS_HOST"], ENV["REDIS_PORT"])
# Don't server static files
serve_static false
# Initialize variables
connected_sockets = {} of String => Array(HTTP::WebSocket)
connected_users = {} of String => Hash(String, String)
foreign_users = {} of String => Hash(String, String)
# broadcast
# send a message to all connected servers via redis
def broadcast(data, room, redis)
redis.publish("msg-bus", {"room" => room, "data" => data}.to_json)
end
# list_of_users
# Send the list of users to all connected users for a specific room
def list_of_users(users, sockets)
sockets.each do |socket|
socket.send({action: "list", users: users}.to_json)
end
end
# launch subscribe listeners for users sync
spawn do
sub_users.subscribe("users-bus") do |on|
on.message do |channel, message|
message = JSON.parse(message.to_s)
p message
room = message["room"].to_s
foreign_users[room] ||= Hash(String, String).new
connected_users[room] ||= Hash(String, String).new
connected_sockets[room] ||= [] of HTTP::WebSocket
if message["action"] == "login"
foreign_users[room][message["id"].to_s] = message["login"].to_s
list_of_users(foreign_users[room].values.uniq, connected_sockets[room])
end
if message["action"] == "logout"
foreign_users[room].reject!(message["id"].to_s)
list_of_users(foreign_users[room].values.uniq, connected_sockets[room])
end
if message["action"] == "refresh"
foreign_users[room] = Hash(String, String).new
redis.publish("users-bus", {action: "list", room: room, users: connected_users[room]}.to_json)
end
if message["action"] == "list"
message["users"].each do |k, v|
foreign_users[room][k.to_s] = v.to_s
end
list_of_users(foreign_users[room].values.uniq, connected_sockets[room])
end
end
end
end
# launch subscribe listeners for messqges
spawn do
sub.subscribe("msg-bus") do |on|
on.message do |channel, message|
message = JSON.parse(message.to_s)
room = message["room"].to_s
connected_sockets[room] ||= [] of HTTP::WebSocket
connected_sockets[room].each do |socket|
socket.send message["data"].to_json
end
end
end
end
# ws /
# manage multiple rooms via the query string
# client sample (create room r1):
# ws = new WebSocket('ws://localhost:3000/?room=r1');
ws "/" do |socket, env|
room = env.params.query["room"] rescue "global"
connected_sockets[room] ||= [] of HTTP::WebSocket
connected_users[room] ||= Hash(String, String).new
redis.publish("users-bus", {action: "refresh", room: room}.to_json)
socket.on_message do |message|
msg = JSON.parse(message)
if msg["action"] == "login"
# Token must be a valid token and secret should be set properly
token = msg["token"].to_s rescue ""
begin
payload, header = JWT.decode(token, ENV["JWT_SECRET"], "HS512")
rescue
# If token is invalid return 401 error and close socket
env.response.status_code = 401
socket.close
next
end
connected_sockets[room].push socket
connected_users[room]["#{socket.object_id}"] = msg["from"].to_s
redis.publish("users-bus", {
action: "login",
login: msg["from"],
id: "#{socket.object_id}",
room: room,
}.to_json)
else
if msg["action"] == "refresh"
redis.publish("users-bus", {action: "refresh", room: room}.to_json)
end
# Only allow registered users
if connected_sockets[room].index socket
broadcast(msg, room, redis)
end
end
end
# When socket is closed, remove the socket and the user to variables
# Then send notification to all servers via redis
socket.on_close do |_|
redis.publish("users-bus", {action: "logout", id: socket.object_id.to_s, room: room}.to_json)
connected_users[room].reject!(socket.object_id.to_s)
connected_sockets[room].delete(socket)
end
end
Kemal.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment