# in one terminal
$ lein deps
$ lein run -m server
# in another terminal
$ curl -i http://localhost:8080/sync
$ curl -i http://localhost:8080/poll
$ curl -i http://localhost:8080/stream
$ ./wsclient ws://localhost:8080/websocket?name=bob
=> hi
=> quit
Created
November 1, 2010 05:57
-
-
Save mmcgrana/657694 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defproject try-jetty-async "0.0.0" | |
:dependencies [[ring/ring-jetty-async-adapter "0.3.3-SNAPSHOT"]]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns server | |
(:use ring.adapter.jetty-async | |
ring.middleware.params | |
ring.middleware.keyword-params)) | |
(defn sleep [i] | |
(Thread/sleep i)) | |
(defn- sync-handler [req] | |
(println "handling sync request") | |
{:status 200 | |
:headers {"Content-Type" "text/plain"} | |
:body "hello"}) | |
(defn- poll-handler [req] | |
(println "receiving poll request") | |
{:async :http | |
:reactor | |
(fn [send] | |
(sleep 4000) | |
(println "returning poll request") | |
(send {:type :status :data 200}) | |
(send {:type :headers :data {"Content-Type" "text/plain"}}) | |
(send {:type :chunk :data "...hello"}) | |
(send {:type :close}))}) | |
(defn- stream-handler [req] | |
(println "receiving stream request") | |
{:async :http | |
:reactor | |
(fn [send] | |
(println "heading stream request") | |
(send {:type :status :data 200}) | |
(send {:type :headers :data {"Content-Type" "text/plain"}}) | |
(dotimes [i 5] | |
(sleep 1000) | |
(println "chunking stream request") | |
(send {:type :chunk :data (str (nth "hello" i))})) | |
(send {:type :close}))}) | |
(defn- websocket-handler [req] | |
(println "receiving websocket request") | |
(let [name (:name (:params req))] | |
{:async :websocket | |
:reactor | |
(fn [send] | |
(fn [{:keys [type data]}] | |
(case type | |
:connect | |
(println "connect!") | |
:message | |
(do | |
(println (format "message! (%s) %s" name data)) | |
(when (= "quit" data) | |
(send {:type :message :data "goodbye"}) | |
(send {:type :disconnect}))) | |
:disconnect | |
(println "disconnect!"))))})) | |
(defn handler [{:keys [uri] :as req}] | |
(let [h (case uri | |
"/sync" sync-handler | |
"/poll" poll-handler | |
"/stream" stream-handler | |
"/websocket" websocket-handler)] | |
(h req))) | |
(def app | |
(-> handler | |
wrap-keyword-params | |
wrap-params)) | |
(defn -main [& args] | |
(run-jetty-async app {:port 8080})) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
# Original: http://github.com/gimite/web-socket-ruby | |
# Copyright: Hiroshi Ichikawa <http://gimite.net/en/> | |
# Lincense: New BSD Lincense | |
# Reference: http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol | |
require "socket" | |
require "uri" | |
require "digest/md5" | |
require "openssl" | |
class WebSocket | |
class << self | |
attr_accessor :debug | |
end | |
attr_accessor :socket | |
class Error < RuntimeError | |
end | |
def initialize(arg, params = {}) | |
if params[:server] # server | |
@server = params[:server] | |
@socket = arg | |
line = gets().chomp() | |
if !(line =~ /\AGET (\S+) HTTP\/1.1\z/n) | |
raise(WebSocket::Error, "invalid request: #{line}") | |
end | |
@path = $1 | |
read_header() | |
if @header["sec-websocket-key1"] && @header["sec-websocket-key2"] | |
@key3 = read(8) | |
else | |
# Old Draft 75 protocol | |
@key3 = nil | |
end | |
if [email protected]_origin?(self.origin) | |
raise(WebSocket::Error, | |
("Unaccepted origin: %s (server.accepted_domains = %p)\n\n" + | |
"To accept this origin, write e.g. \n" + | |
" WebSocketServer.new(..., :accepted_domains => [%p]), or\n" + | |
" WebSocketServer.new(..., :accepted_domains => [\"*\"])\n") % | |
[self.origin, @server.accepted_domains, @server.origin_to_domain(self.origin)]) | |
end | |
@handshaked = false | |
else # client | |
uri = arg.is_a?(String) ? URI.parse(arg) : arg | |
if uri.scheme == "ws" | |
default_port = 80 | |
elsif uri.scheme = "wss" | |
default_port = 443 | |
else | |
raise(WebSocket::Error, "unsupported scheme: #{uri.scheme}") | |
end | |
@path = (uri.path.empty? ? "/" : uri.path) + (uri.query ? "?" + uri.query : "") | |
host = uri.host + (uri.port == default_port ? "" : ":#{uri.port}") | |
origin = params[:origin] || "http://#{uri.host}" | |
key1 = generate_key() | |
key2 = generate_key() | |
key3 = generate_key3() | |
socket = TCPSocket.new(uri.host, uri.port || default_port) | |
if uri.scheme == "ws" | |
@socket = socket | |
else | |
@socket = ssl_handshake(socket) | |
end | |
write( | |
"GET #{@path} HTTP/1.1\r\n" + | |
"Upgrade: WebSocket\r\n" + | |
"Connection: Upgrade\r\n" + | |
"Host: #{host}\r\n" + | |
"Origin: #{origin}\r\n" + | |
"Sec-WebSocket-Key1: #{key1}\r\n" + | |
"Sec-WebSocket-Key2: #{key2}\r\n" + | |
"\r\n" + | |
"#{key3}") | |
flush() | |
line = gets().chomp() | |
raise(WebSocket::Error, "bad response: #{line}") if !(line =~ /\AHTTP\/1.1 101 /n) | |
read_header() | |
if (@header["sec-websocket-origin"] || "").downcase() != origin.downcase() | |
raise(WebSocket::Error, | |
"origin doesn't match: '#{@header["sec-websocket-origin"]}' != '#{origin}'") | |
end | |
reply_digest = read(16) | |
expected_digest = security_digest(key1, key2, key3) | |
if reply_digest != expected_digest | |
raise(WebSocket::Error, | |
"security digest doesn't match: %p != %p" % [reply_digest, expected_digest]) | |
end | |
@handshaked = true | |
end | |
@received = [] | |
@buffer = "" | |
@closing_started = false | |
end | |
attr_reader(:server, :header, :path) | |
def handshake(status = nil, header = {}) | |
if @handshaked | |
raise(WebSocket::Error, "handshake has already been done") | |
end | |
status ||= "101 Web Socket Protocol Handshake" | |
sec_prefix = @key3 ? "Sec-" : "" | |
def_header = { | |
"#{sec_prefix}WebSocket-Origin" => self.origin, | |
"#{sec_prefix}WebSocket-Location" => self.location, | |
} | |
header = def_header.merge(header) | |
header_str = header.map(){ |k, v| "#{k}: #{v}\r\n" }.join("") | |
if @key3 | |
digest = security_digest( | |
@header["Sec-WebSocket-Key1"], @header["Sec-WebSocket-Key2"], @key3) | |
else | |
digest = "" | |
end | |
# Note that Upgrade and Connection must appear in this order. | |
write( | |
"HTTP/1.1 #{status}\r\n" + | |
"Upgrade: WebSocket\r\n" + | |
"Connection: Upgrade\r\n" + | |
"#{header_str}\r\n#{digest}") | |
flush() | |
@handshaked = true | |
end | |
def send(data) | |
if !@handshaked | |
raise(WebSocket::Error, "call WebSocket\#handshake first") | |
end | |
data = force_encoding(data.dup(), "ASCII-8BIT") | |
write("\x00#{data}\xff") | |
flush() | |
end | |
def receive() | |
if !@handshaked | |
raise(WebSocket::Error, "call WebSocket\#handshake first") | |
end | |
packet = gets("\xff") | |
return nil if !packet | |
if packet =~ /\A\x00(.*)\xff\z/nm | |
return force_encoding($1, "UTF-8") | |
elsif packet == "\xff" && read(1) == "\x00" # closing | |
if @server | |
@socket.close() | |
else | |
close() | |
end | |
return nil | |
else | |
raise(WebSocket::Error, "input must be either '\\x00...\\xff' or '\\xff\\x00'") | |
end | |
end | |
def tcp_socket | |
return @socket | |
end | |
def host | |
return @header["host"] | |
end | |
def origin | |
return @header["origin"] | |
end | |
def location | |
return "ws://#{self.host}#{@path}" | |
end | |
# Does closing handshake. | |
def close() | |
return if @closing_started | |
write("\xff\x00") | |
@socket.close() if !@server | |
@closing_started = true | |
end | |
def close_socket() | |
@socket.close() | |
end | |
private | |
NOISE_CHARS = ("\x21".."\x2f").to_a() + ("\x3a".."\x7e").to_a() | |
def read_header() | |
@header = {} | |
while line = gets() | |
line = line.chomp() | |
break if line.empty? | |
if !(line =~ /\A(\S+): (.*)\z/n) | |
raise(WebSocket::Error, "invalid request: #{line}") | |
end | |
@header[$1] = $2 | |
@header[$1.downcase()] = $2 | |
end | |
if !(@header["upgrade"] =~ /\AWebSocket\z/i) | |
raise(WebSocket::Error, "invalid Upgrade: " + @header["upgrade"]) | |
end | |
if !(@header["connection"] =~ /\AUpgrade\z/i) | |
raise(WebSocket::Error, "invalid Connection: " + @header["connection"]) | |
end | |
end | |
def gets(rs = $/) | |
line = @socket.gets(rs) | |
$stderr.printf("recv> %p\n", line) if WebSocket.debug | |
return line | |
end | |
def read(num_bytes) | |
str = @socket.read(num_bytes) | |
$stderr.printf("recv> %p\n", str) if WebSocket.debug | |
return str | |
end | |
def write(data) | |
if WebSocket.debug | |
data.scan(/\G(.*?(\n|\z))/n) do | |
$stderr.printf("send> %p\n", $&) if !$&.empty? | |
end | |
end | |
@socket.write(data) | |
end | |
def flush() | |
@socket.flush() | |
end | |
def security_digest(key1, key2, key3) | |
bytes1 = websocket_key_to_bytes(key1) | |
bytes2 = websocket_key_to_bytes(key2) | |
return Digest::MD5.digest(bytes1 + bytes2 + key3) | |
end | |
def generate_key() | |
spaces = 1 + rand(12) | |
max = 0xffffffff / spaces | |
number = rand(max + 1) | |
key = (number * spaces).to_s() | |
(1 + rand(12)).times() do | |
char = NOISE_CHARS[rand(NOISE_CHARS.size)] | |
pos = rand(key.size + 1) | |
key[pos...pos] = char | |
end | |
spaces.times() do | |
pos = 1 + rand(key.size - 1) | |
key[pos...pos] = " " | |
end | |
return key | |
end | |
def generate_key3() | |
return [rand(0x100000000)].pack("N") + [rand(0x100000000)].pack("N") | |
end | |
def websocket_key_to_bytes(key) | |
num = key.gsub(/[^\d]/n, "").to_i() / key.scan(/ /).size | |
return [num].pack("N") | |
end | |
def force_encoding(str, encoding) | |
if str.respond_to?(:force_encoding) | |
return str.force_encoding(encoding) | |
else | |
return str | |
end | |
end | |
def ssl_handshake(socket) | |
ssl_context = OpenSSL::SSL::SSLContext.new() | |
ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context) | |
ssl_socket.sync_close = true | |
ssl_socket.connect() | |
return ssl_socket | |
end | |
end | |
class WebSocketServer | |
def initialize(params_or_uri, params = nil) | |
if params | |
uri = params_or_uri.is_a?(String) ? URI.parse(params_or_uri) : params_or_uri | |
params[:port] ||= uri.port | |
params[:accepted_domains] ||= [uri.host] | |
else | |
params = params_or_uri | |
end | |
@port = params[:port] || 80 | |
@accepted_domains = params[:accepted_domains] | |
if !@accepted_domains | |
raise(ArgumentError, "params[:accepted_domains] is required") | |
end | |
if params[:host] | |
@tcp_server = TCPServer.open(params[:host], @port) | |
else | |
@tcp_server = TCPServer.open(@port) | |
end | |
end | |
attr_reader(:tcp_server, :port, :accepted_domains) | |
def run(&block) | |
while true | |
Thread.start(accept()) do |s| | |
begin | |
ws = create_web_socket(s) | |
yield(ws) if ws | |
rescue => ex | |
print_backtrace(ex) | |
ensure | |
begin | |
ws.close_socket() if ws | |
rescue | |
end | |
end | |
end | |
end | |
end | |
def accept() | |
return @tcp_server.accept() | |
end | |
def accepted_origin?(origin) | |
domain = origin_to_domain(origin) | |
return @accepted_domains.any?(){ |d| File.fnmatch(d, domain) } | |
end | |
def origin_to_domain(origin) | |
if origin == "null" || origin == "file://" # local file | |
return "null" | |
else | |
return URI.parse(origin).host | |
end | |
end | |
def create_web_socket(socket) | |
ch = socket.getc() | |
if ch == ?< | |
# This is Flash socket policy file request, not an actual Web Socket connection. | |
send_flash_socket_policy_file(socket) | |
return nil | |
else | |
socket.ungetc(ch) | |
return WebSocket.new(socket, :server => self) | |
end | |
end | |
private | |
def print_backtrace(ex) | |
$stderr.printf("%s: %s (%p)\n", ex.backtrace[0], ex.message, ex.class) | |
for s in ex.backtrace[1..-1] | |
$stderr.printf(" %s\n", s) | |
end | |
end | |
# Handles Flash socket policy file request sent when web-socket-js is used: | |
# http://github.com/gimite/web-socket-js/tree/master | |
def send_flash_socket_policy_file(socket) | |
socket.puts('<?xml version="1.0"?>') | |
socket.puts('<!DOCTYPE cross-domain-policy SYSTEM ' + | |
'"http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd">') | |
socket.puts('<cross-domain-policy>') | |
for domain in @accepted_domains | |
next if domain == "file://" | |
socket.puts("<allow-access-from domain=\"#{domain}\" to-ports=\"#{@port}\"/>") | |
end | |
socket.puts('</cross-domain-policy>') | |
socket.close() | |
end | |
end | |
if (ARGV.length != 1) | |
$stderr.puts("Usage ./wsclient ws://<host>:<port>/<path>") | |
exit(1) | |
end | |
Thread.abort_on_exception = true | |
client = WebSocket.new(ARGV[0]) | |
puts("Connected") | |
Thread.new() do | |
while data = client.receive() | |
printf("Received: %p\n", data) | |
end | |
exit(0) | |
end | |
$stdin.each_line() do |line| | |
data = line.chomp() | |
client.send(data) | |
printf("Sent: %p\n", data) | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment