Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save nrk/496149 to your computer and use it in GitHub Desktop.
Save nrk/496149 to your computer and use it in GitHub Desktop.
<?php
/*
* The code in this file is a modification of a file in
* the predis project, and is thus governed by the same
* license. For details, please see:
*
* http://github.com/nrk/predis/blob/master/LICENSE
*/
/*
* This file (not really useful on its own) contains an
* alternative implementation of the php 5.2.x version
* of Predis_Connection, based on the socket_* family
* of functions, rather than the nicer streams variant.
*
* The reason for this implementation being the lack of
* control over the underlying socket properties, where
* in particular the lack of method to set TCP_NODELAY
* is an issue. (Leads to severe performance degradation
* in certain cases.)
*
* The modifications are few, and located mainly in the
* methods that manipulate the socket directly, although
* there are a few small changes in other methods too.
*/
class Predis_Connection implements Predis_IConnection {
private $_params, $_socket, $_initCmds, $_reader;
public function __construct(Predis_ConnectionParameters $parameters, Predis_ResponseReader $reader = null) {
$this->_params = $parameters;
$this->_initCmds = array();
$this->_reader = $reader !== null ? $reader : new Predis_ResponseReader();
}
public function __destruct() {
if (!$this->_params->connection_persistent) {
$this->disconnect();
}
}
public function isConnected() {
return is_resource($this->_socket);
}
public function connect() {
if ($this->isConnected()) {
throw new Predis_ClientException('Connection already estabilished');
}
$this->_socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if (!is_resource($this->_socket)) {
$this->emitSocketError();
}
// TODO: handle async, persistent, and timeout options
// $this->_params->connection_async
// $this->_params->connection_persistent
// $this->_params->connection_timeout
// $this->_params->read_write_timeout
$remote_host = $this->_params->host;
$remote_port = $this->_params->port;
$remote_addr = null; // derived below
$remote_addr_long = ip2long($remote_host);
if ($remote_addr_long == -1 || $remote_addr_long === false) {
$remote_addr = gethostbyname($remote_host);
} else {
$remote_addr = $remote_host;
}
if (@socket_connect($this->_socket, $remote_addr, $remote_port) === false) {
$this->_socket = null;
$this->emitSocketError();
}
if (!socket_set_block($this->_socket)) {
$this->emitSocketError();
}
// Disable the Nagle algorithm
if (defined('TCP_NODELAY')) {
// The TCP_NODELAY constant was added in 5.2.7
// See http://bugs.php.net/bug.php?id=46360
if (!socket_set_option($this->_socket, SOL_TCP, TCP_NODELAY, 1)) {
$this->emitSocketError();
}
} else {
// This works in 5.2.x (x < 7, tested in 5.2.6)
if (!socket_set_option($this->_socket, SOL_TCP, 1, 1)) {
$this->emitSocketError();
}
}
if (!socket_set_option($this->_socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
$this->emitSocketError();
}
if (count($this->_initCmds) > 0){
$this->sendInitializationCommands();
}
}
public function disconnect() {
if ($this->isConnected()) {
// TODO: ponder socket_shutdown()
// TODO: ponder linger socket options
socket_close($this->_socket);
$this->_socket = null;
}
}
public function pushInitCommand(Predis_Command $command){
$this->_initCmds[] = $command;
}
private function sendInitializationCommands() {
foreach ($this->_initCmds as $command) {
$this->writeCommand($command);
}
foreach ($this->_initCmds as $command) {
$this->readResponse($command);
}
}
private function onCommunicationException($message, $code = null) {
Predis_Shared_Utils::onCommunicationException(
new Predis_CommunicationException($this, $message, $code)
);
}
private function emitSocketError() {
$errno = socket_last_error();
$errstr = socket_strerror($errno);
$this->onCommunicationException(trim($errstr), $errno);
}
public function writeCommand(Predis_Command $command) {
$this->writeBytes($command->invoke());
}
public function readResponse(Predis_Command $command) {
$response = $this->_reader->read($this);
$skipparse = isset($response->queued) || isset($response->error);
return $skipparse ? $response : $command->parseResponse($response);
}
public function executeCommand(Predis_Command $command) {
$this->writeCommand($command);
if ($command->closesConnection()) {
return $this->disconnect();
}
return $this->readResponse($command);
}
public function rawCommand($rawCommandData, $closesConnection = false) {
$this->writeBytes($rawCommandData);
if ($closesConnection) {
$this->disconnect();
return;
}
return $this->_reader->read($this);
}
public function writeBytes($value) {
$socket = $this->getSocket();
while (($length = strlen($value)) > 0) {
$written = socket_write($socket, $value, $length);
if ($length === $written) {
return true;
}
if ($written === false) {
$this->onCommunicationException('Error while writing bytes to the server');
}
$value = substr($value, $written);
}
return true;
}
public function readBytes($length) {
if ($length == 0) {
throw new InvalidArgumentException('Length parameter must be greater than 0');
}
$socket = $this->getSocket();
$value = '';
do {
$chunk = socket_read($socket, $length, PHP_BINARY_READ);
if ($chunk === false) {
$this->onCommunicationException('Error while reading bytes from the server');
} else if ($chunk === '') {
$this->onCommunicationException('Unexpected empty result while reading bytes from the server');
}
$value .= $chunk;
}
while (($length -= strlen($chunk)) > 0);
return $value;
}
public function readLine() {
$socket = $this->getSocket();
$value = '';
do {
$chunk_len = 4096;
// peek ahead (look for Predis_Protocol::NEWLINE)
$chunk = '';
$chunk_res = socket_recv($socket, $chunk, $chunk_len, MSG_PEEK);
if ($chunk_res === false) {
$this->onCommunicationException('Error while peeking line from the server');
} else if ($chunk === '' || is_null($chunk)) {
$this->onCommunicationException('Unexpected empty result while peeking line from the server');
}
if (($newline_pos = strpos($chunk, Predis_Protocol::NEWLINE)) !== false) {
$chunk_len = $newline_pos + 2;
}
// actual recv (with possibly adjusted chunk_len)
$chunk = '';
$chunk_res = socket_recv($socket, $chunk, $chunk_len, 0);
if ($chunk_res === false) {
$this->onCommunicationException('Error while reading line from the server');
} else if ($chunk === '' || is_null($chunk)) {
$this->onCommunicationException('Unexpected empty result while reading line from the server');
}
$value .= $chunk;
}
while (substr($value, -2) !== Predis_Protocol::NEWLINE);
return substr($value, 0, -2);
}
public function getSocket() {
if (!$this->isConnected()) {
$this->connect();
}
return $this->_socket;
}
public function getResponseReader() {
return $this->_reader;
}
public function getParameters() {
return $this->_params;
}
public function __toString() {
return sprintf('%s:%d', $this->_params->host, $this->_params->port);
}
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment