Skip to content

Instantly share code, notes, and snippets.

@masakielastic
Created February 25, 2026 15:53
Show Gist options
  • Select an option

  • Save masakielastic/1966a03b304e3104eddf7f13a4c90393 to your computer and use it in GitHub Desktop.

Select an option

Save masakielastic/1966a03b304e3104eddf7f13a4c90393 to your computer and use it in GitHub Desktop.
Fiber協調スケジューラ。入力待ち
<?php
/**
* Fiber協調スケジューラ(sleep + stream_select readable 対応)
* - await(['sleep', seconds]) → 指定時刻に resume(value)
* - await(['readable', stream]) → stream が readable になったら resume(stream)
*/
final class Scheduler
{
/** @var SplQueue<array{0:Fiber,1:mixed}> */
private SplQueue $ready;
/** @var array<int, array{time: float, fiber: Fiber, value: mixed}> */
private array $timers = [];
/**
* readable待ち:
* streamId(int) => array<int, array{fiber: Fiber, stream: resource}>
* ※ 同一ストリームに複数Fiberをぶら下げられるように配列
* @var array<int, array<int, array{fiber: Fiber, stream: mixed}>>
*/
private array $readWait = [];
public function __construct()
{
$this->ready = new SplQueue();
}
public function newTask(callable $fn): void
{
$fiber = new Fiber(fn() => $fn());
$this->scheduleReady($fiber, null);
}
private function scheduleReady(Fiber $fiber, mixed $value = null): void
{
$this->ready->enqueue([$fiber, $value]);
}
private function addReadableWait(Fiber $fiber, mixed $stream): void
{
if (!is_resource($stream)) {
// 例外にしても良いが、最小実装として即再開
$this->scheduleReady($fiber, null);
return;
}
$id = (int)$stream;
$this->readWait[$id][] = ['fiber' => $fiber, 'stream' => $stream];
}
private function handleRequest(Fiber $fiber, mixed $request): void
{
if (!is_array($request) || ($request[0] ?? null) === null) {
$this->scheduleReady($fiber, null);
return;
}
$op = $request[0];
if ($op === 'sleep') {
$seconds = (float)($request[1] ?? 0.0);
$wakeAt = microtime(true) + max(0.0, $seconds);
// 完了値(ここでは要求秒数を返す)
$value = $seconds;
$this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber, 'value' => $value];
return;
}
if ($op === 'readable') {
$stream = $request[1] ?? null;
$this->addReadableWait($fiber, $stream);
return;
}
// 未知の要求 → 即再開
$this->scheduleReady($fiber, null);
}
private function pumpTimers(): void
{
if (empty($this->timers)) return;
$now = microtime(true);
$still = [];
foreach ($this->timers as $t) {
if ($t['time'] <= $now) {
$this->scheduleReady($t['fiber'], $t['value']);
} else {
$still[] = $t;
}
}
$this->timers = $still;
}
/**
* stream_select で readable を起こす
*/
private function pumpReadable(int $timeoutUs): void
{
if (empty($this->readWait)) return;
$read = [];
foreach ($this->readWait as $id => $entries) {
// entries は同じ stream を共有しているはずなので 0 番目を使う
$read[] = $entries[0]['stream'];
}
$write = null;
$except = null;
$sec = intdiv($timeoutUs, 1_000_000);
$usec = $timeoutUs % 1_000_000;
// stream_select は参照渡しで配列を書き換えるので変数に入れる
$r = $read;
$n = @stream_select($r, $write, $except, $sec, $usec);
if ($n === false || $n === 0) {
return; // エラー or タイムアウト
}
foreach ($r as $stream) {
$id = (int)$stream;
if (!isset($this->readWait[$id])) continue;
$entries = $this->readWait[$id];
unset($this->readWait[$id]);
// readable になったので待ってたFiber全員を起こす(resume へ stream を渡す)
foreach ($entries as $e) {
$this->scheduleReady($e['fiber'], $stream);
}
}
}
private function nextTimerTimeoutUs(int $maxUs): int
{
if (empty($this->timers)) return $maxUs;
$next = min(array_column($this->timers, 'time'));
$delta = $next - microtime(true);
if ($delta <= 0) return 0;
$us = (int)($delta * 1_000_000);
return min($us, $maxUs);
}
public function run(): void
{
while (
!$this->ready->isEmpty()
|| !empty($this->timers)
|| !empty($this->readWait)
) {
// まずタイマーを起こせるだけ起こす
$this->pumpTimers();
// ready があるなら即回す(I/O待ちに入らない)
if (!$this->ready->isEmpty()) {
/** @var array{0:Fiber,1:mixed} $item */
$item = $this->ready->dequeue();
[$fiber, $value] = $item;
try {
if (!$fiber->isStarted()) {
$request = $fiber->start();
} else {
$request = $fiber->resume($value);
}
} catch (Throwable $e) {
fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL);
continue;
}
if ($fiber->isTerminated()) {
continue;
}
$this->handleRequest($fiber, $request);
continue;
}
// ready が無い → I/O か 次のタイマーまで待つ
// タイマーがあれば「次のタイマーまで」を最大待ち時間にする
$timeoutUs = $this->nextTimerTimeoutUs(50_000); // 最大50ms
if (!empty($this->readWait)) {
$this->pumpReadable($timeoutUs);
} else {
// I/O待ちが無いなら単純に少し寝る
usleep($timeoutUs);
}
}
}
}
/** await:要求を出して、resume(value) の value を受け取る */
function await(array $request): mixed
{
return Fiber::suspend($request);
}
function sleep_async(float $seconds): float
{
/** @var float $slept */
$slept = await(['sleep', $seconds]);
return $slept;
}
/** readable を待って1行読む(最小) */
function read_line_async($stream): ?string
{
/** @var resource $s */
$s = await(['readable', $stream]); // readable になった stream が返る
$line = fgets($s);
if ($line === false) return null;
return $line;
}
/* =======================
* デモ
* ======================= */
stream_set_blocking(STDIN, false); // 重要:ブロッキングしない
$s = new Scheduler();
// 1) タイマーでtickし続けるタスク
$s->newTask(function () {
for ($i = 1; $i <= 10; $i++) {
sleep_async(0.2);
echo "[TICK] {$i}\n";
}
echo "[TICK] done\n";
});
// 2) 入力待ちタスク
$s->newTask(function () {
echo "[IN ] Type something and press Enter (3 lines)...\n";
for ($i = 1; $i <= 3; $i++) {
$line = read_line_async(STDIN);
if ($line === null) {
echo "[IN ] EOF\n";
return;
}
$line = rtrim($line, "\r\n");
echo "[IN ] got: {$line}\n";
}
echo "[IN ] done\n";
});
$s->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment