Last active
October 6, 2017 10:07
-
-
Save solisoft/47a5a38e7363c1be619b08c714fec318 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "kemal" | |
require "jwt" | |
require "json" | |
require "yaml" | |
require "arangocr" | |
require "redis" | |
# Load .env file | |
env_data = YAML.parse File.read(".env.yml") rescue Hash(String, String).new | |
env_data.each { |k, v| ENV[k.to_s] = v.to_s } | |
# Create ArangoDB Client (Not used for now) | |
arango = Arango::Client.new(ENV["ARANGO_HOST"], ENV["ARANGO_USER"], ENV["ARANGO_PASSWORD"], ENV["ARANGO_DATABASE"]) | |
database = arango.database | |
# Connect to Redis and create subscribed channels | |
redis = Redis.new(ENV["REDIS_HOST"], ENV["REDIS_PORT"]) | |
sub = Redis.new(ENV["REDIS_HOST"], ENV["REDIS_PORT"]) | |
sub_users = Redis.new(ENV["REDIS_HOST"], ENV["REDIS_PORT"]) | |
# Don't server static files | |
serve_static false | |
# Initialize variables | |
connected_sockets = {} of String => Array(HTTP::WebSocket) | |
connected_users = {} of String => Hash(String, String) | |
foreign_users = {} of String => Hash(String, String) | |
# broadcast | |
# send a message to all connected servers via redis | |
def broadcast(data, room, redis) | |
redis.publish("msg-bus", {"room" => room, "data" => data}.to_json) | |
end | |
# list_of_users | |
# Send the list of users to all connected users for a specific room | |
def list_of_users(users, sockets) | |
sockets.each do |socket| | |
socket.send({action: "list", users: users}.to_json) | |
end | |
end | |
# launch subscribe listeners for users sync | |
spawn do | |
sub_users.subscribe("users-bus") do |on| | |
on.message do |channel, message| | |
message = JSON.parse(message.to_s) | |
p message | |
room = message["room"].to_s | |
foreign_users[room] ||= Hash(String, String).new | |
connected_users[room] ||= Hash(String, String).new | |
connected_sockets[room] ||= [] of HTTP::WebSocket | |
if message["action"] == "login" | |
foreign_users[room][message["id"].to_s] = message["login"].to_s | |
list_of_users(foreign_users[room].values.uniq, connected_sockets[room]) | |
end | |
if message["action"] == "logout" | |
foreign_users[room].reject!(message["id"].to_s) | |
list_of_users(foreign_users[room].values.uniq, connected_sockets[room]) | |
end | |
if message["action"] == "refresh" | |
foreign_users[room] = Hash(String, String).new | |
redis.publish("users-bus", {action: "list", room: room, users: connected_users[room]}.to_json) | |
end | |
if message["action"] == "list" | |
message["users"].each do |k, v| | |
foreign_users[room][k.to_s] = v.to_s | |
end | |
list_of_users(foreign_users[room].values.uniq, connected_sockets[room]) | |
end | |
end | |
end | |
end | |
# launch subscribe listeners for messqges | |
spawn do | |
sub.subscribe("msg-bus") do |on| | |
on.message do |channel, message| | |
message = JSON.parse(message.to_s) | |
room = message["room"].to_s | |
connected_sockets[room] ||= [] of HTTP::WebSocket | |
connected_sockets[room].each do |socket| | |
socket.send message["data"].to_json | |
end | |
end | |
end | |
end | |
# ws / | |
# manage multiple rooms via the query string | |
# client sample (create room r1): | |
# ws = new WebSocket('ws://localhost:3000/?room=r1'); | |
ws "/" do |socket, env| | |
room = env.params.query["room"] rescue "global" | |
connected_sockets[room] ||= [] of HTTP::WebSocket | |
connected_users[room] ||= Hash(String, String).new | |
redis.publish("users-bus", {action: "refresh", room: room}.to_json) | |
socket.on_message do |message| | |
msg = JSON.parse(message) | |
if msg["action"] == "login" | |
# Token must be a valid token and secret should be set properly | |
token = msg["token"].to_s rescue "" | |
begin | |
payload, header = JWT.decode(token, ENV["JWT_SECRET"], "HS512") | |
rescue | |
# If token is invalid return 401 error and close socket | |
env.response.status_code = 401 | |
socket.close | |
next | |
end | |
connected_sockets[room].push socket | |
connected_users[room]["#{socket.object_id}"] = msg["from"].to_s | |
redis.publish("users-bus", { | |
action: "login", | |
login: msg["from"], | |
id: "#{socket.object_id}", | |
room: room, | |
}.to_json) | |
else | |
if msg["action"] == "refresh" | |
redis.publish("users-bus", {action: "refresh", room: room}.to_json) | |
end | |
# Only allow registered users | |
if connected_sockets[room].index socket | |
broadcast(msg, room, redis) | |
end | |
end | |
end | |
# When socket is closed, remove the socket and the user to variables | |
# Then send notification to all servers via redis | |
socket.on_close do |_| | |
redis.publish("users-bus", {action: "logout", id: socket.object_id.to_s, room: room}.to_json) | |
connected_users[room].reject!(socket.object_id.to_s) | |
connected_sockets[room].delete(socket) | |
end | |
end | |
Kemal.run |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment