Created
May 7, 2011 23:46
-
-
Save shupp/960974 to your computer and use it in GitHub Desktop.
Kestrel Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* Enterprise queue consumer interface, called by bin/consumer_cli.php | |
* | |
* @author Bill Shupp <[email protected]> | |
* @copyright 2010-2011 Empower Campaigns | |
*/ | |
class EC_Consumer | |
{ | |
/** | |
* Instance of {@link Zend_Console_Getopt} | |
* | |
* @var Zend_Console_Getopt | |
*/ | |
protected $_opt = null; | |
/** | |
* Which APPLICATION_ENV to run under (see -e) | |
* | |
* @var string | |
*/ | |
protected $_environment = null; | |
/** | |
* The kestrel server IP | |
* | |
* @var string | |
*/ | |
protected $_host = null; | |
/** | |
* The kestrel server port | |
* | |
* @var int | |
*/ | |
protected $_port = null; | |
/** | |
* The kestrel queue name to connect to | |
* | |
* @var string | |
*/ | |
protected $_queue = null; | |
/** | |
* Whether we should show debug output | |
* | |
* @var bool | |
*/ | |
protected $_debug = false; | |
/** | |
* Maximum # of jobs for this process to perform (for memory fail safe) | |
* | |
* @var int | |
*/ | |
protected $_maxJobs = null; | |
/** | |
* Current job count | |
* | |
* @var int | |
*/ | |
protected $_jobCount = 0; | |
/** | |
* Parses arguments from the command line and does error handling | |
* | |
* @param array $argv The $argv from bin/ecli.php | |
* | |
* @throw Zend_Console_Getopt_Exception on failure | |
* @return void | |
*/ | |
public function __construct(array $argv) | |
{ | |
try { | |
$opt = new Zend_Console_Getopt( | |
array( | |
'environment|e=s' => 'environment name (e.g. development)' | |
. ', required', | |
'server|s=s' => 'kestrel server, format of host:port' | |
. ', required', | |
'queue|q=s' => 'queue name (e.g. crawler_campaign)' | |
. ', required', | |
'max-jobs|m=s' => 'max jobs to run before exiting' | |
. ', optional', | |
'debug|d' => 'show debug output' | |
. ', optional', | |
) | |
); | |
$opt->setArguments($argv); | |
$opt->parse(); | |
// Set environment | |
if ($opt->e === null) { | |
throw new Zend_Console_Getopt_Exception( | |
'Error: missing environment' | |
); | |
} | |
$this->_environment = $opt->e; | |
// @codeCoverageIgnoreStart | |
if (!defined('APPLICATION_ENV')) { | |
define('APPLICATION_ENV', $this->_environment); | |
} | |
// @codeCoverageIgnoreEnd | |
// Set server | |
if ($opt->s === null) { | |
throw new Zend_Console_Getopt_Exception( | |
'Error: missing server' | |
); | |
} | |
$parts = explode(':', $opt->s); | |
if (count($parts) !== 2) { | |
throw new Zend_Console_Getopt_Exception( | |
'Error: invalid server: ' . $opt->s | |
); | |
} | |
$this->_host = $parts[0]; | |
$this->_port = $parts[1]; | |
// Set queue | |
if ($opt->q === null) { | |
throw new Zend_Console_Getopt_Exception( | |
'Error: missing queue' | |
); | |
} | |
$this->_queue = $opt->q; | |
// Set max-jobs | |
if ($opt->m !== null) { | |
$this->_maxJobs = $opt->m; | |
} | |
// Set debug | |
if ($opt->d !== null) { | |
$this->_debug = true; | |
} | |
} catch (Zend_Console_Getopt_Exception $e) { | |
echo "\n" . $e->getMessage() . "\n\n"; | |
echo $opt->getUsageMessage(); | |
// @codeCoverageIgnoreStart | |
if (!defined('APPLICATION_ENV') || APPLICATION_ENV !== 'testing') { | |
exit(1); | |
} | |
// @codeCoverageIgnoreEnd | |
} | |
$this->_opt = $opt; | |
} | |
/** | |
* Polls the queue server for jobs and runs them as they come in | |
* | |
* @return void | |
*/ | |
public function run() | |
{ | |
$client = $this->_getKestrelClient(); | |
$queue = 'enterprise_' . $this->_queue; | |
while ($this->_keepRunning()) { | |
// Pull job from queue | |
$job = $client->reliableRead($queue, 500); | |
if ($job === false) { | |
$this->_debug('Nothing on queue ' . $queue); | |
continue; | |
} | |
if (!isset($job['instance'])) { | |
echo 'Instance not set in queue job: ' . print_r($job, true); | |
continue; | |
} | |
$instance = $job['instance']; | |
if (!isset($job['jobName'])) { | |
echo 'Job name not set in queue job: ' . print_r($job, true); | |
continue; | |
} | |
$jobName = $job['jobName']; | |
$data = null; | |
if (isset($job['data'])) { | |
$data = $job['data']; | |
} | |
// Run the job | |
$returnCode = $this->runJob($instance, $jobName, $data); | |
if ($returnCode !== 0) { | |
$client->abortReliableRead($queue); | |
continue; | |
} | |
} | |
$client->closeReliableRead($queue); | |
} | |
/** | |
* Runs the job via bin/ecli.php | |
* | |
* @param string $instance The instance name to run the job under | |
* @param string $jobName The job name | |
* @param string $data Optional extra data | |
* | |
* @return int | |
*/ | |
public function runJob($instance, $jobName, $data) | |
{ | |
$cmd = BASE_PATH . '/bin/ecli.php ' | |
. '-e ' . $this->_environment | |
. ' -i ' . $instance | |
. ' -j ' . $jobName; | |
if ($data) { | |
$cmd .= " '" . base64_encode(json_encode($data)) . "'"; | |
} | |
$returnCode = $this->_passthru($cmd); | |
$this->_jobCount++; | |
$this->_debug('Job count: ' . $this->_jobCount); | |
return $returnCode; | |
} | |
/** | |
* Check to see if the job limit has been reached | |
* | |
* @return bool | |
*/ | |
protected function _keepRunning() | |
{ | |
return ($this->_maxJobs === null) ? true | |
: ($this->_jobCount < $this->_maxJobs); | |
} | |
/** | |
* Show debug messages | |
* | |
* @param mixed $message | |
* | |
* @return void | |
*/ | |
protected function _debug($message) | |
{ | |
if (!$this->_debug) { | |
return; | |
} | |
echo $message . "\n"; | |
} | |
// @codeCoverageIgnoreStart | |
/** | |
* Calls the passthru() function and returns the exit code. Abstracted | |
* for testing. | |
* | |
* @param string $cmd The command to execute | |
* | |
* @return int | |
*/ | |
protected function _passthru($cmd) | |
{ | |
passthru($cmd, $returnCode); | |
return $returnCode; | |
} | |
/** | |
* Gets a single instance of EC_KestrelClient. Abstracted for testing. | |
* | |
* @return void | |
*/ | |
protected function _getKestrelClient() | |
{ | |
if (APPLICATION_ENV === 'testing') { | |
throw new Exception(__METHOD__ . ' was not mocked when testing'); | |
} | |
return new EC_KestrelClient($this->_host, $this->_port); | |
} | |
// @codeCoverageIgnoreEnd | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment