Created
May 7, 2011 23:38
-
-
Save shupp/960968 to your computer and use it in GitHub Desktop.
Kestrel Client Decorator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/**
 * A thin Kestrel client that wraps Memcached (libmemcached extension)
 *
 * @author    Bill Shupp <[email protected]>
 * @copyright 2010-2011 Empower Campaigns
 */
/**
 * Thin Kestrel queue client built on top of the Memcached extension.
 *
 * Payloads are JSON-encoded on write and JSON-decoded on read. The Memcached
 * handle is created lazily on first use and then reused by this client.
 */
class EC_KestrelClient
{
    /**
     * The Memcached instance, created on first call to getMemcached()
     *
     * @var Memcached|null
     */
    protected $_memcached = null;

    /**
     * The Kestrel server host/IP
     *
     * @var string
     */
    protected $_host = '127.0.0.1';

    /**
     * The Kestrel server port
     *
     * @var int
     */
    protected $_port = 22133;

    /**
     * Memcached options, applied to the Memcached instance when it is created
     *
     * @var array
     */
    protected $_options = array();

    /**
     * Sets the host, port, and options to be used
     *
     * No connection is configured here; the Memcached instance is only built
     * when getMemcached() is first called.
     *
     * @param string $host    The host to use, defaults to 127.0.0.1
     * @param int    $port    The port to use, defaults to 22133
     * @param array  $options Memcached options (option => value)
     */
    public function __construct(
        $host = '127.0.0.1', $port = 22133, array $options = array()
    )
    {
        $this->_host = $host;
        $this->_port = $port;
        $this->setOptions($options);
    }

    /**
     * Sets job data on the queue, json_encoding the value to avoid problematic
     * serialization.
     *
     * @param string $queue The queue name
     * @param mixed  $data  The data to store
     *
     * @return bool The result of Memcached::set(), or false when $data cannot
     *              be JSON-encoded (nothing is enqueued in that case)
     */
    public function set($queue, $data)
    {
        // Local json serialization, as kestrel doesn't send serialization flags
        $payload = json_encode($data);
        if ($payload === false) {
            // json_encode() failed (e.g. invalid UTF-8). Passing false on to
            // Memcached would enqueue a non-JSON payload and report success.
            return false;
        }

        return $this->getMemcached()->set($queue, $payload);
    }

    /**
     * Reliably read an item off of the queue. Meant to be run in a loop, and
     * call closeReliableRead() when done to make sure the final job is not left
     * on the queue.
     *
     * The request key is "<queue>/close/open/t=<timeout>".
     *
     * @param mixed $queue   The queue name to read from
     * @param int   $timeout The timeout to wait for a job to appear; sent
     *                       verbatim as the "t=" value
     *
     * @return mixed|false The JSON-decoded payload (objects become associative
     *                     arrays), or false when Memcached::get() returns
     *                     false. A payload that is not valid JSON decodes to
     *                     null.
     * @see closeReliableRead()
     */
    public function reliableRead($queue, $timeout = 1000)
    {
        $key    = $queue . '/close/open/t=' . $timeout;
        $result = $this->getMemcached()->get($key);
        if ($result === false) {
            return false;
        }

        // Local json serialization, as kestrel doesn't send serialization flags
        return json_decode($result, true);
    }

    /**
     * Closes any existing open read by requesting "<queue>/close"
     *
     * @param string $queue The queue name
     *
     * @return mixed The raw result of Memcached::get(); expected to be false
     */
    public function closeReliableRead($queue)
    {
        return $this->getMemcached()->get($queue . '/close');
    }

    /**
     * Aborts an existing reliable read by requesting "<queue>/abort"
     *
     * @param string $queue The queue name
     *
     * @return mixed The raw result of Memcached::get(); expected to be false
     */
    public function abortReliableRead($queue)
    {
        return $this->getMemcached()->get($queue . '/abort');
    }

    /**
     * Set an option to be used with the Memcached client.
     *
     * Options are only applied when the Memcached instance is created, so a
     * value set after the first getMemcached() call is stored but not pushed
     * to the existing instance.
     *
     * @param string $name  The option name
     * @param mixed  $value The option value
     *
     * @return void
     */
    public function setOption($name, $value)
    {
        $this->_options[$name] = $value;
    }

    /**
     * Sets multiple options
     *
     * @param array $options Array of key/values to set
     *
     * @return void
     */
    public function setOptions(array $options)
    {
        foreach ($options as $name => $value) {
            $this->setOption($name, $value);
        }
    }

    /**
     * Gets a current option's value
     *
     * @param string $name The option name
     *
     * @return mixed The stored value, or null when the option is not set
     */
    public function getOption($name)
    {
        if (isset($this->_options[$name])) {
            return $this->_options[$name];
        }

        return null;
    }

    /**
     * Gets all current options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->_options;
    }

    /**
     * Gets this client's Memcached instance, creating it on first use
     *
     * @return Memcached
     */
    public function getMemcached()
    {
        if ($this->_memcached === null) {
            $this->_initMemcached();
        }

        return $this->_memcached;
    }

    /**
     * Initializes the Memcached client instance
     *
     * Applies the stored options, registers the server, then disables
     * compression.
     *
     * @return void
     */
    protected function _initMemcached()
    {
        $this->_memcached = $this->_getMemcachedInstance();
        foreach ($this->_options as $option => $value) {
            $this->_memcached->setOption($option, $value);
        }
        $this->_memcached->addServer($this->_host, $this->_port);

        // Set last so it overrides any user-supplied OPT_COMPRESSION value
        $this->_memcached->setOption(Memcached::OPT_COMPRESSION, false);
    }

    // @codeCoverageIgnoreStart
    /**
     * Returns a new instance of Memcached. Abstracted for testing.
     *
     * @return Memcached
     */
    protected function _getMemcachedInstance()
    {
        return new Memcached();
    }
    // @codeCoverageIgnoreEnd
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment