Last active
August 9, 2023 08:40
-
-
Save etheriqa/4707859 to your computer and use it in GitHub Desktop.
Parallel processing on PHP using stream_select()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
require_once('WorkerManager.php'); | |
require_once('Worker.php'); | |
class SleepThenEcho implements Worker | |
{ | |
private $time; | |
public function __construct($time) | |
{ | |
$this->time = intval($time); | |
} | |
public function getCommand() | |
{ | |
return 'sleep ' . $this->time . ' && echo ' . $this->time; | |
} | |
public function done($stdout, $stderr) | |
{ | |
echo 'done '; | |
echo str_replace(array("\r\n", "\n", "\r"), ' ', var_export(array( | |
'command' => $this->getCommand(), | |
'stdout' => $stdout, | |
'stderr' => $stderr, | |
), true)), PHP_EOL; | |
} | |
public function fail($stdout, $stderr, $status) | |
{ | |
echo 'fail '; | |
echo str_replace(array("\r\n", "\n", "\r"), ' ', var_export(array( | |
'command' => $this->getCommand(), | |
'stdout' => $stdout, | |
'stderr' => $stderr, | |
'status' => $status, | |
), true)), PHP_EOL; | |
} | |
} | |
$manager = new WorkerManager(); | |
for ($i = -1; $i < 10; $i++) { | |
$manager->attach(new SleepThenEcho($i)); | |
} | |
while (0 < count($manager)) { | |
$manager->listen(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
% php example.php | |
fail array ( 'command' => 'sleep -1 && echo -1', 'stdout' => '', 'stderr' => 'sleep: invalid option -- \'1\' Try \'sleep --help\' for more information. ', 'status' => 1, ) | |
done array ( 'command' => 'sleep 0 && echo 0', 'stdout' => '0 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 1 && echo 1', 'stdout' => '1 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 2 && echo 2', 'stdout' => '2 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 3 && echo 3', 'stdout' => '3 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 4 && echo 4', 'stdout' => '4 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 5 && echo 5', 'stdout' => '5 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 6 && echo 6', 'stdout' => '6 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 7 && echo 7', 'stdout' => '7 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 8 && echo 8', 'stdout' => '8 ', 'stderr' => '', ) | |
done array ( 'command' => 'sleep 9 && echo 9', 'stdout' => '9 ', 'stderr' => '', ) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
interface Worker | |
{ | |
public function getCommand(); | |
public function done($stdout, $stderr); | |
public function fail($stdout, $stderr, $status); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
class WorkerManager implements Countable | |
{ | |
const STDIN = 0; | |
const STDOUT = 1; | |
const STDERR = 2; | |
const NON_BLOCKING = 0; | |
const BLOCKING = 1; | |
private static $DESCRIPTORSPEC = array( | |
self::STDIN => array('pipe', 'r'), | |
self::STDOUT => array('pipe', 'w'), | |
self::STDERR => array('pipe', 'w'), | |
); | |
private $workers = array(); | |
private $processes = array(); | |
private $stdins = array(); | |
private $stdouts = array(); | |
private $stderrs = array(); | |
public function attach(Worker $worker) | |
{ | |
$process = proc_open($worker->getCommand(), self::$DESCRIPTORSPEC, $pipes); | |
if (false === is_resource($process)) { | |
throw new \RuntimeException(); | |
} | |
stream_set_blocking($pipes[self::STDOUT], self::NON_BLOCKING); | |
$this->workers[] = $worker; | |
$this->processes[] = $process; | |
$this->stdins[] = $pipes[self::STDIN]; | |
$this->stdouts[] = $pipes[self::STDOUT]; | |
$this->stderrs[] = $pipes[self::STDERR]; | |
} | |
public function listen($timeout = 200000) | |
{ | |
$read = array(); | |
foreach ($this->workers as $i => $_) { | |
$read[] = $this->stdouts[$i]; | |
$read[] = $this->stderrs[$i]; | |
} | |
$changed_num = stream_select($read, $write = null, $expect = null, 0, $timeout); | |
if (false === $changed_num) { | |
throw new \RuntimeException(); | |
} | |
if (0 === $changed_num) { | |
return; | |
} | |
foreach ($read as $stream) { | |
$i = array_search($stream, $this->stdouts, true); | |
if (false === $i) { | |
$i = array_search($stream, $this->stderrs, true); | |
if (false === $i) { | |
continue; | |
} | |
} | |
$worker = $this->workers[$i]; | |
$stdout = stream_get_contents($this->stdouts[$i]); | |
$stderr = stream_get_contents($this->stderrs[$i]); | |
$status = $this->detach($worker); | |
if (0 === $status) { | |
$worker->done($stdout, $stderr); | |
} else if (0 < $status) { | |
$worker->fail($stdout, $stderr, $status); | |
} else { | |
throw new \RuntimeException(); | |
} | |
} | |
} | |
public function detach(Worker $worker) | |
{ | |
$i = array_search($worker, $this->workers, true); | |
if (false === $i) { | |
throw new \RuntimeException(); | |
} | |
fclose($this->stdins[$i]); | |
fclose($this->stdouts[$i]); | |
fclose($this->stderrs[$i]); | |
$status = proc_close($this->processes[$i]); | |
unset($this->workers[$i]); | |
unset($this->processes[$i]); | |
unset($this->stdins[$i]); | |
unset($this->stdouts[$i]); | |
unset($this->stderrs[$i]); | |
return $status; | |
} | |
public function count() | |
{ | |
return count($this->workers); | |
} | |
public function __destruct() | |
{ | |
array_walk($this->stdins, 'fclose'); | |
array_walk($this->stdouts, 'fclose'); | |
array_walk($this->stderrs, 'fclose'); | |
array_walk($this->processes, 'proc_close'); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment