Last active
December 16, 2021 13:34
-
-
Save thekid/0b20593c563fea11def89f861aa2000f to your computer and use it in GitHub Desktop.
Await with Fibers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| use io\OperationTimedOutException; | |
| use peer\{Socket, Sockets, ConnectException}; | |
| use util\Objects; | |
| use util\profiling\Timer; | |
| // Library code | |
| function logger(... $args) { | |
| printf("[%s] %s\n", (new DateTime())->format('H:i:s.v'), implode('', array_map( | |
| fn($arg) => is_string($arg) ? $arg : Objects::stringOf($arg), | |
| $args | |
| ))); | |
| } | |
| function fetch($url) { | |
| $t= (new Timer())->start(); | |
| $u= parse_url($url); | |
| logger('⚡ CONNECT @', $u['host']); | |
| $socket= new Socket($u['host'], $u['port'] ?? 80); | |
| $socket->open(); | |
| Fiber::suspend($socket); | |
| // Send HTTP request | |
| Fiber::suspend('write'); | |
| $request= "HEAD ".($u['path'] ?? '/')." HTTP/1.0\r\nHost: {$u['host']}\r\nConnection: close\r\n\r\n"; | |
| logger('▶️ `', addcslashes($request, "\0..\37!\177..\377"), '` @', $u['host']); | |
| $socket->write($request); | |
| // Read response | |
| $response= ''; | |
| do { | |
| Fiber::suspend('read'); | |
| $chunk= $socket->readBinary(); | |
| logger('◀️ `', addcslashes($chunk, "\0..\37!\177..\377"), '` @', $u['host']); | |
| $response.= $chunk; | |
| } while (!$socket->eof()); | |
| $socket->close(); | |
| $t->stop(); | |
| logger('🔥 FINISH ', $u['host'], ' after ', sprintf('%.3f seconds', $t->elapsedTime())); | |
| return $response; | |
| } | |
| const OPERATIONS= [ | |
| 'connect' => ['write', 2.0], | |
| 'read' => ['read', 4.0], | |
| 'write' => ['write', 4.0], | |
| 'close' => ['read', 2.0], | |
| ]; | |
| function await($awaitable) { | |
| $fibers= is_array($awaitable) ? $awaitable : [$awaitable]; | |
| // Start fibers | |
| $sockets= $intents= []; | |
| foreach ($fibers as $id => $fiber) { | |
| $sockets[$id]= $fiber->start(); | |
| $intents[$id]= ['connect', microtime(true) + OPERATIONS['connect'][1]]; | |
| } | |
| // Continue execution until they have all completed | |
| $error= null; | |
| while ($fibers) { | |
| $wait= 30.0; | |
| $now= microtime(true); | |
| $select= ['read' => [], 'write' => []]; | |
| foreach ($intents as $id => $intent) { | |
| if ($fibers[$id]->isTerminated()) { | |
| yield $id => $fibers[$id]->getReturn(); | |
| unset($fibers[$id], $sockets[$id], $intents[$id]); | |
| } else { | |
| $select[OPERATIONS[$intent[0]][0]][$id]= $sockets[$id]; | |
| } | |
| $wait= min($wait, $intent[1] - $now); | |
| } | |
| if ($select['read'] || $select['write']) { | |
| Sockets::$STREAM->select($select['read'], $select['write'], $error, $wait); | |
| } | |
| // Handle timeouts | |
| $now= microtime(true); | |
| foreach ($intents as $id => $intent) { | |
| if ($intent[1] - $now <= 0.0) { | |
| try { | |
| $fibers[$id]->throw(new OperationTimedOutException(sprintf( | |
| '%s timeout after %.3f seconds', | |
| ucfirst($intent[0]), | |
| OPERATIONS[$intent[0]][1] | |
| ))); | |
| } catch (Throwable $e) { | |
| logger('🛑 ', $e); | |
| unset($fibers[$id], $sockets[$id], $intents[$id]); | |
| continue 2; | |
| } | |
| } | |
| if ('connect' === $intent[0] && !$sockets[$id]->isConnected()) { | |
| unset($select['read'][$id], $select['write'][$id]); | |
| } | |
| } | |
| // Handle I/O | |
| foreach ($select['read'] + $select['write'] as $id => $select) { | |
| try { | |
| $intent= $fibers[$id]->resume() ?? 'close'; | |
| $intents[$id]= [$intent, microtime(true) + OPERATIONS[$intent][1]]; | |
| } catch (Throwable $e) { | |
| logger('🛑 ', $e); | |
| unset($fibers[$id], $sockets[$id], $intents[$id]); | |
| } | |
| } | |
| } | |
| } | |
| // Main code | |
| $requests= []; | |
| foreach (array_slice($argv, 1) as $url) { | |
| $requests[$url]= new Fiber(fn() => fetch($url)); | |
| } | |
| logger('START'); | |
| $t= (new Timer())->start(); | |
| foreach (await($requests) as $url => $response) { | |
| echo "\e[37;4m", $url, "\e[0m\n\e[34m", $response, "\e[0m"; | |
| } | |
| $t->stop(); | |
| logger('STOP'); | |
| printf("%d URLs fetched in %.3f seconds\n", $argc - 1, $t->elapsedTime()); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment