Created
February 16, 2017 14:23
-
-
Save nicholasnet/246a62c4c26593d73511b5f21e96d52e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
/** | |
* Copyright (c) nicholasnet | |
*/ | |
namespace Ideasbucket; | |
use Symfony\Component\Process\Process; | |
/**
 * Class ProcessManager
 *
 * Runs a collection of Symfony Process instances while keeping at most a
 * configurable number of them started at the same time. Every finished
 * process is reported to a success or an error callback together with the
 * key it had in the source collection.
 *
 * @package Ideasbucket
 */
class ProcessManager
{
    /**
     * Maximum number of processes allowed to be in flight at once.
     *
     * @var int
     */
    private $concurrency = 20;

    /**
     * Source of the processes that have not been started yet.
     *
     * @var \Iterator
     */
    private $processes;

    /**
     * Started processes that have not been reported yet, keyed by the key
     * they had in the source iterator.
     *
     * Must start out as an array: count() is called on it before anything
     * has been stored, and count(null) raises a TypeError on PHP 8.
     *
     * @var Process[]
     */
    private $inProcess = [];

    /**
     * Invoked with (standard output, index) for a process that succeeded.
     *
     * @var callable|null
     */
    private $onSuccess;

    /**
     * Invoked with (error output, index) for a process that failed.
     *
     * @var callable|null
     */
    private $onError;

    /**
     * Whether run() has already been called.
     *
     * @var bool
     */
    private $ran = false;

    /**
     * Pause between two polling passes in run(), in microseconds.
     *
     * @var int
     */
    private $pollInterval = 1000;

    /**
     * ProcessManager constructor.
     *
     * @param mixed $processes An \Iterator, an array, or a single value. Only
     *                         items that are Process instances are started;
     *                         anything else is skipped. Items are tracked by
     *                         their iterator key, so keys are expected to be
     *                         unique and usable as array offsets.
     * @param array $opts      Optional callbacks under the keys 'success' and
     *                         'error'. A missing or non-callable entry is
     *                         replaced by a no-op.
     */
    public function __construct($processes, array $opts = [])
    {
        $this->processes = $this->getIterator($processes);
        $this->onSuccess = $this->pickCallback($opts, 'success');
        $this->onError = $this->pickCallback($opts, 'error');
    }

    /**
     * Returns the callback stored under $name, or a no-op when there is none.
     *
     * @param array  $opts
     * @param string $name
     *
     * @return callable
     */
    private function pickCallback(array $opts, string $name)
    {
        if (isset($opts[$name]) && is_callable($opts[$name])) {
            return $opts[$name];
        }

        return $this->getNoOp();
    }

    /**
     * Returns an iterator for the given value.
     *
     * @param mixed $value
     *
     * @return \Iterator
     */
    private function getIterator($value)
    {
        if ($value instanceof \Iterator) {
            return $value;
        }

        if (is_array($value)) {
            return new \ArrayIterator($value);
        }

        // Any other value is treated as a one-element collection.
        return new \ArrayIterator([$value]);
    }

    /**
     * Returns a callback that accepts the callback arguments and does nothing.
     *
     * @return \Closure
     */
    private function getNoOp() : \Closure
    {
        return static function ($response, $index) {};
    }

    /**
     * Sets the maximum number of simultaneously started processes.
     *
     * @param int $concurrency
     *
     * @return ProcessManager
     *
     * @throws \LogicException           When called after run().
     * @throws \InvalidArgumentException When $concurrency is not positive.
     */
    public function setConcurrency(int $concurrency): ProcessManager
    {
        if ($this->ran === true) {
            throw new \LogicException('Cannot change concurrency after process run.');
        }

        if ($concurrency <= 0) {
            throw new \InvalidArgumentException('Concurrency cannot be 0 or negative value.');
        }

        $this->concurrency = $concurrency;

        return $this;
    }

    /**
     * Executes the processes.
     *
     * Starts the first batch, then polls until every started process has
     * been reported. Returns once nothing is left in flight.
     */
    public function run()
    {
        $this->addPending();
        $this->ran = true;

        while (count($this->inProcess) !== 0) {
            $this->monitorPending();

            // Yield the CPU between polling passes instead of spinning.
            if (count($this->inProcess) !== 0) {
                usleep($this->pollInterval);
            }
        }

        // Clear the references for callbacks.
        $this->onSuccess = null;
        $this->onError = null;
    }

    /**
     * Starts waiting processes until the concurrency limit is reached or the
     * source iterator has nothing left.
     */
    private function addPending()
    {
        while (count($this->inProcess) < $this->concurrency) {
            $item = $this->getUnprocessed();

            if ($item === null) {
                return;
            }

            $this->inProcess[$item['index']] = $item['process'];
            $item['process']->start();
        }
    }

    /**
     * Pulls the next Process from the source iterator, skipping every item
     * that is not a Process instance.
     *
     * @return array|null ['index' => iterator key, 'process' => Process],
     *                    or null when the iterator is exhausted.
     */
    private function getUnprocessed()
    {
        // A loop rather than recursion, so a long run of non-Process items
        // cannot grow the call stack.
        while ($this->processes->valid()) {
            $candidate = $this->processes->current();
            $key = $this->processes->key();
            $this->processes->next();

            if ($candidate instanceof Process) {
                return ['index' => $key, 'process' => $candidate];
            }
        }

        return null;
    }

    /**
     * Makes one pass over the started processes: reports each terminated one
     * to the matching callback, forgets it, and starts replacements.
     */
    private function monitorPending()
    {
        // foreach by value walks a snapshot of the array, so removing and
        // adding entries inside the loop does not disturb this pass;
        // processes started here are picked up by the next pass.
        foreach ($this->inProcess as $index => $process) {
            /** @var Process $process */
            if (!$process->isTerminated()) {
                continue;
            }

            if ($process->isSuccessful()) {
                call_user_func($this->onSuccess, $process->getOutput(), $index);
            } else {
                call_user_func($this->onError, $process->getErrorOutput(), $index);
            }

            unset($this->inProcess[$index]);
            $this->addPending();
        }
    }
}
// Example Usage
// A generator closure that produces $total identical processes on demand,
// so they are created only as the manager asks for them.
$requests = function ($total) {
    $remaining = $total;
    while ($remaining > 0) {
        yield new Process('ls -lsa');
        $remaining--;
    }
};

// Run 100 generated processes with the default concurrency; both callbacks
// receive the process output and the key it had in the generator.
(new \Ideasbucket\ProcessManager(
    $requests(100),
    [
        'success' => function ($response, $index) {},
        'error' => function ($response, $index) {},
    ]
))->run();
// OR
// Build the whole list up front and cap the manager at 10 concurrent processes.
$processes = [];
for ($i = 0; $i < 100; $i++) {
    // Append to $processes, the list that is handed to the manager below.
    $processes[] = new Process('ls -lsa');
}
(new \Ideasbucket\ProcessManager($processes, [
    'error' => function ($response, $index) {},
    'success' => function ($response, $index) {}
]))->setConcurrency(10)->run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment