Last active
February 18, 2021 20:26
-
-
Save wodka/23475d36cf13e956b8db7578bf6251ed to your computer and use it in GitHub Desktop.
Simple Symfony3 command Thread Pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace AppBundle\Command; | |
use Symfony\Component\Console\Command\Command; | |
use Symfony\Component\Console\Input\InputArgument; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Input\InputOption; | |
use Symfony\Component\Console\Output\OutputInterface; | |
use Symfony\Component\Process\Process; | |
/**
 * Basic "thread" pool that keeps a fixed number of worker console commands running.
 *
 * Every worker is a child process running `php bin/console <command>`. A slot
 * whose process has exited is refilled on the next tick. The pool keeps going
 * until the master process receives SIGTERM.
 */
class ThreadPoolCommand extends Command
{
    const INPUT_THREADS = 'threads';
    const INPUT_COMMAND = 'exec';

    /** Pause between two inspections of the pool, in microseconds (passed to usleep()). */
    const TICK = 500000;

    /** Highest value accepted for the --threads option. */
    const MAX_THREADS = 20;

    /**
     * Declare the command name, the --threads/-t option and the required command argument.
     */
    protected function configure()
    {
        $this
            ->setName('thread:pool')
            ->addOption(self::INPUT_THREADS, 't', InputOption::VALUE_OPTIONAL, 'number of threads to work (default is one)', 1)
            ->addArgument(self::INPUT_COMMAND, InputArgument::REQUIRED, 'command to run (to include parameters add quotes)')
        ;
    }

    /**
     * Run the pool until SIGTERM is received.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int 1 when the thread count is invalid, 0 after a SIGTERM-triggered shutdown
     */
    public function execute(InputInterface $input, OutputInterface $output)
    {
        $threads = $this->parseThreadCount($input->getOption(self::INPUT_THREADS));
        if (null === $threads) {
            $output->writeln('pool[master] <error>invalid thread count</error>');

            return 1;
        }

        // The command line is identical for every worker, so build it once.
        $commandLine = implode(' ', $this->buildCommand($input));

        /** @var Process[]|null[] $pool one slot per worker, null until first start */
        $pool = array_fill(0, $threads, null);
        $loggers = $this->buildLoggers($threads);

        $output->writeln('pool[master] start with <info>'.$threads.'</info> workers');

        $running = true;
        // $siginfo is optional so the handler works whether or not the runtime passes it.
        pcntl_signal(SIGTERM, function ($signo, $siginfo = null) use ($output, &$running) {
            $output->writeln('pool[master] received SIGTERM - stop threads');
            $running = false;
        });

        while ($running) {
            // Pending signals are only delivered here, so $running can flip
            // to false right before the slots are inspected.
            pcntl_signal_dispatch();

            for ($i = 0; $i < $threads; $i++) {
                $worker = $pool[$i];

                if ($worker instanceof Process && ($worker->isRunning() || !$worker->isStarted())) {
                    // Slot is occupied: forward the shutdown request, but only to a
                    // process that is really running at this moment.
                    if (!$running && $worker->isRunning()) {
                        $worker->signal(SIGTERM);
                    }

                    continue;
                }

                // Shutting down: never refill an empty or finished slot.
                if (!$running) {
                    continue;
                }

                // A Process instance that is no longer running means the previous worker exited.
                if ($worker instanceof Process) {
                    $output->writeln(sprintf('pool[%d] %s failed', $i, $commandLine));
                }

                $output->writeln(sprintf('pool[%d] %s started', $i, $commandLine));

                $pool[$i] = new Process($commandLine);
                // Do not buffer worker output in memory; the logger callback
                // streams it to STDOUT/STDERR instead.
                $pool[$i]->disableOutput();
                $pool[$i]->start($loggers[$i]);
            }

            usleep(self::TICK);
        }

        // The loop is only left after a requested shutdown, which is not an error.
        $output->writeln('pool[master] stopped');

        return 0;
    }

    /**
     * Validate the raw --threads value and convert it to an integer.
     *
     * @param mixed $value raw option value as returned by the input
     *
     * @return int|null integer in [1, MAX_THREADS], or null when the value is not acceptable
     */
    private function parseThreadCount($value)
    {
        $threads = filter_var($value, FILTER_VALIDATE_INT, [
            'options' => ['min_range' => 1, 'max_range' => self::MAX_THREADS],
        ]);

        return false === $threads ? null : $threads;
    }

    /**
     * Build one output callback per worker slot; each prefixes every line with "pool[<slot>] ".
     *
     * @param int $threads number of worker slots
     *
     * @return callable[] callbacks taking ($type, $buffer), indexed by slot
     */
    private function buildLoggers($threads)
    {
        $loggers = [];

        for ($i = 0; $i < $threads; $i++) {
            $prefix = 'pool['.$i.'] ';

            // A closure replaces create_function(), which no longer exists in PHP 8.
            $loggers[] = static function ($type, $buffer) use ($prefix) {
                $stream = Process::ERR === $type ? STDERR : STDOUT;

                fwrite($stream, $prefix.str_replace(PHP_EOL, PHP_EOL.$prefix, trim($buffer)).PHP_EOL);
            };
        }

        return $loggers;
    }

    /**
     * Build the worker command line parts; also forwards the loaded php.ini (custom php ini for heroku).
     *
     * @param InputInterface $input
     *
     * @return array command line fragments, joined with spaces by the caller
     */
    private function buildCommand(InputInterface $input)
    {
        $exec = ['php'];

        $iniPath = php_ini_loaded_file();
        if (is_string($iniPath) && '' !== $iniPath) {
            // Quote the path so spaces or shell metacharacters cannot break the command line.
            $exec[] = '--php-ini '.escapeshellarg($iniPath);
        }

        $exec[] = 'bin/console';
        // Passed through verbatim on purpose: the user supplies the command including its parameters.
        $exec[] = $input->getArgument(self::INPUT_COMMAND);

        return $exec;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment