Skip to content

Instantly share code, notes, and snippets.

@moechofe
Last active August 29, 2015 14:27
Show Gist options
  • Save moechofe/a58a70a35c2233801af1 to your computer and use it in GitHub Desktop.
Save moechofe/a58a70a35c2233801af1 to your computer and use it in GitHub Desktop.
Redis client/listener which uses the phpiredis extension and a stream socket
<?php
// Requires the phpiredis extension: https://github.com/nrk/phpiredis
// Raised for an error reply: Listener installs a reader error handler that
// throws this with the message it receives.
class ErrorRespond extends Exception {}
// Signals a connection-level failure, e.g. Listener's constructor throws it
// when stream_socket_client() cannot open the socket.
class ErrorConnection extends Exception {}
// Thrown by Listener::read() when the phpiredis reader is neither incomplete
// nor complete; the message comes from phpiredis_reader_get_error().
class ErrorProtocol extends Exception {}
/**
 * Thin command client on top of the phpiredis extension.
 *
 * Every method call is forwarded through __call(): the method name becomes
 * the first element of the command and the arguments follow it, e.g.
 * $client->set('key', 'value').
 */
class Client
{
    /** @var resource|null Connection handle returned by phpiredis_connect(). */
    private $redis = null;

    /**
     * Connects to the server described by $dsn.
     *
     * @param string $dsn URL-style address; only the host and port components
     *                    extracted by parse_url() are used.
     * @throws ErrorConnection when the DSN has no host or the connection fails.
     */
    public function __construct($dsn)
    {
        // A real expression instead of a string: string assertions are no
        // longer evaluated by recent PHP versions and would always pass.
        assert(is_string($dsn));

        $part = parse_url($dsn);
        if(!is_array($part) || !isset($part['host']))
        {
            throw new ErrorConnection('Invalid DSN: '.$dsn);
        }

        // 6379 is the standard Redis port; used when the DSN does not give one.
        $port = isset($part['port']) ? $part['port'] : 6379;

        // NOTE(review): phpiredis_connect() is expected to return false on
        // failure -- confirm against the extension's documentation.
        $redis = phpiredis_connect($part['host'], $port);
        if(!$redis)
        {
            throw new ErrorConnection('Could not connect to '.$part['host'].':'.$port);
        }
        $this->redis = $redis;
    }

    /**
     * Sends "$cmd arg1 arg2 ..." to the server.
     *
     * @param string   $cmd  Command name (the method that was called).
     * @param string[] $args Command arguments; asserted to all be strings.
     * @return mixed Whatever phpiredis_command_bs() returns for the command.
     */
    public function __call($cmd, array $args)
    {
        assert(array_filter($args, 'is_string') === $args);
        array_unshift($args, $cmd);
        return phpiredis_command_bs($this->redis, $args);
    }
}
/**
 * Pub/sub listener built on a raw stream socket and a phpiredis reader.
 *
 * The constructor opens the socket and forks. The child process loops
 * forever, reading replies from the socket and passing them to the callback.
 * The parent keeps the object and uses it to write commands to the same
 * socket (subscribe, unsubscribe, publish). Call destroy() to stop the child.
 */
class Listener
{
    /** @var resource|null Reader created by phpiredis_reader_create(). */
    private $reader = null;
    /** @var resource|null Stream returned by stream_socket_client(). */
    private $socket = null;
    /** @var int|null pcntl_fork() result: the child PID in the parent, 0 in the child. */
    private $subscriber = null;

    /**
     * Writes the whole buffer to the socket, retrying after partial writes
     * (pattern taken from the PHP manual page for fwrite()).
     *
     * @param string $buffer Raw bytes to send.
     * @throws ErrorConnection when fwrite() fails or makes no progress.
     */
    private function write($buffer)
    {
        $bytes_to_write = strlen($buffer);
        $bytes_written = 0;
        while($bytes_written < $bytes_to_write)
        {
            $chunk = ($bytes_written === 0) ? $buffer : substr($buffer, $bytes_written);
            $rv = fwrite($this->socket, $chunk);
            // Giving up silently would drop the command without the caller
            // ever knowing, so report the failure instead.
            if(!$rv)
            {
                throw new ErrorConnection('Could not write to the socket');
            }
            $bytes_written += $rv;
        }
    }

    /**
     * Feeds the reader from the socket until it holds one reply.
     *
     * @return mixed The reply produced by phpiredis_reader_get_reply().
     * @throws ErrorConnection when the socket can no longer be read.
     * @throws ErrorProtocol   when the reader reports a state other than complete.
     */
    private function read()
    {
        while(PHPIREDIS_READER_STATE_INCOMPLETE === ($state = phpiredis_reader_get_state($this->reader)))
        {
            $buffer = fread($this->socket, 4096);
            // The original test used "&&", which can never be true, so a closed
            // connection made this loop spin forever on empty reads.
            if($buffer === false || $buffer === '')
            {
                $meta = stream_get_meta_data($this->socket);
                // A read timeout only means nothing arrived yet; an idle
                // subscription is not an error, so keep waiting.
                if(!empty($meta['timed_out'])) continue;
                throw new ErrorConnection('Connection closed while reading');
            }
            phpiredis_reader_feed($this->reader, $buffer);
        }

        if($state !== PHPIREDIS_READER_STATE_COMPLETE)
        {
            throw new ErrorProtocol(phpiredis_reader_get_error($this->reader));
        }
        return phpiredis_reader_get_reply($this->reader);
    }

    /**
     * Sends one command, given as an array of its parts, over the socket.
     *
     * @param array $command Command name followed by its arguments.
     * @throws ErrorConnection when the command cannot be written.
     */
    private function send(array $command)
    {
        $this->write(phpiredis_format_command($command));
    }

    /**
     * Body of the forked child: dispatches replies to the callback and never
     * returns to the caller.
     *
     * Any exception (from reading or from the callback) is logged with
     * error_log() and ends the child process. Without the final exit the child
     * would fall back into the code that created the Listener and run it a
     * second time.
     *
     * @param Closure $cb Called with the elements of each array reply as arguments.
     */
    private function listen(Closure $cb)
    {
        try
        {
            while(true)
            {
                $reply = $this->read();
                // The status handler turns status replies into true or a
                // string; only array replies can be spread into arguments.
                if(is_array($reply)) call_user_func_array($cb, $reply);
            }
        }
        catch(Exception $e)
        {
            error_log(get_class($e).': '.$e->getMessage());
        }
        exit(1);
    }

    /**
     * Opens the socket, then forks the process that listens for replies.
     *
     * @param string  $dsn       Address passed as-is to stream_socket_client().
     * @param Closure $cb        Callback invoked in the child for each array reply.
     * @param string  $fork_name Process title for the child; skipped when empty.
     * @throws ErrorConnection when the socket cannot be opened.
     * @throws Exception       when pcntl_fork() fails.
     */
    public function __construct($dsn, Closure $cb, $fork_name)
    {
        // Real expressions instead of strings: string assertions are no longer
        // evaluated by recent PHP versions and would always pass.
        assert(is_string($dsn));
        assert(is_string($fork_name));

        $this->reader = phpiredis_reader_create();
        // Static closures: the handlers do not use $this, so they should not
        // hold a reference back to this object.
        phpiredis_reader_set_status_handler($this->reader, static function($status)
        {
            // "OK" and "QUEUED" collapse to true; other statuses pass through.
            return in_array($status, array('OK', 'QUEUED'), true) ? true : $status;
        });
        phpiredis_reader_set_error_handler($this->reader, static function($msg)
        {
            throw new ErrorRespond($msg);
        });

        $this->socket = @stream_socket_client($dsn, $errno, $errstr, 2, STREAM_CLIENT_CONNECT);
        if(!$this->socket) throw new ErrorConnection($errstr, $errno);

        $this->subscriber = pcntl_fork();
        if($this->subscriber === -1) throw new Exception("Could not fork!");
        if($this->subscriber === 0)
        {
            if($fork_name) cli_set_process_title($fork_name);
            $this->listen($cb); // never returns
        }
    }

    /**
     * Stops the child process and releases the socket and the reader.
     *
     * This is an explicit method because the original author observed that
     * __destruct() was not being called (suspected cause: the forked process
     * still holding the object). Calling it more than once is harmless.
     */
    public function destroy()
    {
        // Only a real child PID is signalled: posix_kill(0, ...) would target
        // the whole process group.
        $child = ($this->subscriber > 0) ? $this->subscriber : null;
        if($child !== null) posix_kill($child, SIGTERM);

        if(is_resource($this->socket)) fclose($this->socket);
        $this->socket = null;

        if($this->reader !== null) phpiredis_reader_destroy($this->reader);
        $this->reader = null;

        // Reap the terminated child so it does not linger as a zombie.
        if($child !== null) pcntl_waitpid($child, $status);
        $this->subscriber = null;
    }

    /**
     * Subscribes to the given channels.
     *
     * @param array $chns Channel names; asserted to all be scalars.
     * @throws ErrorConnection when the command cannot be written.
     */
    public function subscribe(array $chns)
    {
        assert(array_filter($chns, 'is_scalar') === $chns);
        array_unshift($chns, 'subscribe');
        $this->send($chns);
    }

    /**
     * Unsubscribes from the given channels.
     *
     * @param array $chns Channel names; asserted to all be scalars.
     * @throws ErrorConnection when the command cannot be written.
     */
    public function unsubscribe(array $chns)
    {
        assert(array_filter($chns, 'is_scalar') === $chns);
        array_unshift($chns, 'unsubscribe');
        $this->send($chns);
    }

    /**
     * Publishes a message on a channel over this listener's socket.
     *
     * NOTE(review): Redis may reject PUBLISH on a connection that is already
     * in subscribed state -- confirm against the Redis pub/sub documentation.
     *
     * @param string $chn Channel name.
     * @param string $msg Message payload.
     * @throws ErrorConnection when the command cannot be written.
     */
    public function publish($chn, $msg)
    {
        assert(is_string($chn));
        assert(is_string($msg));
        $this->send(array('publish', $chn, $msg));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment