Created
February 25, 2017 16:12
-
-
Save SuoXC/90ec457d8b4e0810d70a740453b1677d to your computer and use it in GitHub Desktop.
Use an anonymous function and an array of arguments to spawn a series of subprocesses that do the job, with no need to deal with the process-management API directly.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/**
 * Runs one callable once per argument, each run in its own forked child
 * process, and re-forks the runs that a user-supplied check reports as
 * unsuccessful until a maximum number of attempts is reached.
 *
 * Requires the pcntl extension (pcntl_fork / pcntl_wait).
 *
 * @example
 *
 * $a = new ProcessExecutor();
 * $b = function ($arg){ sleep(1);echo "hello $arg\n"; };
 * $a->setRunnables($b,['aaa','bbb']);
 * $a->setCheckStatus(function($arg){return $arg === 'aaa';});
 * $a->execute();
 * var_dump($a->getRunStatusArr());
 * var_dump($a->getRunTimes());
 */
class ProcessExecutor {
    /** Seconds the parent sleeps after every fork. */
    const FORK_INTERVAL = 1;
    /** Status recorded for a task whose check returned true. */
    const STATUS_SUCCESS = 1;
    /** Status recorded for a task that used up all its attempts. */
    const STATUS_FAIL = 0;

    /** @var int Maximum number of times a single task is forked. */
    private $maxTimes = 3;
    /** @var array Task hash => RunnableWrapper. */
    private $runnables = [];
    /** @var array Task hash => argument passed to the task. */
    private $args = [];
    /** @var array Child pid => task hash, for children not yet reaped. */
    private $processes = [];
    /** @var array Task hash => number of times the task has been forked. */
    private $runTimes = [];
    /** @var array Task hash => STATUS_SUCCESS | STATUS_FAIL, for finished tasks. */
    private $runStatusArr = [];
    /** @var int Seconds to sleep before re-forking a failed task. */
    private $retryInterval = 0;
    /** @var callable|null Called with the task argument; must return true on success. */
    private $checkStatus;
    /** @var callable|null Called with the task argument when a task succeeds. */
    private $onSuccess;
    /** @var callable|null Called with the task argument before a failed task is retried. */
    private $onFail;

    /**
     * Hook called in the parent immediately before every fork; empty by default.
     *
     * In Drupal this would need:
     * \Database::closeConnection(); // avoid fatal errors from children sharing the database connection
     * \Redis_Client::reset();       // avoid protocol errors from children sharing the parent's redis connection
     */
    public function prepareFork() { }

    /**
     * Forks a child that runs $runnable with its registered argument.
     *
     * The child exits as soon as the runnable returns. The parent records the
     * child pid, bumps the task's run counter and sleeps FORK_INTERVAL seconds.
     *
     * @param object $runnable A wrapper previously registered via setRunnables().
     * @return int Pid of the forked child (parent side only).
     * @throws RuntimeException When pcntl_fork() fails.
     */
    private function forkRun($runnable) {
        $this->prepareFork();
        $hash = spl_object_hash($runnable);
        $pid = pcntl_fork();
        if ($pid === -1) {
            // Throw instead of die() so the caller can decide how to react.
            throw new RuntimeException('could not fork');
        }
        if ($pid === 0) { // child
            call_user_func($runnable, $this->args[$hash]);
            exit();
        }
        // parent
        $this->processes[$pid] = $hash;
        $this->runTimes[$hash]++;
        sleep(self::FORK_INTERVAL); // leave a gap between two consecutive forks
        return $pid;
    }

    /**
     * Forks every registered task and blocks until each one has either
     * succeeded or used up its attempts.
     *
     * After a child terminates, the check callback (if any) is called with the
     * task argument; without a check callback every task counts as successful.
     * A strict true result marks the task successful and fires onSuccess.
     * Any other result fires onFail and re-forks the task after retryInterval
     * seconds, unless the task has already been forked maxTimes times, in
     * which case it is marked failed (onFail is not fired for that last run).
     *
     * @throws RuntimeException When forking or waiting for a child fails.
     */
    public function execute() {
        // Start one child per registered task.
        foreach ($this->runnables as $runnable) {
            $this->forkRun($runnable);
        }

        $total = count($this->runnables);
        $success = $fail = 0;
        // Loop until the tasks judged successful or failed add up to the total.
        while ($success + $fail < $total) {
            // No WUNTRACED: a child that is merely stopped has not finished and
            // must not be checked or re-forked.
            $pid = pcntl_wait($status);
            if ($pid === -1) {
                if (pcntl_get_last_error() === PCNTL_EINTR) {
                    continue; // interrupted by a signal, wait again
                }
                throw new RuntimeException('pcntl_wait failed while tasks were still pending');
            }
            if (!isset($this->processes[$pid])) {
                continue; // not a child started by this executor
            }

            $hash = $this->processes[$pid];
            unset($this->processes[$pid]);
            $runnable = $this->runnables[$hash];
            $arg = $this->args[$hash];
            $result = empty($this->checkStatus) ? true : call_user_func($this->checkStatus, $arg);

            if ($result === true) {
                $this->runStatusArr[$hash] = self::STATUS_SUCCESS;
                $success++;
                if (!empty($this->onSuccess)) {
                    call_user_func($this->onSuccess, $arg);
                }
            } elseif ($this->runTimes[$hash] >= $this->maxTimes) {
                // Any non-true result counts as a failure here, so a check that
                // returns null/0 instead of false cannot cause endless retries.
                $fail++;
                $this->runStatusArr[$hash] = self::STATUS_FAIL;
            } else {
                if (!empty($this->onFail)) {
                    call_user_func($this->onFail, $arg);
                }
                sleep($this->retryInterval);
                $this->forkRun($runnable);
            }
        }
    }

    /**
     * @return int Maximum number of times a single task is forked.
     */
    public function getMaxTimes() {
        return $this->maxTimes;
    }

    /**
     * @return int Seconds slept before a failed task is re-forked.
     */
    public function getRetryInterval() {
        return $this->retryInterval;
    }

    /**
     * @param int $maxTimes Maximum number of times a single task is forked.
     */
    public function setMaxTimes($maxTimes) {
        $this->maxTimes = $maxTimes;
    }

    /**
     * Registers one task per element of $args, all running the same callable.
     *
     * Each task gets its own RunnableWrapper so that it has a distinct object
     * hash to key the bookkeeping arrays with.
     *
     * @param callable $runnable Called in the child with one element of $args.
     * @param array $args One entry per task to create.
     * @throws InvalidArgumentException When $runnable is not callable.
     */
    public function setRunnables($runnable, $args) {
        if (!is_callable($runnable)) {
            throw new InvalidArgumentException('not callable');
        }
        foreach ($args as $arg) {
            $runner = new RunnableWrapper($runnable);
            $hash = spl_object_hash($runner);
            $this->runnables[$hash] = $runner;
            $this->runTimes[$hash] = 0;
            $this->args[$hash] = $arg;
        }
    }

    /**
     * @param callable $onFail Called with the task argument before a failed task is retried.
     */
    public function setOnFail($onFail) {
        $this->onFail = $onFail;
    }

    /**
     * @param callable $onSuccess Called with the task argument when a task succeeds.
     */
    public function setOnSuccess($onSuccess) {
        $this->onSuccess = $onSuccess;
    }

    /**
     * @param int $retryInterval Seconds to sleep before re-forking a failed task.
     */
    public function setRetryInterval($retryInterval) {
        $this->retryInterval = $retryInterval;
    }

    /**
     * @param callable $checkStatus Called in the parent with the task argument
     *                              after its child ends; must return true on success.
     * @throws InvalidArgumentException When $checkStatus is not callable.
     */
    public function setCheckStatus($checkStatus) {
        if (!is_callable($checkStatus)) {
            throw new InvalidArgumentException("check status not callable");
        }
        $this->checkStatus = $checkStatus;
    }

    /**
     * @return array Task hash => STATUS_SUCCESS | STATUS_FAIL for finished tasks.
     */
    public function getRunStatusArr() {
        return $this->runStatusArr;
    }

    /**
     * @return array Task hash => number of times the task has been forked.
     */
    public function getRunTimes() {
        return $this->runTimes;
    }
}
/**
 * Invokable object wrapping a callable.
 *
 * ProcessExecutor creates one wrapper per task argument and uses the
 * wrapper's spl_object_hash() as the task key, so the same callable can be
 * registered several times under distinct keys.
 */
class RunnableWrapper {
    /** @var callable The wrapped callable. */
    private $target;

    /**
     * @param callable $callable The callable to run when the wrapper is invoked.
     */
    public function __construct($callable) {
        $this->target = $callable;
    }

    /**
     * Calls the wrapped callable with $args as its single argument.
     * The callable's return value is discarded.
     *
     * @param mixed $args Value handed to the wrapped callable.
     */
    public function __invoke($args) {
        $target = $this->target;
        call_user_func($target, $args);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment