Created
December 5, 2019 22:46
-
-
Save Nek-/ae895c30e7da22b0d8bc3e17baf25fc6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// Utilisation d'une classe Task pour clarifier mon code précédent à base de tableaux | |
/**
 * A unit of work handled by the scheduler: a generator-based coroutine paired
 * with an identifier and the value to deliver on its next resumption.
 * Replaces the earlier array-based bookkeeping with an explicit class.
 */
class Task
{
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    /**
     * @param mixed     $taskId    Identifier, stored as given and returned by getTaskId().
     * @param Generator $coroutine Coroutine that run() advances.
     */
    public function __construct($taskId, Generator $coroutine)
    {
        $this->coroutine = $coroutine;
        $this->taskId = $taskId;
    }

    /**
     * @return mixed The identifier passed to the constructor.
     */
    public function getTaskId()
    {
        return $this->taskId;
    }

    /**
     * Stores the value that the next non-initial run() sends into the coroutine.
     *
     * @param mixed $sendValue
     */
    public function setSendValue($sendValue)
    {
        $this->sendValue = $sendValue;
    }

    /**
     * Advances the coroutine by one step.
     *
     * The very first call only fetches the first yielded value via current();
     * calling send() at that point would skip it. Every later call sends the
     * stored value into the coroutine and then resets it to null.
     *
     * @return mixed Whatever the coroutine yielded at the point where it paused.
     */
    public function run()
    {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;

            return $this->coroutine->current();
        }

        $yielded = $this->coroutine->send($this->sendValue);
        $this->sendValue = null;

        return $yielded;
    }

    /**
     * @return bool True once the underlying generator is no longer valid.
     */
    public function isFinished()
    {
        $stillRunning = $this->coroutine->valid();

        return !$stillRunning;
    }
}
// On retrouve notre scheduler qui embarque toutes les tasks | |
/**
 * Cooperative scheduler that owns every task, runs them round-robin from a
 * queue, and parks tasks waiting on stream I/O until stream_select() reports
 * activity on their stream.
 */
class Scheduler
{
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;
    // Keyed by the integer id of each stream; every entry is [stream, [tasks waiting on it]].
    protected $waitingForRead = [];
    protected $waitingForWrite = [];

    public function __construct()
    {
        $this->taskQueue = new SplQueue();
    }

    /**
     * Wraps a coroutine in a Task, registers it and queues it for execution.
     *
     * @param Generator $coroutine
     *
     * @return int Identifier assigned to the new task.
     */
    public function newTask(Generator $coroutine)
    {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);

        return $tid;
    }

    /**
     * Appends a task to the end of the run queue.
     *
     * @param Task $task
     */
    public function schedule(Task $task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * Main loop: runs queued tasks one step at a time until the queue is empty.
     *
     * Because the polling task registered here loops forever and is
     * re-queued after each step, the queue never drains on its own.
     */
    public function run()
    {
        // Special task that handles waiting on streams through stream_select().
        $this->newTask($this->ioPollTask());

        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $retval = $task->run();

            // A yielded SystemCall is how a coroutine talks to the scheduler:
            // it can spawn another task, wait on a stream, and so on. The call
            // itself decides whether (and when) the task gets re-queued.
            if ($retval instanceof SystemCall) {
                $retval($task, $this);
                continue;
            }

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }

    /**
     * Parks a task until the given stream is reported readable.
     *
     * @param resource $socket
     * @param Task     $task
     */
    public function waitForRead($socket, Task $task)
    {
        if (isset($this->waitingForRead[(int) $socket])) {
            $this->waitingForRead[(int) $socket][1][] = $task;
        } else {
            $this->waitingForRead[(int) $socket] = [$socket, [$task]];
        }
    }

    /**
     * Parks a task until the given stream is reported writable.
     *
     * @param resource $socket
     * @param Task     $task
     */
    public function waitForWrite($socket, Task $task)
    {
        if (isset($this->waitingForWrite[(int) $socket])) {
            $this->waitingForWrite[(int) $socket][1][] = $task;
        } else {
            $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
        }
    }

    /**
     * Waits for stream activity and re-queues the tasks parked on every
     * stream that became ready.
     *
     * @param int|null $timeout Seconds handed to stream_select(); null blocks
     *                          until something happens, 0 polls without waiting.
     */
    protected function ioPoll($timeout)
    {
        $rSocks = [];
        foreach ($this->waitingForRead as list($socket)) {
            $rSocks[] = $socket;
        }

        $wSocks = [];
        foreach ($this->waitingForWrite as list($socket)) {
            $wSocks[] = $socket;
        }

        // stream_select() rejects a call in which no stream is passed at all:
        // a warning plus false before PHP 8, a ValueError (which "@" cannot
        // silence) from PHP 8.0 on. With nothing to watch there is nothing to do.
        if (!$rSocks && !$wSocks) {
            return;
        }

        $eSocks = []; // dummy
        // Both false (error) and 0 (timeout, nothing ready) mean no task to wake.
        if (!@stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
            return;
        }

        $this->resumeWaiters($rSocks, $this->waitingForRead);
        $this->resumeWaiters($wSocks, $this->waitingForWrite);
    }

    /**
     * Re-queues every task parked on the given ready streams and removes
     * those streams from the wait list.
     *
     * @param array $readySockets Streams reported ready by stream_select().
     * @param array $waitList     Wait list to drain (modified in place).
     */
    private function resumeWaiters(array $readySockets, array &$waitList)
    {
        foreach ($readySockets as $socket) {
            $key = (int) $socket;
            list(, $tasks) = $waitList[$key];
            unset($waitList[$key]);

            foreach ($tasks as $task) {
                $this->schedule($task);
            }
        }
    }

    /**
     * Endless coroutine that polls the streams on every pass: it blocks when
     * no other task is runnable and only peeks (timeout 0) otherwise.
     *
     * @return Generator
     */
    protected function ioPollTask()
    {
        while (true) {
            if ($this->taskQueue->isEmpty()) {
                $this->ioPoll(null);
            } else {
                $this->ioPoll(0);
            }
            yield;
        }
    }
}
// Classe proxy qui permet à la task et au scheduler de communiquer | |
/**
 * Proxy that lets a task and the scheduler communicate: a coroutine yields
 * an instance, and the scheduler invokes it with the task and itself.
 */
class SystemCall
{
    protected $callback;

    /**
     * @param callable $callback Invoked as $callback(Task $task, Scheduler $scheduler).
     */
    public function __construct(callable $callback)
    {
        $this->callback = $callback;
    }

    /**
     * Runs the wrapped callback and returns whatever it returns.
     *
     * @param Task      $task
     * @param Scheduler $scheduler
     *
     * @return mixed
     */
    public function __invoke(Task $task, Scheduler $scheduler)
    {
        // The callable is copied to a local first: "$this->callback(...)"
        // would be parsed as a call to a method named "callback".
        $handler = $this->callback;
        $result = $handler($task, $scheduler);

        return $result;
    }
}
/**
 * Builds the system call through which a coroutine learns its own task id:
 * the id becomes the value sent back into the task, which is then re-queued.
 *
 * @return SystemCall
 */
function getTaskId()
{
    $reportOwnId = function (Task $current, Scheduler $sched) {
        $ownId = $current->getTaskId();
        $current->setSendValue($ownId);
        $sched->schedule($current);
    };

    return new SystemCall($reportOwnId);
}
/**
 * Builds the system call that spawns another task from inside a coroutine.
 * The id of the new task is sent back into the calling task, which is then
 * re-queued.
 *
 * @param Generator $coroutine Coroutine to register as a new task.
 *
 * @return SystemCall
 */
function newTask(Generator $coroutine)
{
    $spawn = function (Task $parent, Scheduler $sched) use ($coroutine) {
        $childId = $sched->newTask($coroutine);
        $parent->setSendValue($childId);
        $sched->schedule($parent);
    };

    return new SystemCall($spawn);
}
/**
 * Builds the system call that registers the calling task with the scheduler
 * as waiting for the given stream to be read from. The task is not re-queued
 * here.
 *
 * @param resource $socket
 *
 * @return SystemCall
 */
function waitForRead($socket)
{
    $park = function (Task $waiter, Scheduler $sched) use ($socket) {
        $sched->waitForRead($socket, $waiter);
    };

    return new SystemCall($park);
}
/**
 * Builds the system call that registers the calling task with the scheduler
 * as waiting for the given stream to be written to. The task is not re-queued
 * here.
 *
 * @param resource $socket
 *
 * @return SystemCall
 */
function waitForWrite($socket)
{
    $park = function (Task $waiter, Scheduler $sched) use ($socket) {
        $sched->waitForWrite($socket, $waiter);
    };

    return new SystemCall($park);
}
// Nous arrivons à la partie "simple" et au cœur de ce que nous voulons faire : le serveur HTTP.
// Cette fois-ci, nous utilisons un ensemble de classes relativement complexe pour arriver à nos fins,
// mais c'est plus élégant à l'utilisation. Voyez par vous-mêmes.
/**
 * Coroutine that listens on localhost at the given TCP port and spawns one
 * handleClient() task per accepted connection.
 *
 * @param int $port TCP port to listen on.
 *
 * @return Generator
 *
 * @throws Exception When the listening socket cannot be created.
 */
function server($port)
{
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) {
        throw new Exception($errStr, $errNo);
    }

    stream_set_blocking($socket, false);

    while (true) {
        yield waitForRead($socket);

        $clientSocket = stream_socket_accept($socket, 0);
        if ($clientSocket === false) {
            // The accept failed (for instance the peer went away before we
            // got to it): there is no connection to serve, so go back to
            // waiting instead of handing "false" to a client task.
            continue;
        }

        yield newTask(handleClient($clientSocket));
    }
}
/**
 * Coroutine that serves one connection: waits for the request, answers with
 * a small HTML page containing the current timestamp, then closes the stream.
 *
 * @param resource $socket Accepted client stream.
 *
 * @return Generator
 */
function handleClient($socket)
{
    yield waitForRead($socket);
    // Consume the pending request bytes; their content is not used.
    fread($socket, 8192);

    $msg = "<h1>Reponse asynchrone</h1>\n" . time();
    $body = $msg . "\r\n";

    // Content-Length must count the bytes actually sent as the body,
    // trailing CRLF included; strlen() is byte-wise, which is what is needed.
    $response = "HTTP/1.1 200 OK\r\n" .
        "Date: " . gmdate('D, d M Y H:i:s T') . "\r\n" .
        "Connection: close\r\n" .
        "Content-Type: text/html\r\n" .
        "Content-Length: " . strlen($body) . "\r\n" .
        "\r\n" .
        $body;

    yield waitForWrite($socket);
    fwrite($socket, $response);
    fclose($socket);
}
// Bootstrap: register the HTTP server coroutine on port 8000, then hand
// control to the scheduler loop.
$scheduler = new Scheduler();
$scheduler->newTask(server(8000));
$scheduler->run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment