Created
February 25, 2026 15:53
-
-
Save masakielastic/1966a03b304e3104eddf7f13a4c90393 to your computer and use it in GitHub Desktop.
Fiber協調スケジューラ。入力待ち
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| /** | |
| * Fiber協調スケジューラ(sleep + stream_select readable 対応) | |
| * - await(['sleep', seconds]) → 指定時刻に resume(value) | |
| * - await(['readable', stream]) → stream が readable になったら resume(stream) | |
| */ | |
| final class Scheduler | |
| { | |
| /** @var SplQueue<array{0:Fiber,1:mixed}> */ | |
| private SplQueue $ready; | |
| /** @var array<int, array{time: float, fiber: Fiber, value: mixed}> */ | |
| private array $timers = []; | |
| /** | |
| * readable待ち: | |
| * streamId(int) => array<int, array{fiber: Fiber, stream: resource}> | |
| * ※ 同一ストリームに複数Fiberをぶら下げられるように配列 | |
| * @var array<int, array<int, array{fiber: Fiber, stream: mixed}>> | |
| */ | |
| private array $readWait = []; | |
| public function __construct() | |
| { | |
| $this->ready = new SplQueue(); | |
| } | |
| public function newTask(callable $fn): void | |
| { | |
| $fiber = new Fiber(fn() => $fn()); | |
| $this->scheduleReady($fiber, null); | |
| } | |
| private function scheduleReady(Fiber $fiber, mixed $value = null): void | |
| { | |
| $this->ready->enqueue([$fiber, $value]); | |
| } | |
| private function addReadableWait(Fiber $fiber, mixed $stream): void | |
| { | |
| if (!is_resource($stream)) { | |
| // 例外にしても良いが、最小実装として即再開 | |
| $this->scheduleReady($fiber, null); | |
| return; | |
| } | |
| $id = (int)$stream; | |
| $this->readWait[$id][] = ['fiber' => $fiber, 'stream' => $stream]; | |
| } | |
| private function handleRequest(Fiber $fiber, mixed $request): void | |
| { | |
| if (!is_array($request) || ($request[0] ?? null) === null) { | |
| $this->scheduleReady($fiber, null); | |
| return; | |
| } | |
| $op = $request[0]; | |
| if ($op === 'sleep') { | |
| $seconds = (float)($request[1] ?? 0.0); | |
| $wakeAt = microtime(true) + max(0.0, $seconds); | |
| // 完了値(ここでは要求秒数を返す) | |
| $value = $seconds; | |
| $this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber, 'value' => $value]; | |
| return; | |
| } | |
| if ($op === 'readable') { | |
| $stream = $request[1] ?? null; | |
| $this->addReadableWait($fiber, $stream); | |
| return; | |
| } | |
| // 未知の要求 → 即再開 | |
| $this->scheduleReady($fiber, null); | |
| } | |
| private function pumpTimers(): void | |
| { | |
| if (empty($this->timers)) return; | |
| $now = microtime(true); | |
| $still = []; | |
| foreach ($this->timers as $t) { | |
| if ($t['time'] <= $now) { | |
| $this->scheduleReady($t['fiber'], $t['value']); | |
| } else { | |
| $still[] = $t; | |
| } | |
| } | |
| $this->timers = $still; | |
| } | |
| /** | |
| * stream_select で readable を起こす | |
| */ | |
| private function pumpReadable(int $timeoutUs): void | |
| { | |
| if (empty($this->readWait)) return; | |
| $read = []; | |
| foreach ($this->readWait as $id => $entries) { | |
| // entries は同じ stream を共有しているはずなので 0 番目を使う | |
| $read[] = $entries[0]['stream']; | |
| } | |
| $write = null; | |
| $except = null; | |
| $sec = intdiv($timeoutUs, 1_000_000); | |
| $usec = $timeoutUs % 1_000_000; | |
| // stream_select は参照渡しで配列を書き換えるので変数に入れる | |
| $r = $read; | |
| $n = @stream_select($r, $write, $except, $sec, $usec); | |
| if ($n === false || $n === 0) { | |
| return; // エラー or タイムアウト | |
| } | |
| foreach ($r as $stream) { | |
| $id = (int)$stream; | |
| if (!isset($this->readWait[$id])) continue; | |
| $entries = $this->readWait[$id]; | |
| unset($this->readWait[$id]); | |
| // readable になったので待ってたFiber全員を起こす(resume へ stream を渡す) | |
| foreach ($entries as $e) { | |
| $this->scheduleReady($e['fiber'], $stream); | |
| } | |
| } | |
| } | |
| private function nextTimerTimeoutUs(int $maxUs): int | |
| { | |
| if (empty($this->timers)) return $maxUs; | |
| $next = min(array_column($this->timers, 'time')); | |
| $delta = $next - microtime(true); | |
| if ($delta <= 0) return 0; | |
| $us = (int)($delta * 1_000_000); | |
| return min($us, $maxUs); | |
| } | |
| public function run(): void | |
| { | |
| while ( | |
| !$this->ready->isEmpty() | |
| || !empty($this->timers) | |
| || !empty($this->readWait) | |
| ) { | |
| // まずタイマーを起こせるだけ起こす | |
| $this->pumpTimers(); | |
| // ready があるなら即回す(I/O待ちに入らない) | |
| if (!$this->ready->isEmpty()) { | |
| /** @var array{0:Fiber,1:mixed} $item */ | |
| $item = $this->ready->dequeue(); | |
| [$fiber, $value] = $item; | |
| try { | |
| if (!$fiber->isStarted()) { | |
| $request = $fiber->start(); | |
| } else { | |
| $request = $fiber->resume($value); | |
| } | |
| } catch (Throwable $e) { | |
| fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL); | |
| continue; | |
| } | |
| if ($fiber->isTerminated()) { | |
| continue; | |
| } | |
| $this->handleRequest($fiber, $request); | |
| continue; | |
| } | |
| // ready が無い → I/O か 次のタイマーまで待つ | |
| // タイマーがあれば「次のタイマーまで」を最大待ち時間にする | |
| $timeoutUs = $this->nextTimerTimeoutUs(50_000); // 最大50ms | |
| if (!empty($this->readWait)) { | |
| $this->pumpReadable($timeoutUs); | |
| } else { | |
| // I/O待ちが無いなら単純に少し寝る | |
| usleep($timeoutUs); | |
| } | |
| } | |
| } | |
| } | |
| /** await:要求を出して、resume(value) の value を受け取る */ | |
| function await(array $request): mixed | |
| { | |
| return Fiber::suspend($request); | |
| } | |
| function sleep_async(float $seconds): float | |
| { | |
| /** @var float $slept */ | |
| $slept = await(['sleep', $seconds]); | |
| return $slept; | |
| } | |
| /** readable を待って1行読む(最小) */ | |
| function read_line_async($stream): ?string | |
| { | |
| /** @var resource $s */ | |
| $s = await(['readable', $stream]); // readable になった stream が返る | |
| $line = fgets($s); | |
| if ($line === false) return null; | |
| return $line; | |
| } | |
| /* ======================= | |
| * デモ | |
| * ======================= */ | |
| stream_set_blocking(STDIN, false); // 重要:ブロッキングしない | |
| $s = new Scheduler(); | |
| // 1) タイマーでtickし続けるタスク | |
| $s->newTask(function () { | |
| for ($i = 1; $i <= 10; $i++) { | |
| sleep_async(0.2); | |
| echo "[TICK] {$i}\n"; | |
| } | |
| echo "[TICK] done\n"; | |
| }); | |
| // 2) 入力待ちタスク | |
| $s->newTask(function () { | |
| echo "[IN ] Type something and press Enter (3 lines)...\n"; | |
| for ($i = 1; $i <= 3; $i++) { | |
| $line = read_line_async(STDIN); | |
| if ($line === null) { | |
| echo "[IN ] EOF\n"; | |
| return; | |
| } | |
| $line = rtrim($line, "\r\n"); | |
| echo "[IN ] got: {$line}\n"; | |
| } | |
| echo "[IN ] done\n"; | |
| }); | |
| $s->run(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment