Skip to content

Instantly share code, notes, and snippets.

@thrashr888
Created May 7, 2012 05:17
Show Gist options
  • Save thrashr888/2626049 to your computer and use it in GitHub Desktop.
Save thrashr888/2626049 to your computer and use it in GitHub Desktop.
<?php
// Usage:
// $master=new WebSocket("localhost",12345);
// $master->callback = function($self, $user, $msg){
// $self->send($user->socket,$msg);
// };
require "CommandsAppBase.class.php";
require "WsRequest.class.php";
require "WsResponse.class.php";
require "WsRouter.class.php";
require "WsUserBase.class.php";
class WebSocket
{
public
$master,
$sockets = array(),
$users = array(),
$debug = true,
$callback = null,
$emitter = null,
$globals = array(),
$state = 0,
$secret = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
$db = null;
public function __construct($emitter, $db){
$this->db = $db;
$this->emitter = $emitter;
error_reporting(E_ALL);
set_time_limit(0);
ob_implicit_flush();
}
public function createServer($callback){
$this->callback = $callback;
return $this;
}
public function listen($address,$port){
$this->master=socket_create(AF_INET, SOCK_STREAM, SOL_TCP) or die("socket_create() failed");
socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1) or die("socket_option() failed");
socket_bind($this->master, $address, $port) or die("socket_bind() failed");
socket_listen($this->master, 20) or die("socket_listen() failed");
socket_set_nonblock($this->master);
$this->sockets[] = $this->master;
$this->say("Server Started : ".date('Y-m-d H:i:s'));
$this->say("Listening on : ".$address." port ".$port);
$this->say("Master socket : ".$this->master."\n");
while(true){
$changed = $this->sockets;
$write = NULL;
$except = NULL;
socket_select($changed, $write, $except, NULL);
$this->emitter->emit('tick');
foreach($changed as $socket){
if($socket==$this->master){
$client=socket_accept($this->master);
if($client<0){ $this->log("socket_accept() failed"); continue; }
else{ $this->connect($client); }
}
else{
$bytes = @socket_recv($socket, $buffer, 2048, 0);
$by=$bytes;
$bu='';
while($by>0) {
$by = @socket_recv($socket, $bu, 2048, 0);
//debug($bu);
$bytes += $by;
$buffer .= $bu;
}
//debug($buffer);
//debug($this->unwrap($buffer));
//debug($this->hybi10Decode($buffer));
//debug($bytes);
//debug(socket_strerror(socket_last_error($socket)));
if($bytes==0){
$this->disconnect($socket);
}else{
$user = $this->getuserbysocket($socket);
$user->count++;
if(!$user->handshake){ $this->dohandshake($user,$buffer); }
else{ $this->process($user,$this->unwrap($buffer)); }
}
}
}
}
}
public function process($user,$msg){
/* Extend and modify this method to suit your needs */
/* Basic usage is to echo incoming messages back to client */
//console::log($msg);
$this->emitter->emit('message', $this, $user, $msg);
call_user_func_array($this->callback, array($this, $user, $msg));
//$this->callback($this, $user, $msg);
return true;
}
function connect($socket){
$user = new WsUser($this->db, new console());
$user->socket = $socket;
array_push($this->users,$user);
array_push($this->sockets,$socket);
$this->emitter->emit('connect', $this, $user, $socket);
$this->log($socket." CONNECTED!");
$this->log(date("d/n/Y ")."at ".date("H:i:s T"));
}
function disconnect($socket){
$found=null;
$n=count($this->users);
for($i=0;$i<$n;$i++){
if($this->users[$i]->socket==$socket){ $found=$i; break; }
}
if(!is_null($found)){ array_splice($this->users,$found,1); }
$index=array_search($socket,$this->sockets);
$this->emitter->emit('disconnect', $socket);
socket_close($socket);
$this->log($socket." DISCONNECTED!");
if($index>=0){ array_splice($this->sockets,$index,1); }
}
protected function dohandshake($user,$buffer){
$this->log("\nRequesting handshake...");
$this->log('<<<BUFFER_START>>>');
$this->log($buffer);
$this->log('<<<BUFFER_END>>>');
$headers = http_parse_headers($buffer);
//$this->log($headers);
if (isset($headers['Cookie'])) {
$cookies = http_parse_cookie($headers['Cookie']);
$user->cookies = $cookies->cookies;
//$this->log($user->cookies);
}
$upgrade = "HTTP/1.1 101 WebSocket Protocol Handshake\r\n" .
"Upgrade: WebSocket\r\n" .
"Connection: Upgrade\r\n" .
"Sec-WebSocket-Origin: " . $headers['Origin'] . "\r\n" .
"Sec-WebSocket-Location: ws://" . $headers['Host'] . $headers['Request Url'] . "\r\n" .
"Sec-WebSocket-Accept: ".$this->calcKey($headers['Sec-Websocket-Key']). "\r\n". "\r\n";
$this->log('<<<UPGRADE_START>>>');
$this->log($upgrade);
$this->log('<<<UPGRADE_END>>>');
$this->emitter->emit('handshake', $user->socket, $upgrade);
socket_write($user->socket, $upgrade, strlen($upgrade));
$this->log('<<<UPGRADE_SUCCESS>>>');
$user->handshake=true;
return true;
}
protected function calcKey($key){
return base64_encode(sha1($key.$this->secret, true));
}
function send($user,$res){
//debug(print_r($user,1));
//debug(print_r($res,1));
$msg = $this->wrap($res);
$this->emitter->emit('send', $user->socket, $msg);
socket_write($user->socket, $msg, strlen($msg));
}
function sendAll($res){
foreach($this->users as $user)
{
$this->send($user, $res);
}
}
function sendExcept($res, $a_user){
foreach($this->users as $user)
{
if($user->id == $a_user->id) continue;
$this->send($user, $res);
}
}
function sendGroup($res, $group = 'default'){
foreach($this->users as $user)
{
if($user->group != $group) continue;
$this->send($user, $res);
}
}
function getUserBySocket($socket){
foreach($this->users as &$user)
{
if($user->socket==$socket) return $user;
}
return null;
}
function getUserByUsername($username){
foreach($this->users as &$user)
{
if($user->username==$username) return $user;
}
return null;
}
function getUserByUid($uid){
foreach($this->users as &$user)
{
if($user->uid==$uid) return $user;
}
return null;
}
function getUserById($id){
foreach($this->users as &$user)
{
if($user->id==$id) return $user;
}
return null;
}
function say($msg=""){ echo $msg."\n"; }
function log($msg=""){ if($this->debug){ console::log($msg); } }
function wrap($msg=""){ return $this->frame_send(json_encode($msg)); }
function unwrap($msg=""){
$data = $this->hybi10Decode($msg);
return json_decode($data['payload']);
}
protected function frame_recieve($data){
$bytes = $data;
$data_length = "";
$mask = "";
$coded_data = "" ;
$decoded_data = "";
$data_length = $bytes[1] & 127;
if($data_length === 126){
$mask = substr($bytes, 4, 8);
$coded_data = substr($bytes, 8);
}else if($data_length === 127){
$mask = substr($bytes, 10, 14);
$coded_data = substr($bytes, 14);
}else{
$mask = substr($bytes, 2, 6);
$coded_data = substr($bytes, 6);
}
for($i=0;$i<strlen($coded_data);$i++){
$decoded_data .= $coded_data[$i] ^ $mask[$i%4];
}
return $decoded_data;
}
private function hybi10Decode($data) {
$payloadLength = '';
$mask = '';
$unmaskedPayload = '';
$decodedData = array();
// estimate frame type:
$firstByteBinary = sprintf('%08b', ord($data[0]));
$secondByteBinary = sprintf('%08b', ord($data[1]));
$opcode = bindec(substr($firstByteBinary, 4, 4));
$isMasked = ($secondByteBinary[0] == '1') ? true : false;
$payloadLength = ord($data[1]) & 127;
// close connection if unmasked frame is received:
if($isMasked === false)
{
//$this->close(1002);
//$this->disconnect();
}
switch($opcode)
{
// text frame:
case 1:
$decodedData['type'] = 'text';
break;
// connection close frame:
case 8:
$decodedData['type'] = 'close';
break;
// ping frame:
case 9:
$decodedData['type'] = 'ping';
break;
// pong frame:
case 10:
$decodedData['type'] = 'pong';
break;
default:
// Close connection on unknown opcode:
//$this->close(1003);
//$this->disconnect();
break;
}
if($payloadLength === 126)
{
$mask = substr($data, 4, 4);
$payloadOffset = 8;
$dataLength = bindec(sprintf('%08b', ord($data[2])) . sprintf('%08b', ord($data[3]))) + $payloadOffset;
}
elseif($payloadLength === 127)
{
$mask = substr($data, 10, 4);
$payloadOffset = 14;
$tmp = '';
for($i = 0; $i < 8; $i++)
{
$tmp .= sprintf('%08b', ord($data[$i+2]));
}
$dataLength = bindec($tmp) + $payloadOffset;
unset($tmp);
}
else
{
$mask = substr($data, 2, 4);
$payloadOffset = 6;
$dataLength = $payloadLength + $payloadOffset;
}
if($isMasked === true)
{
for($i = $payloadOffset; $i < $dataLength; $i++)
{
$j = $i - $payloadOffset;
$unmaskedPayload .= $data[$i] ^ $mask[$j % 4];
}
$decodedData['payload'] = $unmaskedPayload;
}
else
{
$payloadOffset = $payloadOffset - 4;
$decodedData['payload'] = substr($data, $payloadOffset);
}
return $decodedData;
}
protected function frame_send($data)
{
$frame = Array();
$encoded = "";
$frame[0] = 0x81;
$data_length = strlen($data);
if($data_length <= 125){
$frame[1] = $data_length;
}else{
$frame[1] = 126;
$frame[2] = $data_length >> 8;
$frame[3] = $data_length & 0xFF;
}
for($i=0;$i<sizeof($frame);$i++){
$encoded .= chr($frame[$i]);
}
$encoded .= $data;
return $encoded;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment