Skip to content

Instantly share code, notes, and snippets.

@ekho
Created May 16, 2013 08:20
Show Gist options
  • Save ekho/5590219 to your computer and use it in GitHub Desktop.
Save ekho/5590219 to your computer and use it in GitHub Desktop.
ThreadManager
<?php
declare(ticks = 1);
/**
* ThreadManager
*/
class ThreadManager extends CApplicationComponent
{
const E_WORKER = 255;
protected static $isChildWorker = false;
protected $maxWorkers = 1;
protected $workerCallback;
protected $workersParams = array();
protected $workers = array();
protected $signalQueue = array();
protected $parentPID;
protected $results;
/**
*
*/
public function init()
{
Yii::log('maxWorkers = ' . $this->maxWorkers, CLogger::LEVEL_INFO);
$this->parentPID = trim(`hostname`) . '/' . getmypid();
if ($this->maxWorkers > 1) {
pcntl_signal(SIGCHLD, array($this, "childSignalHandler"));
}
}
/**
* @return int
*/
public function getMaxWorkers()
{
return $this->maxWorkers;
}
/**
* @param $maxWorkers
* @return $this
*/
public function setMaxWorkers($maxWorkers)
{
$this->maxWorkers = extension_loaded('pcntl') ? max($maxWorkers, 1) : 1;
return $this;
}
/**
* @return mixed
*/
public function getWorkerCallback()
{
return $this->workerCallback;
}
/**
* @param $workerCallback
* @return $this
*/
public function setWorkerCallback($workerCallback)
{
$this->workerCallback = $workerCallback;
return $this;
}
/**
* @return array
*/
public function getWorkersParams()
{
return $this->workersParams;
}
/**
* @param $workersParams
* @return $this
*/
public function setWorkersParams($workersParams)
{
$this->workersParams = $workersParams;
return $this;
}
/**
* @return mixed
*/
public function getParentPID()
{
return $this->parentPID;
}
/**
* @return mixed
*/
public function getResults()
{
return $this->results;
}
/**
* Run the Daemon
*/
public function run()
{
$this->results = null;
Yii::log("Dispatcher $this->parentPID started");
foreach ($this->workersParams as $key => $params) {
while (count($this->workers) >= $this->maxWorkers) {
Yii::log("Reached the maximum number of workers, waiting...", CLogger::LEVEL_TRACE);
usleep(1000);
}
$this->launchWorker($this->workerCallback, $key, $params);
}
//Wait for child processes to finish before exiting here
while (count($this->workers)) {
if (version_compare(PHP_VERSION, '5.3.0') >= 0) {
pcntl_signal_dispatch();
}
usleep(1000);
}
Yii::log("Dispatcher $this->parentPID finished");
return $this;
}
/**
* Launch a job from the job queue
*/
protected function launchWorker($callback, $key, $params)
{
$pid = $this->maxWorkers > 1 ? pcntl_fork() : false;
if ($pid == -1) {
//Problem launching the job
Yii::log('Could not launch new worker', CLogger::LEVEL_ERROR);
return false;
} elseif ($pid) {
// Parent process
// Sometimes you can receive a signal to the childSignalHandler function before this code executes if
// the child script executes quickly enough!
//
$this->workers[$pid] = $key;
$this->results[$key] = null;
// In the event that a signal for this pid was caught before we get here,
// it will be in our signalQueue array
// So let's go ahead and process it now as if we'd just received the signal
if (isset($this->signalQueue[$pid])) {
Yii::log("found $pid in the signal queue, processing it now", CLogger::LEVEL_TRACE);
$this->childSignalHandler(SIGCHLD, $pid, $this->signalQueue[$pid]);
unset($this->signalQueue[$pid]);
}
} else {
if ($this->maxWorkers > 1) {
self::setIsChildWorker();
}
try {
$exitCode = call_user_func($callback, $key, $params);
} catch (Exception $e) {
Yii::log((string)$e, CLogger::LEVEL_ERROR);
$exitCode = self::E_WORKER;
}
if ($this->maxWorkers > 1) {
exit($exitCode);
} else {
$this->results[$key] = $exitCode;
}
}
return true;
}
/**
* @param $signo
* @param null $pid
* @param null $status
* @return bool
*/
public function childSignalHandler($signo, $pid = null, $status = null)
{
//If no pid is provided, that means we're getting the signal from the system. Let's figure out
//which child process ended
if (!$pid) {
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
//Make sure we get all of the exited children
while ($pid > 0) {
if ($pid && isset($this->workers[$pid])) {
$exitCode = pcntl_wexitstatus($status);
$this->results[$this->workers[$pid]] = $exitCode;
if ($exitCode == self::E_WORKER) {
Yii::log(
"Error ocured in the worker process $pid (key: {$this->workers[$pid]})",
CLogger::LEVEL_WARNING
);
}
unset($this->workers[$pid]);
} else {
if ($pid) {
//Oh no, our job has finished before this parent process could even note that it had been launched!
//Let's make note of it and handle it when the parent process is ready for it
Yii::log("Adding $pid to the signal queue");
$this->signalQueue[$pid] = $status;
}
}
$pid = pcntl_waitpid(-1, $status, WNOHANG);
}
return true;
}
/**
*
*/
public static function setIsChildWorker()
{
self::$isChildWorker = true;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment