Created
March 12, 2013 12:19
-
-
Save gauravsaini23/5142467 to your computer and use it in GitHub Desktop.
amqp implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'yaml' | |
###################### | |
## SET CONFIGURATION | |
##################### | |
# Pin the JDK location for this process.
# NOTE(review): presumably needed by the HBase connection required further
# down in this file -- confirm.
ENV['JAVA_HOME'] = '/usr/lib/jvm/java-6-openjdk-i386/'

# The environment name is mandatory. Check it before reading any
# configuration so a missing argument produces the usage hint instead of a
# lookup with an empty key (or a file error that hides the real problem).
unless ARGV[0]
  puts "Please pass on the environment param: development | test | production"
  abort
end

# Settings for this instance, taken from the section of hcl.yml named after
# the environment given as the first command-line argument.
APP_CONFIG = YAML.load_file(File.expand_path("./../../../config/instances/hcl.yml", __FILE__))[ARGV[0].to_s]

# An unknown environment yields nil here; stop now rather than failing later
# with a NoMethodError when the server settings are read.
abort "No configuration found for environment '#{ARGV[0]}' in hcl.yml" unless APP_CONFIG
################### | |
## REQUIRE FILES | |
################### | |
require 'amqp' | |
require 'em-websocket' | |
require 'json' | |
require 'evma_httpserver' | |
require "active_record" | |
require 'cgi' | |
require 'uri' | |
require 'logger' | |
require File.expand_path('../../import_classes.rb', __FILE__) | |
require File.expand_path('../../hbase_connection.rb', __FILE__) | |
require File.expand_path('../helper.rb', __FILE__) | |
require File.expand_path('../socket_manager', __FILE__) | |
require File.expand_path('../../../config/initializers/satorirecord_base', __FILE__) | |
require File.expand_path('../../../app/models/chat_user', __FILE__) | |
require File.expand_path('../../../app/models/chathistory', __FILE__) | |
require File.expand_path('../../../app/models/follower', __FILE__) | |
require File.expand_path('../../../app/models/securitygroups', __FILE__) | |
require File.expand_path('../../../app/models/permission_group', __FILE__) | |
require File.expand_path('../../../app/models/permission_group_user', __FILE__) | |
require File.expand_path('../../../app/models/live_page', __FILE__) | |
require File.expand_path('../../../app/models/community', __FILE__) | |
require File.expand_path('../../../app/models/community_permission', __FILE__) | |
require File.expand_path('../../../app/models/workspace_permission', __FILE__) | |
require File.expand_path('../../../app/models/htable_connection_detail', __FILE__) | |
# Append to the chat server log; the path is resolved relative to this file.
log_file = File.open(File.expand_path('../../../log/chat_server.log', __FILE__), "a")
# Logger leaves the buffering mode of an IO it is handed untouched, so switch
# on sync mode explicitly. Otherwise entries stay in the IO buffer of this
# long-running process instead of reaching the file as they are written.
log_file.sync = true
logger = Logger.new log_file
# Minimal stand-in for the application's User model. The real model cannot be
# loaded here because it requires other classes (such as ajax_ful_rater), which
# in turn require Action View.
class User < ActiveRecord::Base; end
########################## | |
## ESTABLISH CONNECTIONS | |
########################## | |
# Both connections are selected by the environment name (first CLI argument).
environment = ARGV[0].to_s
database_settings = YAML.load_file(File.expand_path("./../../../config/database.yml", __FILE__))

# Connect ActiveRecord using the matching section of database.yml.
ActiveRecord::Base.establish_connection(database_settings[environment])
# Instantiate HbaseConnection::Base for the same environment.
HbaseConnection::Base.new(environment)
# start the run loop | |
EventMachine.run do | |
# RabbitMQ settings live under webservers -> chat -> rabbitmq in APP_CONFIG.
rabbitmq_settings = APP_CONFIG[:webservers][:chat][:rabbitmq]
connection = AMQP.connect(
  :host     => rabbitmq_settings[:host],
  :port     => rabbitmq_settings[:port],
  :user     => rabbitmq_settings[:username],
  :password => rabbitmq_settings[:password]
)
# One channel, used by all of the websocket handlers registered below.
channel = AMQP::Channel.new(connection)
socket_manager = SocketManager.new
puts "---- Server started in #{ARGV[0]} mode ----"
EventMachine::WebSocket.start(:host => APP_CONFIG[:webservers][:chat][:host], :port => APP_CONFIG[:webservers][:chat][:port]) do |ws| | |
timer = nil | |
count = 0 | |
# Runs when a websocket connection opens: records the page the user is on,
# registers the socket and starts the keep-alive PING timer.
ws.onopen do
  # Logger#<< writes the string verbatim, so terminate the entry explicitly;
  # without the newline consecutive entries run together on one line.
  logger << "Connection established : #{ws.inspect}\n"

  query = ws.request["query"]
  username = query["username"]
  raw_page = query["page"]
  ChatUser.update_page(username, CGI.unescapeHTML(raw_page)) if ChatUser.should_record_page?(raw_page)
  SocketManager.new.add_socket(username, ws)

  # Every 20 seconds send a PING. `count` goes up by one per PING here and is
  # lowered by one per PONG in the onmessage handler, so it tracks how many
  # PINGs are still unanswered.
  timer = EM.add_periodic_timer(20) do
    ws.send JSON.generate({type: "PING", username: username})

    # More than 5 unanswered PINGs: drop this socket and its page.
    if count > 5
      ChatUser.remove_page(username, CGI.unescapeHTML(raw_page))
      SocketManager.new.remove_socket(username, ws)

      # No socket left for this user: mark them offline and notify others.
      unless SocketManager.new.any_socket_exists?(username)
        ChatUser.update_status(username, false)
        ChatUser.remove_all_pages(username)
        exchange = channel.direct("")
        # The payload is identical for every recipient, so build it once.
        offline_status = JSON.generate({username: username, status: 'offline', type: 'STATUS', full_name: Helper.get_full_name(username)})
        Helper.get_users_to_send(username).each do |name|
          exchange.publish(offline_status, :routing_key => name)
        end
      end

      # Stop pinging a socket that has been given up on.
      timer.cancel
    end

    count += 1
  end
end
# Handles one incoming client frame. The parsed request's :type selects the
# action (STATUS, MESSAGE, HISTORY or PONG); any other type is ignored.
ws.onmessage do |message|
  req = Helper.parse(message)
  exchange = channel.direct("")

  # Serialises `payload` once and publishes it to each recipient's queue
  # (the recipient name is used as the routing key).
  broadcast = lambda do |payload, recipients|
    json = JSON.generate(payload)
    recipients.each { |recipient| exchange.publish(json, :routing_key => recipient) }
  end

  case req[:type]
  when 'STATUS'
    username = req[:username]

    # (Re)subscribe to the user's queue so that everything routed to them is
    # relayed to each of their open sockets.
    queue = channel.queue!(username)
    queue.unsubscribe if queue.subscribed?
    queue.subscribe do |payload|
      # Guard against a missing socket list for this user when a message
      # arrives, instead of calling #each on nil.
      (SocketManager.new.sockets[username] || []).each { |socket| socket.send(payload) }
    end
    AMQP::Consumer.new(channel, queue)

    # Store the status in the ActiveRecord database so that the main
    # application can use it.
    ChatUser.update_status(username, (req[:status] == 'online' ? 1 : 0))

    req[:full_name] = Helper.get_full_name(username)
    # Echo the status to the user's own queue first.
    broadcast.call(req, [username])

    # Each audience below gets its own copy of the request (req.merge) with
    # only the keys that apply to it. Mutating `req` in place used to leak
    # keys set for one audience (e.g. :work_name) into later payloads.
    broadcast.call(req.merge(user_type: "FOLLOWING"), Follower.online_followers(username).map(&:user_id))
    broadcast.call(req.merge(user_type: "FOLLOWER"), Follower.online_followings(username).map(&:user_id))

    # Users currently viewing this user's profile, communities or workspaces.
    profile_of, community_of, workspace_of = ChatUser.on_pages_of(username)
    broadcast.call(req.merge(user_type: "PROFILE_VIEWER"), profile_of)
    community_of.each do |comm, names|
      broadcast.call(req.merge(user_type: "COMM_VIEWER", comm_name: comm), names)
    end
    workspace_of.each do |work|
      broadcast.call(req.merge(user_type: "WORK_VIEWER", comm_name: work[:comm_url], work_name: work[:work_url]), work[:users])
    end

    helper = Helper.new
    helper.users_on_my_communities(username).each do |comm_name, names|
      broadcast.call(req.merge(user_type: "COMM_ADMIN", comm_name: comm_name), names)
    end
    # This branch used req.to_json before; JSON.generate (via broadcast) keeps
    # it consistent with every other payload.
    helper.users_on_my_workspaces(username).each do |my_hash|
      broadcast.call(req.merge(user_type: "WORK_ADMIN", comm_name: my_hash[:comm_url], work_name: my_hash[:work_url]), my_hash[:users])
    end
    helper.get_community_members(username).each do |comm_id, names|
      broadcast.call(req.merge(user_type: "COMM_MEM", comm_id: comm_id), names)
    end
  when 'MESSAGE'
    # HTML-escape, then URI-encode the text before it is stored and relayed.
    # NOTE(review): URI.encode was removed in Ruby 3.0; this call needs a
    # replacement before the server is moved to a newer Ruby.
    req[:message] = URI.encode(CGI.escape_html(req[:message]))
    req[:row_key], req[:timestamp] = Chathistory.add_message(req)
    req[:full_name] = Helper.get_full_name(req[:username])
    # Deliver to the addressee, then back to the sender.
    broadcast.call(req, [req[:message_to], req[:username]])
  when 'HISTORY'
    history = Chathistory.get_history_for_single_chat(req[:row_key])
    broadcast.call({type: "HISTORY", history: history, row_key: req[:row_key]}, [req[:username]])
  when 'PONG'
    # One PING has been answered; see the periodic timer in onopen.
    count -= 1
  end
end
# Runs when a websocket closes: notifies the owner(s) of the page the user was
# on, forgets the socket, and schedules delayed cleanup so that the user is
# only marked offline if no socket is open for them 10 seconds later.
ws.onclose do
  # Stop the keep-alive timer started in onopen; otherwise it keeps firing
  # for a connection that is already closed.
  timer.cancel if timer

  query = ws.request["query"]
  username = query["username"]
  page = CGI.unescapeHTML(query["page"])
  exchange = channel.direct("")

  if (profile_match = page.match(/\/profile\/(\S*)\/updates/))
    # Leaving a profile page: tell the profile's owner.
    exchange.publish(JSON.generate({username: username, type: 'OFF_PAGE', page: 'profile'}), :routing_key => profile_match[1])
  elsif (community_match = page.match(/\/communities\/(\S*)($|\/)/))
    community = community_match[1]
    # `community` comes from the client-supplied page URL, so bind it as a
    # query parameter instead of interpolating it into the SQL string.
    owners = PermissionGroupUser.joins(:permission_group)
                                .where("permission_groups.permission_group = ?", "#{community} owners")
                                .select("user_name")
                                .collect(&:user_name).uniq - [username]
    # Leaving a community page: tell every owner except the user themselves.
    off_page = JSON.generate({username: username, type: 'OFF_PAGE', page: 'community', community: community})
    owners.each { |owner| exchange.publish(off_page, :routing_key => owner) }
  end

  sockets = SocketManager.new
  sockets.remove_socket(username, ws)

  EM.add_timer(5) do
    # If no socket is open on this page any more, remove the page from the
    # user's account.
    ChatUser.remove_page(username, page) unless sockets.socket_with_page_exists?(username, page)
  end

  # If the connection stays lost for more than 10 seconds, mark the user as
  # offline; otherwise do nothing.
  EM.add_timer(10) do
    unless sockets.any_socket_exists?(username)
      # Store the status in the ActiveRecord database so that the main
      # application can use it.
      ChatUser.remove_all_pages(username)
      ChatUser.update_status(username, false)
      json_status = JSON.generate({username: username, status: 'offline', type: 'STATUS', full_name: Helper.get_full_name(username)})
      Helper.get_users_to_send(username).each do |name|
        exchange.publish(json_status, :routing_key => name)
      end
    end
  end
end
# Runs when the websocket reports an error: prints it and, unless it is a
# handshake error, performs the same page/offline cleanup as a closed socket.
ws.onerror do |error|
  puts "error called ---- #{error.inspect}"
  puts error.backtrace

  # Handshake errors are only reported; the cleanup below is skipped for them.
  next if error.class == EventMachine::WebSocket::HandshakeError

  query = ws.request["query"]
  username = query["username"]
  page = CGI.unescapeHTML(query["page"])
  exchange = channel.direct("")

  if (profile_match = page.match(/\/profile\/(\S*)\/updates/))
    # Leaving a profile page: tell the profile's owner.
    exchange.publish(JSON.generate({username: username, type: 'OFF_PAGE', page: 'profile'}), :routing_key => profile_match[1])
  elsif (community_match = page.match(/\/communities\/(\S*)($|\/)/))
    community = community_match[1]
    # `community` comes from the client-supplied page URL, so bind it as a
    # query parameter instead of interpolating it into the SQL string.
    owners = PermissionGroupUser.joins(:permission_group)
                                .where("permission_groups.permission_group = ?", "#{community} owners")
                                .select("user_name")
                                .collect(&:user_name).uniq - [username]
    # Leaving a community page: tell every owner except the user themselves.
    off_page = JSON.generate({username: username, type: 'OFF_PAGE', page: 'community', community: community})
    owners.each { |owner| exchange.publish(off_page, :routing_key => owner) }
  end

  ChatUser.remove_page(username, page)
  SocketManager.new.remove_socket(username, ws)

  # If the connection stays lost for more than 10 seconds, mark the user as
  # offline; otherwise do nothing.
  EM.add_timer(10) do
    unless SocketManager.new.any_socket_exists?(username)
      # Store the status in the ActiveRecord database so that the main
      # application can use it.
      ChatUser.remove_all_pages(username)
      ChatUser.update_status(username, false)
      # Carry full_name like the other offline STATUS notifications in this
      # file; it was missing from this one only.
      json_status = JSON.generate({username: username, status: 'offline', type: 'STATUS', full_name: Helper.get_full_name(username)})
      Helper.get_users_to_send(username).each do |name|
        exchange.publish(json_status, :routing_key => name)
      end
    end
  end
end
end | |
# On INT or TERM: set all users to offline, tell every connected client that
# the server is going down, then stop EventMachine.
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) do
    # Trap handlers run in a restricted context (Ruby >= 2.0 refuses to lock a
    # Mutex there), so database and socket work done directly in the handler
    # may raise ThreadError. Hand the work to the reactor instead. A zero-delay
    # timer is used rather than EM.next_tick, which itself takes a mutex.
    EM.add_timer(0) do
      ChatUser.update_all(online: false)
      server_down = JSON.generate({type: 'SERVER_DOWN'})
      SocketManager.new.sockets.values.flatten.each { |sock| sock.send(server_down) }
      EventMachine.stop
    end
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment