Skip to content

Instantly share code, notes, and snippets.

@thekid
Last active December 16, 2021 13:34
Show Gist options
  • Select an option

  • Save thekid/0b20593c563fea11def89f861aa2000f to your computer and use it in GitHub Desktop.

Select an option

Save thekid/0b20593c563fea11def89f861aa2000f to your computer and use it in GitHub Desktop.
Await with Fibers
<?php
use io\OperationTimedOutException;
use peer\{Socket, Sockets, ConnectException};
use util\Objects;
use util\profiling\Timer;
// Library code
function logger(... $args) {
printf("[%s] %s\n", (new DateTime())->format('H:i:s.v'), implode('', array_map(
fn($arg) => is_string($arg) ? $arg : Objects::stringOf($arg),
$args
)));
}
function fetch($url) {
$t= (new Timer())->start();
$u= parse_url($url);
logger('⚡ CONNECT @', $u['host']);
$socket= new Socket($u['host'], $u['port'] ?? 80);
$socket->open();
Fiber::suspend($socket);
// Send HTTP request
Fiber::suspend('write');
$request= "HEAD ".($u['path'] ?? '/')." HTTP/1.0\r\nHost: {$u['host']}\r\nConnection: close\r\n\r\n";
logger('▶️ `', addcslashes($request, "\0..\37!\177..\377"), '` @', $u['host']);
$socket->write($request);
// Read response
$response= '';
do {
Fiber::suspend('read');
$chunk= $socket->readBinary();
logger('◀️ `', addcslashes($chunk, "\0..\37!\177..\377"), '` @', $u['host']);
$response.= $chunk;
} while (!$socket->eof());
$socket->close();
$t->stop();
logger('🔥 FINISH ', $u['host'], ' after ', sprintf('%.3f seconds', $t->elapsedTime()));
return $response;
}
const OPERATIONS= [
'connect' => ['write', 2.0],
'read' => ['read', 4.0],
'write' => ['write', 4.0],
'close' => ['read', 2.0],
];
function await($awaitable) {
$fibers= is_array($awaitable) ? $awaitable : [$awaitable];
// Start fibers
$sockets= $intents= [];
foreach ($fibers as $id => $fiber) {
$sockets[$id]= $fiber->start();
$intents[$id]= ['connect', microtime(true) + OPERATIONS['connect'][1]];
}
// Continue execution until they have all completed
$error= null;
while ($fibers) {
$wait= 30.0;
$now= microtime(true);
$select= ['read' => [], 'write' => []];
foreach ($intents as $id => $intent) {
if ($fibers[$id]->isTerminated()) {
yield $id => $fibers[$id]->getReturn();
unset($fibers[$id], $sockets[$id], $intents[$id]);
} else {
$select[OPERATIONS[$intent[0]][0]][$id]= $sockets[$id];
}
$wait= min($wait, $intent[1] - $now);
}
if ($select['read'] || $select['write']) {
Sockets::$STREAM->select($select['read'], $select['write'], $error, $wait);
}
// Handle timeouts
$now= microtime(true);
foreach ($intents as $id => $intent) {
if ($intent[1] - $now <= 0.0) {
try {
$fibers[$id]->throw(new OperationTimedOutException(sprintf(
'%s timeout after %.3f seconds',
ucfirst($intent[0]),
OPERATIONS[$intent[0]][1]
)));
} catch (Throwable $e) {
logger('🛑 ', $e);
unset($fibers[$id], $sockets[$id], $intents[$id]);
continue 2;
}
}
if ('connect' === $intent[0] && !$sockets[$id]->isConnected()) {
unset($select['read'][$id], $select['write'][$id]);
}
}
// Handle I/O
foreach ($select['read'] + $select['write'] as $id => $select) {
try {
$intent= $fibers[$id]->resume() ?? 'close';
$intents[$id]= [$intent, microtime(true) + OPERATIONS[$intent][1]];
} catch (Throwable $e) {
logger('🛑 ', $e);
unset($fibers[$id], $sockets[$id], $intents[$id]);
}
}
}
}
// Main code
$requests= [];
foreach (array_slice($argv, 1) as $url) {
$requests[$url]= new Fiber(fn() => fetch($url));
}
logger('START');
$t= (new Timer())->start();
foreach (await($requests) as $url => $response) {
echo "\e[37;4m", $url, "\e[0m\n\e[34m", $response, "\e[0m";
}
$t->stop();
logger('STOP');
printf("%d URLs fetched in %.3f seconds\n", $argc - 1, $t->elapsedTime());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment