Forked from d0dak/Predis_Connection_Using_Plain_Sockets.php
Created
July 28, 2010 20:26
-
-
Save nrk/496149 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/*
 * The code in this file is a modification of a file in
 * the predis project, and is thus governed by the same
 * license. For details, please see:
 *
 * http://github.com/nrk/predis/blob/master/LICENSE
 */
/*
 * This file (not really useful on its own) contains an
 * alternative implementation of the PHP 5.2.x version
 * of Predis_Connection, based on the socket_* family
 * of functions rather than the nicer streams variant.
 *
 * The reason for this implementation is the lack of
 * control over the underlying socket properties when
 * using streams; in particular, the lack of a way to
 * set TCP_NODELAY is an issue (it leads to severe
 * performance degradation in certain cases).
 *
 * The modifications are few and located mainly in the
 * methods that manipulate the socket directly, although
 * there are a few small changes in other methods too.
 */
/**
 * Connection to a Redis server implemented on top of the socket_* extension
 * functions, so that low-level options such as TCP_NODELAY can be set.
 *
 * Communication failures are reported by handing a Predis_CommunicationException
 * to Predis_Shared_Utils::onCommunicationException(). That helper is defined
 * outside this file; it presumably throws the exception (to be confirmed).
 */
class Predis_Connection implements Predis_IConnection {
    private $_params, $_socket, $_initCmds, $_reader;

    /**
     * @param Predis_ConnectionParameters $parameters Host, port and connection flags.
     * @param Predis_ResponseReader|null  $reader     Reader used to decode replies;
     *                                                a default one is created when omitted.
     */
    public function __construct(Predis_ConnectionParameters $parameters, Predis_ResponseReader $reader = null) {
        $this->_params   = $parameters;
        $this->_initCmds = array();
        $this->_reader   = $reader !== null ? $reader : new Predis_ResponseReader();
    }

    /**
     * Closes the socket on destruction unless the connection is flagged as persistent.
     */
    public function __destruct() {
        if (!$this->_params->connection_persistent) {
            $this->disconnect();
        }
    }

    /**
     * Tells whether a socket is currently held by this connection.
     *
     * socket_create() yields a resource before PHP 8.0 and a Socket object
     * from PHP 8.0 on, so both representations are accepted here.
     *
     * @return bool
     */
    public function isConnected() {
        return is_resource($this->_socket) || $this->_socket instanceof Socket;
    }

    /**
     * Creates the TCP socket, connects it to the configured host/port, applies
     * the socket options and sends any queued initialization commands.
     *
     * @throws Predis_ClientException When a socket is already open.
     */
    public function connect() {
        if ($this->isConnected()) {
            throw new Predis_ClientException('Connection already estabilished');
        }

        // socket_create() returns false on failure on every PHP version, while
        // the success value is a resource (< 8.0) or a Socket object (>= 8.0).
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($socket === false) {
            $this->emitSocketError();
            return; // only reached if the error handler does not throw
        }

        // TODO: handle async, persistent, and timeout options
        // $this->_params->connection_async
        // $this->_params->connection_persistent
        // $this->_params->connection_timeout
        // $this->_params->read_write_timeout
        $remote_host = $this->_params->host;
        $remote_port = $this->_params->port;

        // Resolve the host name unless it already is a dotted-quad address.
        $remote_addr_long = ip2long($remote_host);
        if ($remote_addr_long === -1 || $remote_addr_long === false) {
            $remote_addr = gethostbyname($remote_host);
        } else {
            $remote_addr = $remote_host;
        }

        if (@socket_connect($socket, $remote_addr, $remote_port) === false) {
            // Grab the error before releasing the socket, then close it so a
            // failed attempt does not leak the descriptor.
            $errno = socket_last_error($socket);
            socket_close($socket);
            $this->_socket = null;
            $this->onCommunicationException(trim(socket_strerror($errno)), $errno);
            return; // only reached if the error handler does not throw
        }
        $this->_socket = $socket;

        if (!socket_set_block($this->_socket)) {
            $this->emitSocketError();
        }

        // Disable the Nagle algorithm.
        // The TCP_NODELAY constant was added in 5.2.7
        // (see http://bugs.php.net/bug.php?id=46360); the literal 1 works in
        // 5.2.x (x < 7, tested in 5.2.6).
        $tcp_nodelay = defined('TCP_NODELAY') ? TCP_NODELAY : 1;
        if (!socket_set_option($this->_socket, SOL_TCP, $tcp_nodelay, 1)) {
            $this->emitSocketError();
        }

        if (!socket_set_option($this->_socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
            $this->emitSocketError();
        }

        if (count($this->_initCmds) > 0) {
            $this->sendInitializationCommands();
        }
    }

    /**
     * Closes the socket, if one is open, and forgets it.
     */
    public function disconnect() {
        if ($this->isConnected()) {
            // TODO: ponder socket_shutdown()
            // TODO: ponder linger socket options
            socket_close($this->_socket);
            $this->_socket = null;
        }
    }

    /**
     * Queues a command to be sent right after the connection is opened.
     *
     * @param Predis_Command $command
     */
    public function pushInitCommand(Predis_Command $command) {
        $this->_initCmds[] = $command;
    }

    /**
     * Writes every queued initialization command first, then reads one
     * response per command in the same order.
     */
    private function sendInitializationCommands() {
        foreach ($this->_initCmds as $command) {
            $this->writeCommand($command);
        }
        foreach ($this->_initCmds as $command) {
            $this->readResponse($command);
        }
    }

    /**
     * Wraps the message in a Predis_CommunicationException bound to this
     * connection and passes it to the shared error handler.
     *
     * @param string   $message
     * @param int|null $code
     */
    private function onCommunicationException($message, $code = null) {
        Predis_Shared_Utils::onCommunicationException(
            new Predis_CommunicationException($this, $message, $code)
        );
    }

    /**
     * Reports the last socket error as a communication exception.
     *
     * The error is read from this connection's socket when one is open, and
     * from the extension-wide last error otherwise (e.g. socket_create() failed).
     */
    private function emitSocketError() {
        $errno = $this->isConnected()
            ? socket_last_error($this->_socket)
            : socket_last_error();
        $errstr = socket_strerror($errno);
        $this->onCommunicationException(trim($errstr), $errno);
    }

    /**
     * Serializes the command via invoke() and writes it to the server.
     *
     * @param Predis_Command $command
     */
    public function writeCommand(Predis_Command $command) {
        $this->writeBytes($command->invoke());
    }

    /**
     * Reads one reply and lets the command parse it.
     *
     * Replies exposing a `queued` or `error` property are returned as read,
     * without being passed to the command's parser.
     *
     * @param Predis_Command $command
     * @return mixed
     */
    public function readResponse(Predis_Command $command) {
        $response  = $this->_reader->read($this);
        $skipparse = isset($response->queued) || isset($response->error);
        return $skipparse ? $response : $command->parseResponse($response);
    }

    /**
     * Sends a command and returns its parsed reply; for commands that close
     * the connection the socket is closed instead and null is returned.
     *
     * @param Predis_Command $command
     * @return mixed
     */
    public function executeCommand(Predis_Command $command) {
        $this->writeCommand($command);
        if ($command->closesConnection()) {
            return $this->disconnect();
        }
        return $this->readResponse($command);
    }

    /**
     * Sends an already serialized command and returns the unparsed reply.
     *
     * @param string $rawCommandData   Bytes to send as-is.
     * @param bool   $closesConnection When true the socket is closed after
     *                                 writing and nothing is read back.
     * @return mixed|null
     */
    public function rawCommand($rawCommandData, $closesConnection = false) {
        $this->writeBytes($rawCommandData);
        if ($closesConnection) {
            $this->disconnect();
            return;
        }
        return $this->_reader->read($this);
    }

    /**
     * Writes the whole buffer to the socket, looping over partial writes.
     *
     * @param string $value
     * @return bool Always true once everything has been written.
     */
    public function writeBytes($value) {
        $socket = $this->getSocket();
        while (($length = strlen($value)) > 0) {
            $written = socket_write($socket, $value, $length);
            if ($written === false) {
                $this->onCommunicationException('Error while writing bytes to the server');
            }
            if ($length === $written) {
                return true;
            }
            // Partial write: drop what went out and retry with the remainder.
            $value = substr($value, $written);
        }
        return true;
    }

    /**
     * Reads exactly $length bytes from the socket.
     *
     * @param int $length Number of bytes to read; must be positive.
     * @return string
     * @throws InvalidArgumentException When $length is zero or negative.
     */
    public function readBytes($length) {
        // Negative lengths are rejected too, as the message already promises.
        if ($length <= 0) {
            throw new InvalidArgumentException('Length parameter must be greater than 0');
        }
        $socket = $this->getSocket();
        $value  = '';
        do {
            $chunk = socket_read($socket, $length, PHP_BINARY_READ);
            if ($chunk === false) {
                $this->onCommunicationException('Error while reading bytes from the server');
            } else if ($chunk === '') {
                $this->onCommunicationException('Unexpected empty result while reading bytes from the server');
            }
            $value .= $chunk;
        }
        while (($length -= strlen($chunk)) > 0);
        return $value;
    }

    /**
     * Reads one line terminated by Predis_Protocol::NEWLINE and returns it
     * without the terminator.
     *
     * Data is first peeked (MSG_PEEK) to locate the terminator, then exactly
     * the needed amount is consumed, so bytes belonging to the next reply stay
     * in the socket buffer.
     *
     * @return string
     */
    public function readLine() {
        $socket      = $this->getSocket();
        $newline     = Predis_Protocol::NEWLINE;
        $newline_len = strlen($newline);
        $value       = '';
        do {
            // peek ahead (look for Predis_Protocol::NEWLINE)
            $peeked   = '';
            $peek_res = socket_recv($socket, $peeked, 4096, MSG_PEEK);
            if ($peek_res === false) {
                $this->onCommunicationException('Error while peeking line from the server');
            } else if ($peeked === '' || is_null($peeked)) {
                $this->onCommunicationException('Unexpected empty result while peeking line from the server');
            }

            // The terminator may straddle the data already consumed and the
            // data just peeked, so search with the end of $value prepended.
            $tail_len    = min(strlen($value), $newline_len - 1);
            $tail        = $tail_len > 0 ? substr($value, -$tail_len) : '';
            $newline_pos = strpos($tail . $peeked, $newline);
            if ($newline_pos !== false) {
                // Consume up to and including the terminator, nothing more.
                $chunk_len = $newline_pos + $newline_len - $tail_len;
            } else {
                // Consume only what was inspected: bytes arriving after the
                // peek might contain the terminator followed by another reply.
                $chunk_len = strlen($peeked);
            }

            // actual recv (with the computed chunk length)
            $chunk     = '';
            $chunk_res = socket_recv($socket, $chunk, $chunk_len, 0);
            if ($chunk_res === false) {
                $this->onCommunicationException('Error while reading line from the server');
            } else if ($chunk === '' || is_null($chunk)) {
                $this->onCommunicationException('Unexpected empty result while reading line from the server');
            }
            $value .= $chunk;
        }
        while (substr($value, -$newline_len) !== $newline);
        return substr($value, 0, -$newline_len);
    }

    /**
     * Returns the underlying socket, connecting first when none is open.
     *
     * @return resource|Socket
     */
    public function getSocket() {
        if (!$this->isConnected()) {
            $this->connect();
        }
        return $this->_socket;
    }

    /**
     * @return Predis_ResponseReader The reader used to decode replies.
     */
    public function getResponseReader() {
        return $this->_reader;
    }

    /**
     * @return Predis_ConnectionParameters The parameters given at construction.
     */
    public function getParameters() {
        return $this->_params;
    }

    /**
     * @return string The endpoint formatted as "host:port".
     */
    public function __toString() {
        return sprintf('%s:%d', $this->_params->host, $this->_params->port);
    }
}
?> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment