Created
February 25, 2026 16:02
-
-
Save masakielastic/4ae7c885c40ba35852147223e4fede58 to your computer and use it in GitHub Desktop.
HTTPS対応版:Fiberスケジューラ + https_get_async()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| /** | |
| * Fiber協調スケジューラ(sleep + readable + writable) | |
| */ | |
| final class Scheduler | |
| { | |
| /** @var SplQueue<array{0:Fiber,1:mixed}> */ | |
| private SplQueue $ready; | |
| /** @var array<int, array{time: float, fiber: Fiber, value: mixed}> */ | |
| private array $timers = []; | |
| /** @var array<int, array<int, array{fiber: Fiber, stream: mixed}>> */ | |
| private array $readWait = []; | |
| /** @var array<int, array<int, array{fiber: Fiber, stream: mixed}>> */ | |
| private array $writeWait = []; | |
| public function __construct() | |
| { | |
| $this->ready = new SplQueue(); | |
| } | |
| public function newTask(callable $fn): void | |
| { | |
| $fiber = new Fiber(fn() => $fn()); | |
| $this->scheduleReady($fiber, null); | |
| } | |
| private function scheduleReady(Fiber $fiber, mixed $value = null): void | |
| { | |
| $this->ready->enqueue([$fiber, $value]); | |
| } | |
| private function addWait(array &$bucket, Fiber $fiber, mixed $stream): void | |
| { | |
| if (!is_resource($stream)) { | |
| $this->scheduleReady($fiber, null); | |
| return; | |
| } | |
| $id = (int)$stream; | |
| $bucket[$id][] = ['fiber' => $fiber, 'stream' => $stream]; | |
| } | |
| private function handleRequest(Fiber $fiber, mixed $request): void | |
| { | |
| if (!is_array($request) || ($request[0] ?? null) === null) { | |
| $this->scheduleReady($fiber, null); | |
| return; | |
| } | |
| $op = $request[0]; | |
| if ($op === 'sleep') { | |
| $seconds = (float)($request[1] ?? 0.0); | |
| $wakeAt = microtime(true) + max(0.0, $seconds); | |
| $this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber, 'value' => $seconds]; | |
| return; | |
| } | |
| if ($op === 'readable') { | |
| $this->addWait($this->readWait, $fiber, $request[1] ?? null); | |
| return; | |
| } | |
| if ($op === 'writable') { | |
| $this->addWait($this->writeWait, $fiber, $request[1] ?? null); | |
| return; | |
| } | |
| $this->scheduleReady($fiber, null); | |
| } | |
| private function pumpTimers(): void | |
| { | |
| if (empty($this->timers)) return; | |
| $now = microtime(true); | |
| $still = []; | |
| foreach ($this->timers as $t) { | |
| if ($t['time'] <= $now) { | |
| $this->scheduleReady($t['fiber'], $t['value']); | |
| } else { | |
| $still[] = $t; | |
| } | |
| } | |
| $this->timers = $still; | |
| } | |
| private function nextTimeoutUs(int $maxUs): int | |
| { | |
| if (empty($this->timers)) return $maxUs; | |
| $next = min(array_column($this->timers, 'time')); | |
| $delta = $next - microtime(true); | |
| if ($delta <= 0) return 0; | |
| $us = (int)($delta * 1_000_000); | |
| return min($us, $maxUs); | |
| } | |
| private function pumpSelect(int $timeoutUs): void | |
| { | |
| $read = []; | |
| foreach ($this->readWait as $id => $entries) { | |
| $read[] = $entries[0]['stream']; | |
| } | |
| $write = []; | |
| foreach ($this->writeWait as $id => $entries) { | |
| $write[] = $entries[0]['stream']; | |
| } | |
| if (empty($read) && empty($write)) { | |
| usleep($timeoutUs); | |
| return; | |
| } | |
| $except = null; | |
| $sec = intdiv($timeoutUs, 1_000_000); | |
| $usec = $timeoutUs % 1_000_000; | |
| $r = $read; | |
| $w = $write; | |
| $n = @stream_select($r, $w, $except, $sec, $usec); | |
| if ($n === false || $n === 0) return; | |
| foreach ($r as $stream) { | |
| $id = (int)$stream; | |
| if (!isset($this->readWait[$id])) continue; | |
| $entries = $this->readWait[$id]; | |
| unset($this->readWait[$id]); | |
| foreach ($entries as $e) { | |
| $this->scheduleReady($e['fiber'], $stream); | |
| } | |
| } | |
| foreach ($w as $stream) { | |
| $id = (int)$stream; | |
| if (!isset($this->writeWait[$id])) continue; | |
| $entries = $this->writeWait[$id]; | |
| unset($this->writeWait[$id]); | |
| foreach ($entries as $e) { | |
| $this->scheduleReady($e['fiber'], $stream); | |
| } | |
| } | |
| } | |
| public function run(): void | |
| { | |
| while ( | |
| !$this->ready->isEmpty() | |
| || !empty($this->timers) | |
| || !empty($this->readWait) | |
| || !empty($this->writeWait) | |
| ) { | |
| $this->pumpTimers(); | |
| if (!$this->ready->isEmpty()) { | |
| /** @var array{0:Fiber,1:mixed} $item */ | |
| $item = $this->ready->dequeue(); | |
| [$fiber, $value] = $item; | |
| try { | |
| if (!$fiber->isStarted()) { | |
| $request = $fiber->start(); | |
| } else { | |
| $request = $fiber->resume($value); | |
| } | |
| } catch (Throwable $e) { | |
| fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL); | |
| continue; | |
| } | |
| if (!$fiber->isTerminated()) { | |
| $this->handleRequest($fiber, $request); | |
| } | |
| continue; | |
| } | |
| $timeoutUs = $this->nextTimeoutUs(50_000); | |
| $this->pumpSelect($timeoutUs); | |
| } | |
| } | |
| } | |
| /** await:要求を出し、resume(value) の value を受け取る */ | |
| function await(array $request): mixed | |
| { | |
| return Fiber::suspend($request); | |
| } | |
| function sleep_async(float $seconds): float | |
| { | |
| /** @var float $slept */ | |
| $slept = await(['sleep', $seconds]); | |
| return $slept; | |
| } | |
| function wait_readable($stream): void | |
| { | |
| await(['readable', $stream]); | |
| } | |
| function wait_writable($stream): void | |
| { | |
| await(['writable', $stream]); | |
| } | |
| /** | |
| * non-blocking TCP connect(context対応) | |
| */ | |
| function tcp_connect_async(string $host, int $port, float $timeoutSec = 5.0, $context = null) | |
| { | |
| $addr = "tcp://{$host}:{$port}"; | |
| $errno = 0; | |
| $errstr = ''; | |
| $flags = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT; | |
| $sock = @stream_socket_client($addr, $errno, $errstr, $timeoutSec, $flags, $context); | |
| if ($sock === false) { | |
| throw new RuntimeException("connect failed: {$errstr} ({$errno})"); | |
| } | |
| stream_set_blocking($sock, false); | |
| // writable になるまで待つ(接続確立 or 失敗) | |
| wait_writable($sock); | |
| return $sock; | |
| } | |
| function write_all_async($sock, string $data): void | |
| { | |
| $offset = 0; | |
| $len = strlen($data); | |
| while ($offset < $len) { | |
| wait_writable($sock); | |
| $n = @fwrite($sock, substr($data, $offset)); | |
| if ($n === false) { | |
| throw new RuntimeException("write failed"); | |
| } | |
| if ($n === 0) { | |
| sleep_async(0.01); | |
| continue; | |
| } | |
| $offset += $n; | |
| } | |
| } | |
| function read_all_async($sock, int $maxBytes = 2_000_000): string | |
| { | |
| $buf = ''; | |
| while (!feof($sock)) { | |
| wait_readable($sock); | |
| $chunk = @fread($sock, 8192); | |
| if ($chunk === false) { | |
| throw new RuntimeException("read failed"); | |
| } | |
| if ($chunk === '') { | |
| sleep_async(0.01); | |
| continue; | |
| } | |
| $buf .= $chunk; | |
| if (strlen($buf) > $maxBytes) { | |
| throw new RuntimeException("response too large"); | |
| } | |
| } | |
| return $buf; | |
| } | |
| /** | |
| * TLSハンドシェイク(non-blocking) | |
| * stream_socket_enable_crypto() が 0 を返す間は read/write 待ちが必要。 | |
| */ | |
| function tls_handshake_async($sock, string $host, float $timeoutSec = 10.0): void | |
| { | |
| $deadline = microtime(true) + $timeoutSec; | |
| // クライアントTLS(環境により定数の対応が違うので @ で抑制せず、ここは固定で) | |
| $method = STREAM_CRYPTO_METHOD_TLS_CLIENT; | |
| // 最小の戦略:enable_crypto を叩き、0なら readable/writable を交互に待ってリトライ | |
| $turn = 0; | |
| while (true) { | |
| $ok = @stream_socket_enable_crypto($sock, true, $method); | |
| if ($ok === true) { | |
| return; // 完了 | |
| } | |
| if ($ok === false) { | |
| $meta = stream_get_meta_data($sock); | |
| $timedOut = $meta['timed_out'] ?? false; | |
| throw new RuntimeException("TLS handshake failed" . ($timedOut ? " (timed out)" : "")); | |
| } | |
| // $ok === 0 : まだ途中 | |
| if (microtime(true) >= $deadline) { | |
| throw new RuntimeException("TLS handshake timed out"); | |
| } | |
| // どっち待ちが必要かは環境差があるので、read/write を交互に待つ(最小版) | |
| if (($turn++ % 2) === 0) { | |
| wait_readable($sock); | |
| } else { | |
| wait_writable($sock); | |
| } | |
| } | |
| } | |
| /** | |
| * HTTPS GET(HTTP/1.1 + Connection: close) | |
| * - 証明書検証を有効にしたいので peer_name / verify_peer_name を設定 | |
| * - 検証が邪魔な環境用に $insecure を用意 | |
| */ | |
| function https_get_async(string $host, string $path, bool $insecure = false): string | |
| { | |
| $ssl = [ | |
| 'SNI_enabled' => true, | |
| 'peer_name' => $host, | |
| // ふつうは true 推奨 | |
| 'verify_peer' => !$insecure, | |
| 'verify_peer_name' => !$insecure, | |
| // insecure時は自己署名等を許す(デモ用) | |
| 'allow_self_signed' => $insecure, | |
| ]; | |
| $ctx = stream_context_create(['ssl' => $ssl]); | |
| $sock = tcp_connect_async($host, 443, 5.0, $ctx); | |
| // TLSを開始(この時点で SNI/peer_name が効く) | |
| tls_handshake_async($sock, $host, 10.0); | |
| $req = | |
| "GET {$path} HTTP/1.1\r\n" . | |
| "Host: {$host}\r\n" . | |
| "User-Agent: fiber-mini-https/0.1\r\n" . | |
| "Accept: */*\r\n" . | |
| "Connection: close\r\n" . | |
| "\r\n"; | |
| write_all_async($sock, $req); | |
| $raw = read_all_async($sock); | |
| fclose($sock); | |
| return $raw; | |
| } | |
| /* ======================= | |
| * デモ:tickしつつHTTPS GET | |
| * ======================= */ | |
| $s = new Scheduler(); | |
| // 背景tick | |
| $s->newTask(function () { | |
| for ($i = 1; $i <= 50; $i++) { | |
| sleep_async(0.1); | |
| echo "[TICK] {$i}\n"; | |
| } | |
| }); | |
| // HTTPS GET | |
| $s->newTask(function () { | |
| $raw = https_get_async("httpbin.org", "/get", false); | |
| [$head, $body] = explode("\r\n\r\n", $raw, 2) + [null, null]; | |
| echo "----- RESPONSE HEAD -----\n"; | |
| echo $head . "\n"; | |
| echo "----- RESPONSE BODY (first 400 bytes) -----\n"; | |
| echo substr($body ?? '', 0, 400) . "\n"; | |
| }); | |
| $s->run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment