Skip to content

Instantly share code, notes, and snippets.

@nrk
Created April 3, 2011 10:54
Show Gist options
  • Select an option

  • Save nrk/900358 to your computer and use it in GitHub Desktop.

Select an option

Save nrk/900358 to your computer and use it in GitHub Desktop.
Let's preserve the UdpConnection class from early versions of Predis v0.7-dev
// This is the old UdpConnection class used in early versions of Predis v0.7-dev to
// test the udp branch of Redis (which is now gone). I am keeping this class only
// for reference, it is not compatible with the current API design of Predis v0.7-dev
// and support for UDP is no more part of the future plans of Redis anyway.
class UdpConnection extends ConnectionBase {
private $_requestId, $_databaseId, $_authPwd;
private $_replyBuf, $_replyBufL, $_replyBufP;
public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
$this->_requestId = 0;
$this->resetReplyBuffer();
parent::__construct($parameters, $reader);
}
public function connect() {
if ($this->isConnected()) {
throw new ClientException('Connection already estabilished');
}
$uri = sprintf('udp://%s:%d/', $this->_params->host, $this->_params->port);
$this->_socket = @stream_socket_client(
$uri, $errno, $errstr, $this->_params->connection_timeout
);
if (!$this->_socket) {
$this->onCommunicationException(trim($errstr), $errno);
}
}
public function pushInitCommand(Command $command){
$command_id = $command->getCommandId();
if ($command_id === 'SELECT') {
$this->_database = (string) $command->getArgument(0);
}
else if ($command_id === 'AUTH') {
$this->_authPwd = (string) $command->getArgument(0);
}
else {
throw new \ArgumentException(
'Only SELECT and AUTH commands are accepted for UDP connections'
);
}
}
private function createRequestPacket(Command $command) {
$req_flags = 0;
$cmd_id = $command->getCommandId();
$cmd_argv = $command->getArguments();
$cmd_argc = count($cmd_argv) + 1;
$payload = pack('n', strlen($cmd_id)) . $cmd_id;
if (isset($this->_authPwd)) {
$cmd_argc++;
$payload .= pack('n', strlen($this->_authPwd)) . $this->_authPwd;
}
foreach ($cmd_argv as $argument) {
$payload .= pack('n', strlen($argument)) . $argument;
}
$header = pack('NCCnnn', $this->_requestId++, 1, $req_flags,
strlen($payload), $cmd_argc, $this->_databaseId);
return $header . $payload;
}
private function resetReplyBuffer() {
$this->_replyBuf = '';
$this->_replyBufL = 0;
$this->_replyBufP = 0;
}
private function ensureReplyBuffer() {
$bufPos = &$this->_replyBufP;
$bufLen = &$this->_replyBufL;
if ($bufLen === $bufPos) {
$replyPacket = stream_socket_recvfrom($this->getSocket(), 65535);
if ($replyPacket === false || $replyPacket === '') {
$this->onCommunicationException(
'Error while reading bytes from the server'
);
}
$header = unpack('Nreqid/Copcode/Cflags/npaylen', $replyPacket);
if (($header['flags'] & 0x08) === 0x08) {
// TODO: change to a more specific exception
throw new \Predis\ClientException(
"Flag TRUNC set in reply to request ID {$header['reqid']}"
);
}
$replyData = substr($replyPacket, 8);
$bufLen = strlen($replyData);
$bufPos = 0;
$this->_replyBuf = &$replyData;
}
}
private function createFallbackConnection() {
$params = $this->_params;
$connectionClass = Connection::getClass($params->fallback);
$fallbackParams = new ConnectionParameters(array_merge(
$params->toArray(), array('scheme' => $params->fallback, 'fallback' => null)
));
return new $connectionClass($fallbackParams, $this->_reader);
}
public function writeCommand(Command $command) {
$this->writeBytes($this->createRequestPacket($command));
}
public function readResponse(Command $command) {
try {
$response = $this->_reader->read($this);
}
catch (ClientException $exception) {
if (!isset($this->_params->fallback)) {
throw $exception;
}
$connection = $this->createFallbackConnection();
$connection->writeCommand($command);
$response = $this->_reader->read($connection);
}
$skipparse = isset($response->queued) || isset($response->error);
return $skipparse ? $response : $command->parseResponse($response);
}
public function executeCommand(Command $command) {
$this->writeCommand($command);
if ($command->closesConnection()) {
return $this->disconnect();
}
return $this->readResponse($command);
}
public function writeBytes($value) {
$socket = $this->getSocket();
while (($length = strlen($value)) > 0) {
$written = fwrite($socket, $value);
if ($length === $written) {
return true;
}
if ($written === false || $written === 0) {
$this->onCommunicationException('Error while writing bytes to the server');
}
$value = substr($value, $written);
}
return true;
}
public function readBytes($length) {
if ($length == 0) {
throw new \InvalidArgumentException('Length parameter must be greater than 0');
}
$this->ensureReplyBuffer();
$bufBuf = &$this->_replyBuf;
$bufPos = &$this->_replyBufP;
$bufLen = &$this->_replyBufL;
$len = $bufLen > $bufPos + $length ? $length : $bufLen - $bufPos;
$bytes = substr($bufBuf, $bufPos, $len);
$bufPos += $len;
return $bytes;
}
public function readLine() {
$this->ensureReplyBuffer();
$bufPos = &$this->_replyBufP;
$bufBuf = &$this->_replyBuf;
$crlfPos = strpos($bufBuf, Protocol::NEWLINE, $bufPos);
$line = substr($bufBuf, $bufPos, $crlfPos - $bufPos);
$bufPos = $crlfPos + 2;
return $line;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment