Skip to content

Instantly share code, notes, and snippets.

@mitchellrj
Created April 23, 2012 14:55
Show Gist options
  • Save mitchellrj/2471448 to your computer and use it in GitHub Desktop.
Save mitchellrj/2471448 to your computer and use it in GitHub Desktop.
WebSocket response object for WebOb with echoserver example
class EchoWebSocketResponse(BaseWebSocketResponse):
def after_open(self):
self.receive_until_closed()
def handle_text(self, text):
self.send_text(text)
def handle_request(self, request):
websocket_version = 0
try:
websocket_version = int(request.headers.get('Sec-WebSocket-Version', '0'))
except (TypeError, ValueError):
pass
websocket = (
request.headers.get('Upgrade', '').lower() == 'websocket' and
websocket_version >= 6
)
if websocket:
response = EchoWebSocketResponse(request, stream)
else:
response = request.ResponseClass()
return response
from base64 import b64encode
from hashlib import sha1
import json
import random
import socket
import struct
MAGIC_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
class BaseWebSocketResponse(object):
# Max size of packets in bytes. Default: maximum allowable by protocol
max_packet_size = 18446744073709551616L
def __init__(self, request):
key = request.headers.get('Sec-WebSocket-Key', '')
accept = b64encode(sha1(key + MAGIC_STRING).digest())
self.request = request
self.response = self.request.ResponseClass()
self.status = '101 Switching Protocols'
self.headers = [('Connection', 'Upgrade'),
('Upgrade', 'websocket'),
('Sec-WebSocket-Accept', accept),
('Content-Length', '0')] # Content-Length is to fool Paste
self.closed = False
self._closing = False
self._close_sent = False
def __call__(self, environ, start_response):
self._sock = self._get_sock_from_environ(environ)
self.write_chunk = start_response(self.status, self.headers)
self.write_chunk('')
self.after_open()
self.close('End of data.')
return ['']
def after_open(self):
pass
def after_close(self):
pass
def _get_sock_from_environ(self, environ):
# Only tested in Paste
return environ['wsgi.input'].file
def _get_bytes(self, l=-1):
byte = ''
try:
byte = self._sock.read(l)
except socket.error:
self.closed = True
return byte
def _mask(self, data, mask):
result = []
for i in range(len(data)):
result.append(chr(ord(data[i]) ^ ord(mask[i % 4])))
return ''.join(result)
def handle_binary(self, bitstring):
pass
def handle_text(self, text):
pass
def handle_unknown(self, opcode, data):
err = "Unrecognised frame opcode: %d" % opcode
self.close(err)
raise RuntimeError("Unrecognised frame opcode: %d" % opcode)
def _handle_ping(self, data):
self.send_pong(data)
def handle_pong(self, data):
pass
def receive_packet(self):
continuing = True
while continuing:
packet = ''
head_byte1 = self._get_bytes(1)
if head_byte1 == '':
# socket closed
break
head_byte1 = ord(head_byte1)
fin = (head_byte1 >> 7)& 1
# rsv1 = head_byte1 & 64
# rsv2 = head_byte1 & 32
# rsv3 = head_byte1 & 16
#
# assert rsv1 == 0x00
# assert rsv2 == 0x00
# assert rsv3 == 0x00
opcode = head_byte1 & 15
control_frame = opcode & 8
continuing = False
if opcode == 0x00: # continuation
continuing = True
elif opcode == 0x01: # text
type = 'text'
elif opcode == 0x02: # binary
type = 'binary'
elif opcode == 0x08: # connection close
type = 'close'
elif opcode == 0x09: # ping
type = 'ping'
elif opcode == 0x0A: # pong
type = 'pong'
else: # unknown
type = opcode
head_byte2 = self._get_bytes(1)
if head_byte2 == '':
# socket closed
break
head_byte2 = ord(head_byte2)
mask = (head_byte2 >> 7) & 1
payload_length = head_byte2 & 127
if payload_length == 126:
extended_length_bytes = self._get_bytes(2)
payload_length = struct.unpack('!H', extended_length_bytes)
elif payload_length == 127:
extended_length_bytes = self._get_bytes(8)
payload_length = struct.unpack('!Q', extended_length_bytes)
masking_key = ''
if mask:
masking_key = self._get_bytes(4)
payload = self._get_bytes(payload_length)
if mask:
payload = self._mask(payload, masking_key)
if not control_frame:
# Control frames may appear in the middle of data
packet += payload
if continuing and fin:
continuing = False
if not continuing:
if not control_frame:
self.handle_packet(type, packet)
else:
self.handle_packet(type, payload)
packet = ''
def handle_packet(self, type, data):
if type=='text':
self.handle_text(data)
elif type=='binary':
self.handle_binary(data)
elif type=='ping':
self._handle_ping(data)
elif type=='pong':
self.handle_pong(data)
elif type=='close':
self._handle_close(data)
else:
self.handle_unknown(type, data)
def _create_packet(self, opcode, data, mask, final=False):
fin = int(final)
rsv1 = rsv2 = rsv3 = 0
if not fin:
opcode = 0x00
packet = chr(
(fin << 7) | \
(rsv1 << 6) | \
(rsv2 << 5) | \
(rsv3 << 4) | \
opcode
)
initial_payload = len(data)
if len(data) > 125 and len(data) <= 65536:
initial_payload = 126
elif len(data) > 65536:
initial_payload = 127
packet += chr(
(int(mask) << 7) | \
initial_payload
)
if initial_payload == 126:
packet += struct.pack('!H', len(data))
elif initial_payload == 127:
packet += struct.pack('!Q', len(data))
payload_mask = ''
if mask:
payload_mask = ''.join([chr(random.choice(range(256))) for _ in range(4)])
packet += payload_mask
data = self._mask(data, payload_mask)
packet += data
return packet
def _send(self, opcode, data, mask=True):
if (opcode!=0x08 and self._closing) or self.closed:
raise RuntimeError('WebSocket is closed.')
sent = 0
while sent < len(data):
to_send = min(len(data), self.max_packet_size)
is_final = (to_send) >= len(data)
packet = self._create_packet(opcode, data, mask, is_final)
try:
self.write_chunk(packet)
except socket.error:
self.closed = True
sent += to_send
def _send_close(self, data):
self._closing = True
self._send(0x08, data)
self._close_sent = True
def _handle_close(self, data):
if not self._close_sent:
self._send_close(data)
self.closed = True
self._sock.close()
self.after_close()
def receive_until_closed(self):
while not self.closed:
self.receive_packet()
def send_text(self, text):
self._send(0x01, text.encode('utf8'))
def send_binary(self, bitstring):
if isinstance(bitstring, unicode):
bitstring = str(bitstring)
self._send(0x02, bitstring)
def send_ping(self, data=''):
self._send(0x09, data)
def send_pong(self, data):
self._send(0x0A, data)
def close(self, reason='', wait=True):
if self._closing or self.closed:
raise RuntimeError('Already closing.')
self._send_close(reason.encode('utf8'))
while wait and not not self.closed:
# Wait until we've receive the close handshake before returning
self.receive_packet()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment