Created
April 20, 2023 07:58
-
-
Save forthxu/446260e3a4ae5fd446b59c847ab27b7a to your computer and use it in GitHub Desktop.
swoole-1.8.5 多进程任务处理
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/**
 * Multi-process task server built on swoole_process (written against swoole 1.8.5).
 *
 * The master process forks a pool of worker processes and talks to each of
 * them over that worker's pipe: task() writes a payload to a randomly chosen
 * worker, the worker runs the onTask callback and writes the result back, and
 * the master passes that result to the onFinish callback.
 *
 * Usage:
 *
 *     $taskServer = new \TaskServer(
 *         100, // number of worker processes
 *         function ($taskServer) { // onMaster: runs once in the master process
 *             // demo: push a task to a worker every 100 ms
 *             swoole_timer_tick(100, function () use ($taskServer) {
 *                 $taskServer->task("hello");
 *             });
 *         },
 *         function ($data) { // onTask: runs in a worker, returns the result
 *             return $data . ' world';
 *         },
 *         function ($data, $pid) { // onFinish: runs in the master with the result
 *             // Callbacks are invoked as plain functions, so $this is NOT the
 *             // server here; do your own output/logging.
 *             echo $pid . ': ' . $data . PHP_EOL;
 *         }
 *     );
 *     $taskServer->start();
 *
 * NOTE(review): log() is not defined in this class; it is presumably inherited
 * from \Swoole\SwooleAbstract - confirm.
 */
class TaskServer extends \Swoole\SwooleAbstract {
    /** @var int Target size of the worker pool. */
    protected $_workerNum;
    /** @var callable|null Called once in the master as fn(TaskServer $server). */
    protected $_onMaster;
    /** @var callable|null Called in a worker as fn($data); a non-empty scalar return is sent back. */
    protected $_onTask;
    /** @var callable|null Called in the master as fn($data, $workerPid). */
    protected $_onFinish;
    /** @var int PID of the master process, recorded in start(). */
    protected $_pid;
    /** @var array Timer ids created by the master, cleared on shutdown. */
    protected $_timerIds = [];
    /** @var array Live workers, keyed by pid => swoole_process. */
    protected $_workers = [];

    /**
     * @param int           $workNum  number of worker processes to keep alive
     * @param callable|null $onMaster master-side start-up hook
     * @param callable|null $onTask   worker-side task handler
     * @param callable|null $onFinish master-side result handler
     */
    public function __construct($workNum=3, $onMaster=null, $onTask=null, $onFinish=null){
        $this->_workerNum = $workNum;
        $this->_onMaster = $onMaster;
        $this->_onTask = $onTask;
        $this->_onFinish = $onFinish;
    }

    /**
     * Boots the master: forks the workers, installs the maintenance timer and
     * the SIGTERM shutdown handler, then runs the onMaster hook.
     *
     * Any \Exception raised during boot terminates the script via die().
     */
    public function start() {
        try {
            // To run as a daemon, call swoole_process::daemon(false, true) here.

            // Master process bookkeeping.
            $pid = getmypid();
            $this->_pid = $pid;
            $this->log('master pid:' . $pid . ' start.');
            swoole_set_process_name("taskServer: master {$pid}");

            // Fork the initial worker pool.
            for ($i = 0; $i < $this->_workerNum; $i++) {
                $this->createProcess();
            }

            // Once per second: reap every exited worker, then refill the pool.
            $this->_timerIds[] = swoole_timer_tick(1000, function () {
                // Non-blocking wait; loop so several exits in one second are all collected.
                while ($ret = swoole_process::wait(false)) {
                    $pid = $ret['pid'];
                    $this->destryProcess($pid);
                    $this->log("worker down wait worker process exist:{$ret['signal']} pid:" . $pid);
                }
                // Bounded loop (not "while below target") so a failing fork cannot spin forever.
                $missing = $this->_workerNum - count($this->_workers);
                for ($i = 0; $i < $missing; $i++) {
                    $this->createProcess();
                }
            });

            // Graceful shutdown of the master on SIGTERM.
            swoole_process::signal(SIGTERM, function ($signo) {
                $pid = getmypid();
                $this->log("master process exist rev:{$signo} pid:" . $pid);

                // Stop all master timers so no new workers get spawned.
                foreach ($this->_timerIds as $timerId) {
                    swoole_timer_clear($timerId);
                }

                // Ask every worker to terminate (kill is a static method).
                foreach (array_keys($this->_workers) as $workerPid) {
                    swoole_process::kill($workerPid, SIGTERM);
                }

                // Wait for the workers to exit. A blocking wait avoids burning
                // CPU and flooding the log while the children shut down.
                while (count($this->_workers) > 0) {
                    $this->log("wait worker num: " . count($this->_workers));
                    $ret = swoole_process::wait(true);
                    if (!$ret) {
                        // Nothing left to wait for; do not loop on stale entries.
                        break;
                    }
                    $pid = $ret['pid'];
                    $this->destryProcess($pid);
                    $this->log("master down wait worker process exist:{$ret['signal']} pid:" . $pid);
                }
                exit();
            });

            // User hook running in the master process.
            if (is_callable($this->_onMaster)) {
                $fun = $this->_onMaster;
                $fun($this);
            }
        } catch (\Exception $e) {
            die('server error: ' . $e->getMessage());
        }
    }

    /**
     * Sends a payload to one randomly selected worker.
     *
     * @param string $data payload written to the worker's pipe
     * @return bool true when the write succeeded, false when there is no
     *              worker or the write failed (failures are also logged)
     */
    public function task($data) {
        // array_rand() must not be called on an empty pool.
        if (empty($this->_workers)) {
            $this->log("write fail: no worker process available");
            return false;
        }
        $processKey = array_rand($this->_workers);
        $process = $this->_workers[$processKey];
        $result = $process->write($data);
        if (!$result) {
            $this->log("write fail pid: " . $process->pid);
            return false;
        }
        return true;
    }

    /**
     * Master side: forgets a worker and releases its pipe.
     *
     * @param int $pid pid of the exited worker; unknown pids are ignored
     */
    protected function destryProcess($pid) {
        // wait() may report a pid that is not (or no longer) tracked.
        if (!isset($this->_workers[$pid])) {
            return;
        }
        $process = $this->_workers[$pid];
        swoole_event_del($process->pipe); // stop listening on the pipe
        $process->close();
        unset($this->_workers[$pid]); // drop it from the pool
    }

    /**
     * Forks one worker, registers it in the pool and listens on its pipe.
     *
     * @return int pid of the new worker, or 0 when the fork failed
     */
    protected function createProcess() {
        $process = new swoole_process(function (swoole_process $process) {
            // ---- everything in this closure runs inside the worker ----
            $this->log('worker pid:' . $process->pid . ' start.');
            $process->name("taskServer: worker {$process->pid}");

            // Incoming data from the master.
            swoole_event_add($process->pipe, function ($pipe) use ($process) {
                $this->onTask($pipe, $process);
            });

            // Exit together with the master: signal 0 only probes for existence.
            swoole_timer_tick(1000, function () {
                if (!swoole_process::kill($this->_pid, 0)) {
                    $pid = getmypid();
                    $this->log("master process exist " . $this->_pid . " worker process exist " . $pid);
                    exit();
                }
            });

            // Log SIGSEGV; the handler itself does not terminate the worker.
            swoole_process::signal(SIGSEGV, function ($signo) {
                $pid = getmypid();
                $this->log("worker process exist rev:{$signo} pid:" . $pid);
            });

            // Graceful worker shutdown on SIGTERM.
            swoole_process::signal(SIGTERM, function ($signo) use ($process) {
                $pid = getmypid();
                // Stop listening and release the pipe before leaving.
                swoole_event_del($process->pipe);
                $process->close();
                $this->log("worker process exist rev:{$signo} pid:" . $pid);
                $process->exit();
                // Fallback in case $process->exit() returns instead of terminating.
                exit();
            });
        });

        $pid = $process->start();
        // Same condition as before (false also compares < 1), made explicit.
        if ($pid === false || $pid < 1) {
            $this->log("worker process create fail");
            return 0;
        }

        // Track the worker in the pool.
        $this->_workers[$pid] = $process;

        // Results coming back from this worker.
        swoole_event_add($process->pipe, function ($pipe) use ($process) {
            $this->onFinish($pipe, $process);
        });
        return $pid;
    }

    /**
     * Worker side: reads a payload from the pipe, runs the onTask callback and
     * writes a non-empty scalar result back to the master.
     *
     * @param mixed          $pipe    pipe handle that became readable (unused)
     * @param swoole_process $process the worker's own process object
     */
    protected function onTask($pipe, $process) {
        if (!$process instanceof \swoole_process) {
            $this->log("fail 2");
            return; // nothing to read from
        }
        $data = $process->read();
        // A failed or empty read is not a task.
        if ($data === false || $data === '') {
            return;
        }
        if (!is_callable($this->_onTask)) {
            return;
        }
        $fun = $this->_onTask;
        $result = $fun($data);
        // Only scalars can be written to the pipe; null/false/'' mean "no reply".
        if (is_scalar($result)) {
            $payload = (string) $result;
            if ($payload !== '') {
                $process->write($payload);
            }
        }
    }

    /**
     * Master side: reads a worker's result from its pipe and passes it to the
     * onFinish callback together with the worker's pid.
     *
     * @param mixed          $pipe    pipe handle that became readable (unused)
     * @param swoole_process $process the worker the result came from
     */
    protected function onFinish($pipe, $process) {
        if (!$process instanceof \swoole_process) {
            $this->log("fail 1");
            return; // nothing to read from
        }
        $data = $process->read();
        // A failed or empty read carries no result; do not invoke the callback with it.
        if ($data === false || $data === '') {
            return;
        }
        if (is_callable($this->_onFinish)) {
            $fun = $this->_onFinish;
            $fun($data, $process->pid);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment