Last active
February 25, 2026 15:47
-
-
Save masakielastic/6c54e3d01ba638a16524c670a3435019 to your computer and use it in GitHub Desktop.
ミニスケジューラー (PHP)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
/**
 * Minimal cooperative scheduler built on Fibers.
 *
 * - newTask(fn) registers a task; every task runs inside its own Fiber.
 * - A task asks to wait by suspending with the request ['sleep', seconds].
 * - run() drives all tasks to completion on a single thread. Switching is
 *   purely cooperative: a task gives up control only when it suspends.
 */
final class Scheduler
{
    /** Longest single idle wait (seconds) before the timers are re-checked. */
    private const MAX_IDLE_SECONDS = 0.05;

    /** @var SplQueue<Fiber> Fibers that can be started or resumed right away. */
    private SplQueue $ready;

    /** @var array<int, array{time: float, fiber: Fiber}> Sleeping fibers and their wake-up times. */
    private array $timers = [];

    public function __construct()
    {
        $this->ready = new SplQueue();
    }

    /**
     * Registers a new task. It is not started until run() reaches it.
     *
     * @param callable $fn Task body, invoked without arguments inside a new Fiber.
     */
    public function newTask(callable $fn): void
    {
        $fiber = new Fiber(function () use ($fn) {
            $fn();
        });
        $this->ready->enqueue($fiber);
    }

    /**
     * Handles the value a fiber passed to Fiber::suspend().
     *
     * Only ['sleep', seconds] is understood. Anything else (non-array, empty
     * array, unknown operation) puts the fiber straight back on the ready queue.
     *
     * @param Fiber $fiber   The fiber that has just suspended.
     * @param mixed $yielded The value it suspended with.
     */
    private function handleYield(Fiber $fiber, mixed $yielded): void
    {
        // `?? null` matters: a non-empty array may still lack index 0
        // (associative or sparse), and reading it directly raises a warning.
        $op = is_array($yielded) ? ($yielded[0] ?? null) : null;

        if ($op === 'sleep') {
            $seconds = (float)($yielded[1] ?? 0.0);
            // Negative durations are treated as "wake up as soon as possible".
            $wakeAt = microtime(true) + max(0.0, $seconds);
            $this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber];
            return;
        }

        $this->ready->enqueue($fiber);
    }

    /**
     * Runs until no fiber is ready and no timer is pending.
     *
     * An exception escaping a task is reported on STDERR and that task is
     * treated as finished; the remaining tasks keep running.
     */
    public function run(): void
    {
        while (!$this->ready->isEmpty() || $this->timers !== []) {
            $this->wakeDueTimers(microtime(true));

            if ($this->ready->isEmpty()) {
                // Nothing runnable: wait a little instead of spinning.
                $this->idleUntilNextTimer();
                continue;
            }

            /** @var Fiber $fiber */
            $fiber = $this->ready->dequeue();

            try {
                $yielded = $fiber->isStarted() ? $fiber->resume() : $fiber->start();
            } catch (Throwable $e) {
                fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL);
                continue;
            }

            if ($fiber->isTerminated()) {
                continue; // task finished
            }

            $this->handleYield($fiber, $yielded);
        }
    }

    /**
     * Moves every timer whose wake-up time has passed onto the ready queue.
     *
     * @param float $now Current time as returned by microtime(true).
     */
    private function wakeDueTimers(float $now): void
    {
        $due = [];
        $pending = [];
        foreach ($this->timers as $timer) {
            if ($timer['time'] <= $now) {
                $due[] = $timer;
            } else {
                $pending[] = $timer;
            }
        }

        // Resume in deadline order, not registration order, so a fiber that
        // was due earlier is never overtaken by one that was due later.
        usort($due, static fn (array $a, array $b): int => $a['time'] <=> $b['time']);
        foreach ($due as $timer) {
            $this->ready->enqueue($timer['fiber']);
        }

        $this->timers = $pending;
    }

    /**
     * Sleeps until the earliest timer is due, but never longer than
     * MAX_IDLE_SECONDS per call.
     */
    private function idleUntilNextTimer(): void
    {
        if ($this->timers === []) {
            return;
        }

        $next = min(array_column($this->timers, 'time'));
        // Clamp while still a float: converting an infinite or huge duration
        // to int first is an out-of-range cast that can produce 0 and turn
        // this wait into a busy loop.
        $wait = min(self::MAX_IDLE_SECONDS, max(0.0, $next - microtime(true)));
        usleep((int)($wait * 1_000_000));
    }
}
/**
 * Task-side API: wait for the given number of seconds.
 *
 * This is where a fiber stops itself: it suspends with a ['sleep', seconds]
 * request and continues once it is resumed.
 *
 * @param float $seconds Requested wait in seconds.
 */
function sleep_async(float $seconds): void
{
    $request = ['sleep', $seconds];
    Fiber::suspend($request);
}
/* =======================
 * Demo: two tasks interleaving their sleeps
 * ======================= */

// Task A: two waits of different length.
$taskA = static function (): void {
    echo "[A] start\n";
    sleep_async(0.3);
    echo "[A] after 0.3s\n";
    sleep_async(0.2);
    echo "[A] end\n";
};

// Task B: three short ticks.
$taskB = static function (): void {
    echo "[B] start\n";
    foreach ([1, 2, 3] as $i) {
        sleep_async(0.15);
        echo "[B] tick {$i}\n";
    }
    echo "[B] end\n";
};

$s = new Scheduler();
$s->newTask($taskA);
$s->newTask($taskB);
$s->run();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
/**
 * Cooperative Fiber scheduler with await support (minimal, revised version).
 *
 * Tasks suspend with a request array; the scheduler resumes them later and
 * passes a completion value back as the result of the suspension.
 */
final class Scheduler
{
    /** Longest single idle wait (seconds) before the timers are re-checked. */
    private const MAX_IDLE_SECONDS = 0.05;

    /** @var SplQueue<array{0:Fiber,1:mixed}> Runnable fibers paired with the value to resume them with. */
    private SplQueue $ready;

    /** @var array<int, array{time: float, fiber: Fiber, value: mixed}> Sleeping fibers, wake-up times and resume values. */
    private array $timers = [];

    public function __construct()
    {
        $this->ready = new SplQueue();
    }

    /**
     * Registers a new task. It is not started until run() reaches it.
     *
     * @param callable $fn Task body, invoked without arguments inside a new Fiber.
     */
    public function newTask(callable $fn): void
    {
        $fiber = new Fiber(fn() => $fn());
        // The ready queue always holds [Fiber, value] pairs, new tasks included.
        $this->scheduleReady($fiber, null);
    }

    /**
     * Queues a fiber to run, together with the value it will be resumed with.
     */
    private function scheduleReady(Fiber $fiber, mixed $value = null): void
    {
        $this->ready->enqueue([$fiber, $value]);
    }

    /**
     * Handles the request a fiber suspended with.
     *
     * Only ['sleep', seconds] is understood; its completion value is the
     * requested number of seconds. Every other request resumes the fiber
     * immediately with null.
     *
     * @param Fiber $fiber   The fiber that has just suspended.
     * @param mixed $request The value it suspended with.
     */
    private function handleRequest(Fiber $fiber, mixed $request): void
    {
        $op = is_array($request) ? ($request[0] ?? null) : null;

        if ($op === 'sleep') {
            $seconds = (float)($request[1] ?? 0.0);
            // Negative durations are treated as "wake up as soon as possible".
            $wakeAt = microtime(true) + max(0.0, $seconds);
            $this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber, 'value' => $seconds];
            return;
        }

        // Missing or unknown operation: resume right away.
        $this->scheduleReady($fiber, null);
    }

    /**
     * Runs until no fiber is ready and no timer is pending.
     *
     * An exception escaping a task is reported on STDERR and that task is
     * treated as finished; the remaining tasks keep running.
     */
    public function run(): void
    {
        while (!$this->ready->isEmpty() || $this->timers !== []) {
            $this->wakeDueTimers(microtime(true));

            if ($this->ready->isEmpty()) {
                // Nothing runnable: wait a little instead of spinning.
                $this->idleUntilNextTimer();
                continue;
            }

            /** @var array{0:Fiber,1:mixed} $item */
            $item = $this->ready->dequeue();
            [$fiber, $value] = $item;

            try {
                // start() cannot receive a resume value, so $value is only
                // used for fibers that are already running.
                $request = $fiber->isStarted() ? $fiber->resume($value) : $fiber->start();
            } catch (Throwable $e) {
                fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL);
                continue;
            }

            if ($fiber->isTerminated()) {
                continue; // task finished
            }

            $this->handleRequest($fiber, $request);
        }
    }

    /**
     * Moves every timer whose wake-up time has passed onto the ready queue.
     *
     * @param float $now Current time as returned by microtime(true).
     */
    private function wakeDueTimers(float $now): void
    {
        $due = [];
        $pending = [];
        foreach ($this->timers as $timer) {
            if ($timer['time'] <= $now) {
                $due[] = $timer;
            } else {
                $pending[] = $timer;
            }
        }

        // Resume in deadline order, not registration order, so a fiber that
        // was due earlier is never overtaken by one that was due later.
        usort($due, static fn (array $a, array $b): int => $a['time'] <=> $b['time']);
        foreach ($due as $timer) {
            $this->scheduleReady($timer['fiber'], $timer['value']);
        }

        $this->timers = $pending;
    }

    /**
     * Sleeps until the earliest timer is due, but never longer than
     * MAX_IDLE_SECONDS per call.
     */
    private function idleUntilNextTimer(): void
    {
        if ($this->timers === []) {
            return;
        }

        $next = min(array_column($this->timers, 'time'));
        // Clamp while still a float: converting an infinite or huge duration
        // to int first is an out-of-range cast that can produce 0 and turn
        // this wait into a busy loop.
        $wait = min(self::MAX_IDLE_SECONDS, max(0.0, $next - microtime(true)));
        usleep((int)($wait * 1_000_000));
    }
}
/**
 * Suspends the current fiber with the given request and hands back whatever
 * value the fiber is later resumed with.
 *
 * @param array $request Request passed to Fiber::suspend(), e.g. ['sleep', 0.5].
 * @return mixed The value supplied when the fiber is resumed.
 */
function await(array $request): mixed
{
    $result = Fiber::suspend($request);

    return $result;
}
/**
 * Waits for the given number of seconds by awaiting a ['sleep', seconds]
 * request.
 *
 * @param float $seconds Requested wait in seconds.
 * @return float The value the fiber was resumed with.
 */
function sleep_async(float $seconds): float
{
    return await(['sleep', $seconds]);
}
/* =======================
 * Demo: two tasks printing the value returned by each sleep
 * ======================= */

// Task A: two waits of different length.
$taskA = static function (): void {
    echo "[A] start\n";
    foreach ([0.25, 0.10] as $delay) {
        $slept = sleep_async($delay);
        echo "[A] resumed; slept={$slept}\n";
    }
    echo "[A] end\n";
};

// Task B: three equal ticks.
$taskB = static function (): void {
    echo "[B] start\n";
    foreach ([1, 2, 3] as $i) {
        $slept = sleep_async(0.15);
        echo "[B] tick {$i}; slept={$slept}\n";
    }
    echo "[B] end\n";
};

$s = new Scheduler();
$s->newTask($taskA);
$s->newTask($taskB);
$s->run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment