Last active
February 15, 2016 07:02
-
-
Save Asmod4n/241c84ef31df921b31e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'json' | |
require 'active_support/core_ext/numeric/bytes' | |
require 'delegate' | |
require 'forwardable' | |
require 'time' | |
require 'celluloid/io' | |
require 'websocket/driver' | |
require 'webmachine' | |
require 'webmachine/adapters/rack' | |
require 'celluloid/autostart' | |
module Wamp | |
module Consts | |
ABORT_MSG = '[3,{},"wamp.error.system_shutdown"]' | |
GOODBYE_MSG = 'wamp.error.system_shutdown'.freeze | |
GOODBYE_REPLY_MSG = 'wamp.error.goodbye_and_out'.freeze | |
WEBSOCKET_PROTOS = ['wamp.2.json'.freeze].freeze | |
REALM = 'com.example'.freeze | |
ROLES = {roles: {broker: {}.freeze, dealer: {}.freeze}.freeze}.freeze | |
TIMEOUT = (5).freeze | |
URI_MATCH = %r{^(([0-9a-z_-]{2,}\.)|\.)*([0-9a-z_-]{2,})?$}i.freeze | |
TWO_POW_53 = (2**53).freeze | |
ONE_MEGABYTE = (1.megabyte).freeze | |
EMPTY_DICT = {}.freeze | |
EMPTY_LIST = [].freeze | |
HTTP_11 = 'HTTP/1.1'.freeze | |
CRLF = "\r\n".freeze | |
RACK_URL_SCHEME = 'rack.url_scheme'.freeze | |
HTTP = 'http'.freeze | |
BUFFER_SIZE = (16384).freeze | |
end | |
end | |
module Wamp | |
class Status < ::Delegator | |
REASON = { | |
1 => 'HELLO', | |
2 => 'WELCOME', | |
3 => 'ABORT', | |
4 => 'CHALLENGE', | |
5 => 'AUTHENTICATE', | |
6 => 'GOODBYE', | |
7 => 'HEARTBEAT', | |
8 => 'ERROR', | |
16 => 'PUBLISH', | |
17 => 'PUBLISHED', | |
32 => 'SUBSCRIBE', | |
33 => 'SUBSCRIBED', | |
34 => 'UNSUBSCRIBE', | |
35 => 'UNSUBSCRIBED', | |
36 => 'EVENT', | |
48 => 'CALL', | |
49 => 'CANCEL', | |
50 => 'RESULT', | |
64 => 'REGISTER', | |
65 => 'REGISTERED', | |
66 => 'UNREGISTER', | |
67 => 'UNREGISTERED', | |
68 => 'INVOCATION', | |
69 => 'INTERRUPT', | |
70 => 'YIELD' | |
}.each { |_, v| v.freeze }.freeze | |
class << self | |
# Coerces given value to Status. | |
# | |
# @example | |
# | |
# Status.coerce("hello") # => Status.new(1) | |
# Status.coerce(:hello) # => Status.new(1) | |
# Status.coerce(1.0) # => Status.new(1) | |
# Status.coerce(true) # => raises ArgumentError | |
# | |
# @raise [ArgumentError] if coercion is impossible | |
# @param [String, Symbol, Numeric] object | |
# @return [Status] | |
def coerce(object) | |
code = case object | |
when String then SYMBOL_CODES[symbolize object] | |
when Symbol then SYMBOL_CODES[object] | |
when Numeric then object.to_i | |
else nil | |
end | |
return new code if code | |
fail ArgumentError, "Can't coerce #{object.class}(#{object}) to #{self}" | |
end | |
alias_method :[], :coerce | |
private | |
# Symbolizes given string | |
# | |
# @example | |
# | |
# symbolize "HELLO" # => :hello | |
# | |
# @param [#to_s] str | |
# @return [Symbol] | |
def symbolize(str) | |
str.to_s.downcase.to_sym | |
end | |
end | |
# Code to Symbol map | |
# | |
# @example Usage | |
# | |
# SYMBOLS[1] # => :hello | |
# SYMBOLS[2] # => :welcome | |
# | |
# @return [Hash<Fixnum => Symbol>] | |
SYMBOLS = Hash[REASON.map { |k, v| [k, symbolize(v)] }].freeze | |
# Reversed {SYMBOLS} map. | |
# | |
# @example Usage | |
# | |
# SYMBOL_CODES[:hello] # => 1 | |
# SYMBOL_CODES[:welcome] # => 2 | |
# | |
# @return [Hash<Symbol => Fixnum>] | |
SYMBOL_CODES = Hash[SYMBOLS.map { |k, v| [v, k] }].freeze | |
# Status code | |
# | |
# @return [Fixnum] | |
attr_reader :code | |
if RUBY_VERSION < '1.9.0' | |
# @param [#to_i] code | |
def initialize(code) | |
super __setobj__ code | |
end | |
end | |
# Status message | |
# | |
# @return [nil] unless code is well-known (see REASON) | |
# @return [String] | |
def reason | |
REASON[code] | |
end | |
# Symbolized {#reason} | |
# | |
# @return [nil] unless code is well-known (see REASON) | |
# @return [Symbol] | |
def symbolize | |
SYMBOLS[code] | |
end | |
# Printable version of HTTP Status, surrounded by quote marks, | |
# with special characters escaped. | |
# | |
# (see String#inspect) | |
def inspect | |
"#{code} #{reason}".inspect | |
end | |
SYMBOLS.each do |code, symbol| | |
class_eval <<-RUBY, __FILE__, __LINE__ | |
def #{symbol}? # def hello? | |
#{code} == code # 1 == code | |
end # end | |
RUBY | |
end | |
def __setobj__(obj) | |
@code = obj.to_i | |
end | |
def __getobj__ | |
@code | |
end | |
end | |
end | |
module Wamp | |
class StaticResource < Webmachine::Resource | |
include Consts | |
def last_modified | |
File.mtime(__FILE__) | |
end | |
def to_html | |
<<-HTML | |
<!doctype html> | |
<html lang="en"> | |
<head> | |
<meta charset="utf-8"> | |
<title>Reel WebSockets time server example</title> | |
<style> | |
body { | |
font-family: "HelveticaNeue-Light", "Helvetica Neue Light", "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif; | |
font-weight: 300; | |
text-align: center; | |
} | |
#content { | |
width: 800px; | |
margin: 0 auto; | |
background: #EEEEEE; | |
padding: 1em; | |
} | |
</style> | |
<!-- github isn't a cdn, don't use this in production code --> | |
<script src="https://raw.githubusercontent.com/KSDaemon/wampy.js/dev/build/wampy-all.min.js"></script> | |
<script> | |
var ws = new Wampy('/stream', { | |
realm: '#{REALM}', | |
onConnect: function() { | |
ws.subscribe('#{REALM}.time_update', function(data) { | |
document.getElementById('current-time').innerHTML = new Date(data * 1000.0).toISOString(); | |
}); | |
} | |
}); | |
</script> | |
</head> | |
<body> | |
<div id="content"> | |
<h1>Time Server Example</h1> | |
<div>The time is now: <span id="current-time">...</span></div> | |
</div> | |
</body> | |
</html> | |
HTML | |
end | |
end | |
end | |
module Wamp | |
class WebSocketHandler | |
extend Forwardable | |
include Celluloid::Logger | |
include Celluloid | |
include Celluloid::Notifications | |
include Celluloid::FSM | |
include Consts | |
class Call < Struct.new(:peer_id, :options, :procedure, :arguments, :argumentskw) ; end | |
def_delegators :@prng, :rand | |
def_delegators :@driver, :parse, :protocol | |
execute_block_on_receiver :initialize | |
finalizer :disconnect | |
default_state :idle | |
state :idle, to: [:send_website, :websocket_start, :disconnect] | |
state :send_website, to: [:disconnect] | |
state :websocket_start, to: [:websocket_connected, :disconnect] | |
state :websocket_connected, to: [:wamp_abort, :wamp_welcome, :websocket_close, :disconnect] | |
state :wamp_abort, to: [:websocket_close, :disconnect] | |
state :wamp_welcome, to: [:wamp_connected, :websocket_close, :disconnect] | |
state :wamp_connected, to: [:wamp_goodbye, :wamp_goodbye_reply, :websocket_close, :disconnect] | |
state :websocket_close, to: [:disconnect] | |
state :disconnect, to: [:disconnected] | |
state :disconnected | |
state :send_website do | |
debug state | |
begin | |
rack_adapter = Webmachine::Adapters::Rack.new(Webmachine.application) | |
@driver.env[RACK_URL_SCHEME] = HTTP | |
status, headers, body = rack_adapter.call(@driver.env) | |
response = "#{HTTP_11} #{status}#{CRLF}" | |
headers.each do |k, v| | |
response << "#{k}: #{v}#{CRLF}" | |
end | |
response << CRLF | |
@socket.write response << body.join | |
transition :disconnect | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
end | |
state :websocket_start do | |
debug state | |
begin | |
@driver.start | |
transition :websocket_connected | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
end | |
state :websocket_connected do | |
debug state | |
async.send_ping | |
end | |
state :wamp_abort do | |
debug state | |
begin | |
@driver.frame(ABORT_MSG) | |
transition :websocket_close | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
end | |
state :wamp_welcome do | |
debug state | |
begin | |
@peer_id = generate_id | |
msg = JSON.generate([Status[:welcome], @peer_id, @roles]) | |
debug msg | |
@driver.frame(msg) | |
transition :wamp_connected | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
end | |
state :wamp_connected do | |
debug state | |
async.heartbeat | |
end | |
state :wamp_goodbye do | |
debug state | |
begin | |
@driver.frame(JSON.generate([Status[:goodbye], EMPTY_DICT, GOODBYE_MSG])) | |
transition :websocket_close | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
end | |
state :wamp_goodbye_reply do | |
debug state | |
begin | |
@driver.frame(JSON.generate([Status[:goodbye], EMPTY_DICT, GOODBYE_REPLY_MSG])) | |
transition :websocket_close | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
end | |
state :websocket_close do | |
debug state | |
begin | |
@driver.close | |
transition :disconnect | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
end | |
state :disconnect do | |
debug state | |
if @socket | |
@socket.close unless @socket.closed? | |
end | |
transition :disconnected | |
end | |
state :disconnected do | |
debug state | |
terminate rescue nil | |
end | |
def initialize(socket, options = {}, &callback) | |
super() | |
@out_hb_number, @inc_hb_number, @peer_id = 0, 0, 0 | |
@latency = 0.0 | |
@subscriptions = {} | |
@prng = Random.new | |
@socket = socket | |
@driver = WebSocket::Driver.server(@socket, options) | |
@driver.on(:connect) do | |
if WebSocket::Driver.websocket? @driver.env | |
transition :websocket_start | |
else | |
transition :send_website | |
end | |
end | |
@driver.on(:open) do | |
unless protocol | |
transition :websocket_close | |
end | |
end | |
@driver.on(:error) {|e| error e} | |
@driver.on(:close) {|e| debug e; disconnect} | |
@driver.on(:message) {|e| parse_message(e.data)} | |
@callback = callback | |
if @callback | |
@roles = ROLES | |
else | |
@roles = {roles: {broker: EMPTY_DICT}.freeze}.freeze | |
end | |
end | |
def wampify_message(message) | |
case message | |
when Array then message | |
when String, Numeric, TrueClass, FalseClass, NilClass, Symbol then [message] | |
when Hash then [EMPTY_LIST, message] | |
when IO, StringIO | |
message.rewind if message.respond_to?(:rewind) | |
[message.read] | |
else | |
if message.respond_to?(:to_ary) | |
message.to_ary | |
elsif message.respond_to?(:to_hash) | |
[EMPTY_LIST, message.to_hash] | |
elsif message.respond_to?(:to_h) | |
[EMPTY_LIST, message.to_h] | |
elsif message.respond_to?(:to_a) | |
message.to_a | |
elsif message.respond_to?(:to_io) | |
message.to_io.rewind if message.to_io.respond_to?(:rewind) | |
[message.to_io.read] | |
elsif (io_message = IO.try_convert(message)) | |
io_message.rewind if io_message.respond_to?(:rewind) | |
[io_message.read] | |
else | |
fail ArgumentError, "Don't know how to publish #{message.class}" | |
end | |
end | |
end | |
def generate_id | |
rand(TWO_POW_53) | |
end | |
def parse_message(msg) | |
debug "parse: #{msg}" | |
message = JSON.parse(msg) | |
if message.is_a?(Array) && message[0].is_a?(Numeric) && message.length >= 2 | |
case state | |
when :websocket_connected | |
if Status[message[0]].hello? && message[1] == REALM | |
transition :wamp_welcome | |
else | |
transition :wamp_abort | |
end | |
when :wamp_connected | |
# Legitimate message handling happens here, the rest is just error management. | |
case Status[message[0]].symbolize | |
when :hello | |
transition :wamp_goodbye | |
when :subscribe | |
# message[3] is the topic uri | |
if message[3] =~ URI_MATCH && message[3].start_with?(REALM) | |
subscription_id = generate_id | |
@subscriptions[message[3]] ||= [] | |
@subscriptions[message[3]] << subscription_id | |
@subscriptions[subscription_id] = subscribe(message[3], :subscribe_handler) | |
write(JSON.generate([Status[:subscribed], Integer(message[1]), subscription_id])) | |
else | |
transition :wamp_goodbye | |
end | |
when :unsubscribe | |
# message[2] is the subscription_id from above | |
if message[2] | |
if unsubscribe_handler(Integer(message[2])) | |
# message[1] is a randomly generated id from the client | |
write(JSON.generate([Status[:unsubscribed], Integer(message[1])])) | |
else | |
transition :wamp_goodbye | |
end | |
else | |
transition :wamp_goodbye | |
end | |
when :heartbeat | |
if message[2] | |
@inc_hb_number = Integer(message[2]) | |
else | |
transition :wamp_goodbye | |
end | |
when :goodbye | |
case message[2] | |
when GOODBYE_REPLY_MSG | |
transition :websocket_close | |
when GOODBYE_MSG | |
transition :wamp_goodbye_reply | |
else | |
transition :wamp_goodbye | |
end | |
when :call | |
if @callback | |
if message[3] =~ URI_MATCH && message[3].start_with?(REALM) | |
result = @callback.call(Call.new(@peer_id, message[2], message[3], message[4], message[5])) | |
base_res = [Status[:result], Integer(message[1]), EMPTY_DICT] | |
write(JSON.generate(base_res << wampify_message(result))) | |
else | |
transition :wamp_goodbye | |
end | |
else | |
transition :wamp_goodbye | |
end | |
else | |
debug "wamp_connected_status: #{Status[message[0]].symbolize}" | |
transition :wamp_goodbye | |
end | |
else | |
debug state | |
transition :disconnect | |
end | |
else | |
disconnect | |
end | |
rescue JSON::ParserError, ArgumentError | |
disconnect | |
end | |
def subscribe_handler(topic, message) | |
if @subscriptions[topic] | |
msg = wampify_message(message) | |
@subscriptions[topic].each do |subscriber| | |
base_msg = [Status[:event], subscriber, generate_id, EMPTY_DICT] | |
write JSON.generate(base_msg << msg) | |
end | |
end | |
end | |
def unsubscribe_handler(subscription_id) | |
if subscription = @subscriptions.delete(subscription_id) | |
@subscriptions[subscription.pattern].delete(subscription_id) | |
if @subscriptions[subscription.pattern].length == 0 | |
@subscriptions.delete(subscription.pattern) | |
end | |
unsubscribe(subscription) | |
true | |
else | |
false | |
end | |
end | |
def write(msg = '', type = nil, code = nil) | |
debug "write: #{msg} #{type} #{code}" | |
@driver.frame(msg, type, code) | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
transition :disconnect | |
end | |
def disconnect | |
debug "disconnect" | |
case state | |
when :wamp_connected | |
transition :wamp_goodbye | |
when :websocket_connected | |
transition :wamp_abort | |
when :disconnect, :disconnected | |
else | |
debug "disconnect: #{state}" | |
transition :disconnect | |
end | |
end | |
private | |
def heartbeat | |
every(15) { | |
async.write(JSON.generate([Status[:heartbeat], @inc_hb_number, @out_hb_number += 1])) | |
} | |
end | |
def send_ping | |
every(5) { | |
async.ping | |
} | |
end | |
def ping | |
id = "#{generate_id}" | |
info "Sending ping: #{id}" | |
start = Time.now.to_f | |
@driver.ping(id) do | |
@latency = Time.now.to_f - start | |
info 'Recieved ping' | |
info "Latency to #{@socket.peeraddr}: #{@latency}" | |
end | |
end | |
end | |
end | |
class Server | |
include Celluloid::Logger | |
include Celluloid::IO | |
include Celluloid::Notifications | |
include Wamp::Consts | |
execute_block_on_receiver :initialize | |
finalizer :shutdown | |
trap_exit :crash_notifier | |
def initialize(&callback) | |
@callback = callback | |
@server = TCPServer.new('0.0.0.0', 8080) | |
async.time_notifier | |
async.run | |
end | |
def handle_connection(socket) | |
websocket = Wamp::WebSocketHandler.new(socket, max_length: ONE_MEGABYTE, protocols: WEBSOCKET_PROTOS, &@callback) | |
while websocket.alive? do | |
begin | |
buffer = socket.readpartial(BUFFER_SIZE) | |
if websocket.alive? | |
websocket.parse(buffer) | |
else | |
break | |
end | |
rescue IOError, Errno::EPIPE, Errno::ECONNRESET | |
break | |
rescue | |
break | |
end | |
end | |
ensure | |
websocket.disconnect rescue nil | |
end | |
def crash_notifier(actor, reason) | |
debug "Oh no! #{actor.inspect} has died because of a #{reason.class}" | |
end | |
def shutdown | |
@server.close if @server | |
end | |
private | |
def time_notifier | |
every(1.0) { | |
publish("#{REALM}.time_update", Time.now.to_f) | |
} | |
end | |
def run | |
loop { async.handle_connection(@server.accept) } | |
end | |
end | |
Webmachine.application.routes do | |
add_route ['*'], Wamp::StaticResource | |
end | |
Server.run do |call| | |
'world' | |
end | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment