Created
May 16, 2013 08:20
-
-
Save ekho/5590219 to your computer and use it in GitHub Desktop.
ThreadManager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(ticks = 1); | |
/** | |
* ThreadManager | |
*/ | |
class ThreadManager extends CApplicationComponent | |
{ | |
const E_WORKER = 255; | |
protected static $isChildWorker = false; | |
protected $maxWorkers = 1; | |
protected $workerCallback; | |
protected $workersParams = array(); | |
protected $workers = array(); | |
protected $signalQueue = array(); | |
protected $parentPID; | |
protected $results; | |
/** | |
* | |
*/ | |
public function init() | |
{ | |
Yii::log('maxWorkers = ' . $this->maxWorkers, CLogger::LEVEL_INFO); | |
$this->parentPID = trim(`hostname`) . '/' . getmypid(); | |
if ($this->maxWorkers > 1) { | |
pcntl_signal(SIGCHLD, array($this, "childSignalHandler")); | |
} | |
} | |
/** | |
* @return int | |
*/ | |
public function getMaxWorkers() | |
{ | |
return $this->maxWorkers; | |
} | |
/** | |
* @param $maxWorkers | |
* @return $this | |
*/ | |
public function setMaxWorkers($maxWorkers) | |
{ | |
$this->maxWorkers = extension_loaded('pcntl') ? max($maxWorkers, 1) : 1; | |
return $this; | |
} | |
/** | |
* @return mixed | |
*/ | |
public function getWorkerCallback() | |
{ | |
return $this->workerCallback; | |
} | |
/** | |
* @param $workerCallback | |
* @return $this | |
*/ | |
public function setWorkerCallback($workerCallback) | |
{ | |
$this->workerCallback = $workerCallback; | |
return $this; | |
} | |
/** | |
* @return array | |
*/ | |
public function getWorkersParams() | |
{ | |
return $this->workersParams; | |
} | |
/** | |
* @param $workersParams | |
* @return $this | |
*/ | |
public function setWorkersParams($workersParams) | |
{ | |
$this->workersParams = $workersParams; | |
return $this; | |
} | |
/** | |
* @return mixed | |
*/ | |
public function getParentPID() | |
{ | |
return $this->parentPID; | |
} | |
/** | |
* @return mixed | |
*/ | |
public function getResults() | |
{ | |
return $this->results; | |
} | |
/** | |
* Run the Daemon | |
*/ | |
public function run() | |
{ | |
$this->results = null; | |
Yii::log("Dispatcher $this->parentPID started"); | |
foreach ($this->workersParams as $key => $params) { | |
while (count($this->workers) >= $this->maxWorkers) { | |
Yii::log("Reached the maximum number of workers, waiting...", CLogger::LEVEL_TRACE); | |
usleep(1000); | |
} | |
$this->launchWorker($this->workerCallback, $key, $params); | |
} | |
//Wait for child processes to finish before exiting here | |
while (count($this->workers)) { | |
if (version_compare(PHP_VERSION, '5.3.0') >= 0) { | |
pcntl_signal_dispatch(); | |
} | |
usleep(1000); | |
} | |
Yii::log("Dispatcher $this->parentPID finished"); | |
return $this; | |
} | |
/** | |
* Launch a job from the job queue | |
*/ | |
protected function launchWorker($callback, $key, $params) | |
{ | |
$pid = $this->maxWorkers > 1 ? pcntl_fork() : false; | |
if ($pid == -1) { | |
//Problem launching the job | |
Yii::log('Could not launch new worker', CLogger::LEVEL_ERROR); | |
return false; | |
} elseif ($pid) { | |
// Parent process | |
// Sometimes you can receive a signal to the childSignalHandler function before this code executes if | |
// the child script executes quickly enough! | |
// | |
$this->workers[$pid] = $key; | |
$this->results[$key] = null; | |
// In the event that a signal for this pid was caught before we get here, | |
// it will be in our signalQueue array | |
// So let's go ahead and process it now as if we'd just received the signal | |
if (isset($this->signalQueue[$pid])) { | |
Yii::log("found $pid in the signal queue, processing it now", CLogger::LEVEL_TRACE); | |
$this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]); | |
unset($this->signalQueue[$pid]); | |
} | |
} else { | |
if ($this->maxWorkers > 1) { | |
self::setIsChildWorker(); | |
} | |
try { | |
$exitCode = call_user_func($callback, $key, $params); | |
} catch (Exception $e) { | |
Yii::log((string)$e, CLogger::LEVEL_ERROR); | |
$exitCode = self::E_WORKER; | |
} | |
if ($this->maxWorkers > 1) { | |
exit($exitCode); | |
} else { | |
$this->results[$key] = $exitCode; | |
} | |
} | |
return true; | |
} | |
/** | |
* @param $signo | |
* @param null $pid | |
* @param null $status | |
* @return bool | |
*/ | |
public function childSignalHandler($signo, $pid = null, $status = null) | |
{ | |
//If no pid is provided, that means we're getting the signal from the system. Let's figure out | |
//which child process ended | |
if (!$pid) { | |
$pid = pcntl_waitpid(-1, $status, WNOHANG); | |
} | |
//Make sure we get all of the exited children | |
while ($pid > 0) { | |
if ($pid && isset($this->workers[$pid])) { | |
$exitCode = pcntl_wexitstatus($status); | |
$this->results[$this->workers[$pid]] = $exitCode; | |
if ($exitCode == self::E_WORKER) { | |
Yii::log( | |
"Error ocured in the worker process $pid (key: {$this->workers[$pid]})", | |
CLogger::LEVEL_WARNING | |
); | |
} | |
unset($this->workers[$pid]); | |
} else { | |
if ($pid) { | |
//Oh no, our job has finished before this parent process could even note that it had been launched! | |
//Let's make note of it and handle it when the parent process is ready for it | |
Yii::log("Adding $pid to the signal queue"); | |
$this->signalQueue[$pid] = $status; | |
} | |
} | |
$pid = pcntl_waitpid(-1, $status, WNOHANG); | |
} | |
return true; | |
} | |
/** | |
* | |
*/ | |
public static function setIsChildWorker() | |
{ | |
self::$isChildWorker = true; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment