Skip to content

Instantly share code, notes, and snippets.

@masakielastic
Last active February 25, 2026 15:47
Show Gist options
  • Select an option

  • Save masakielastic/6c54e3d01ba638a16524c670a3435019 to your computer and use it in GitHub Desktop.

Select an option

Save masakielastic/6c54e3d01ba638a16524c670a3435019 to your computer and use it in GitHub Desktop.
ミニスケジューラー (PHP)
<?php
/**
 * A tiny cooperative scheduler built on Fibers.
 *
 * - newTask(fn) registers a task, wrapped in a Fiber.
 * - A task asks to "wait until a point in time" by suspending itself with
 *   the request ['sleep', seconds].
 * - run() drives all tasks to completion (single thread, cooperative only).
 */
final class Scheduler
{
    /** @var SplQueue<Fiber> Fibers that can be started or resumed right away. */
    private SplQueue $ready;

    /** @var list<array{time: float, fiber: Fiber}> Sleeping fibers with their wake-up timestamps. */
    private array $timers = [];

    public function __construct()
    {
        $this->ready = new SplQueue();
    }

    /**
     * Registers a new task; it is started on a later pass of run().
     *
     * @param callable $fn Task body, invoked without arguments inside a Fiber.
     */
    public function newTask(callable $fn): void
    {
        $fiber = new Fiber(function () use ($fn) {
            $fn();
        });
        $this->ready->enqueue($fiber);
    }

    /**
     * Handles the value a Fiber suspended with.
     *
     * Only ['sleep', seconds] is understood. Anything else (non-array, empty
     * array, array without index 0, unknown operation) puts the fiber straight
     * back on the ready queue.
     */
    private function handleYield(Fiber $fiber, mixed $yielded): void
    {
        // `?? null` so that an array lacking index 0 (e.g. an associative
        // array) is treated as "no request" instead of raising a warning.
        $op = is_array($yielded) ? ($yielded[0] ?? null) : null;

        if ($op === 'sleep') {
            $seconds = (float)($yielded[1] ?? 0.0);
            // Negative durations are treated as "wake up as soon as possible".
            $wakeAt = microtime(true) + max(0.0, $seconds);
            $this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber];
            return;
        }

        // No request or an unknown one: make the fiber runnable again.
        $this->ready->enqueue($fiber);
    }

    /**
     * Runs until the ready queue and the timer list are both empty.
     *
     * An exception escaping a task is reported on STDERR and that task is
     * dropped; the remaining tasks keep running.
     */
    public function run(): void
    {
        while (!$this->ready->isEmpty() || $this->timers !== []) {
            $this->wakeDueTimers();

            if ($this->ready->isEmpty()) {
                // Nothing runnable: wait a little for the next timer rather
                // than spinning.
                $this->idleUntilNextTimer();
                continue;
            }

            /** @var Fiber $fiber */
            $fiber = $this->ready->dequeue();
            try {
                $yielded = $fiber->isStarted() ? $fiber->resume() : $fiber->start();
            } catch (Throwable $e) {
                // Log the task failure and treat the task as finished.
                fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL);
                continue;
            }

            if ($fiber->isTerminated()) {
                continue; // task completed
            }

            $this->handleYield($fiber, $yielded);
        }
    }

    /**
     * Moves every fiber whose wake-up time has passed onto the ready queue,
     * earliest deadline first.
     */
    private function wakeDueTimers(): void
    {
        if ($this->timers === []) {
            return;
        }

        $now = microtime(true);
        $due = [];
        $pending = [];
        foreach ($this->timers as $timer) {
            if ($timer['time'] <= $now) {
                $due[] = $timer;
            } else {
                $pending[] = $timer;
            }
        }

        // Resume in deadline order, not registration order. usort() is stable
        // (PHP >= 8.0), so equal deadlines keep their registration order.
        usort($due, static fn (array $a, array $b): int => $a['time'] <=> $b['time']);
        foreach ($due as $timer) {
            $this->ready->enqueue($timer['fiber']);
        }

        $this->timers = $pending;
    }

    /**
     * Sleeps until the next timer is due, capped at 50 ms per call.
     *
     * Only called while at least one timer is pending.
     */
    private function idleUntilNextTimer(): void
    {
        $next = min(array_column($this->timers, 'time'));
        $remainingUs = ($next - microtime(true)) * 1_000_000;
        // Clamp while still a float so a huge or infinite delay is never cast
        // to int out of range.
        usleep((int) min(50_000.0, max(0.0, $remainingUs)));
    }
}
/**
 * Task-side API: wait for the given number of seconds.
 *
 * This is the point where the Fiber stops itself: it suspends and hands the
 * request ['sleep', $seconds] to whoever started or resumed it.
 * Must be called from inside a Fiber.
 */
function sleep_async(float $seconds): void
{
    $request = ['sleep', $seconds];

    Fiber::suspend($request);
}
/* =======================
 * Demo: two tasks interleaving through cooperative sleeps
 * ======================= */
$scheduler = new Scheduler();

// Task A: two sleeps of different lengths.
$taskA = static function (): void {
    echo "[A] start\n";
    sleep_async(0.3);
    echo "[A] after 0.3s\n";
    sleep_async(0.2);
    echo "[A] end\n";
};

// Task B: three ticks, 0.15 s apart.
$taskB = static function (): void {
    echo "[B] start\n";
    foreach (range(1, 3) as $i) {
        sleep_async(0.15);
        echo "[B] tick {$i}\n";
    }
    echo "[B] end\n";
};

$scheduler->newTask($taskA);
$scheduler->newTask($taskB);
$scheduler->run();
<?php
/**
 * Cooperative Fiber scheduler with minimal "await" support.
 *
 * A task suspends with a request array; once the request completes, the task
 * is resumed with a result value. The only request understood is
 * ['sleep', seconds], whose result is the requested number of seconds.
 */
final class Scheduler
{
    /** @var SplQueue<array{0:Fiber,1:mixed}> Runnable fibers paired with the value to resume them with. */
    private SplQueue $ready;

    /** @var list<array{time: float, fiber: Fiber, value: mixed}> Sleeping fibers, wake-up times and resume values. */
    private array $timers = [];

    public function __construct()
    {
        $this->ready = new SplQueue();
    }

    /**
     * Registers a new task; it is started on a later pass of run().
     *
     * @param callable $fn Task body, invoked without arguments inside a Fiber.
     */
    public function newTask(callable $fn): void
    {
        $fiber = new Fiber(fn() => $fn());
        // The ready queue always holds [Fiber, value] pairs, including for
        // fibers that have not been started yet.
        $this->scheduleReady($fiber, null);
    }

    /** Queues a fiber to be started, or resumed with $value. */
    private function scheduleReady(Fiber $fiber, mixed $value = null): void
    {
        $this->ready->enqueue([$fiber, $value]);
    }

    /**
     * Handles the request a Fiber suspended with.
     *
     * ['sleep', seconds] parks the fiber on the timer list; anything else
     * makes the fiber runnable again immediately with a null result.
     */
    private function handleRequest(Fiber $fiber, mixed $request): void
    {
        $op = is_array($request) ? ($request[0] ?? null) : null;

        if ($op === 'sleep') {
            $seconds = (float)($request[1] ?? 0.0);
            // Negative durations are treated as "wake up as soon as possible".
            $wakeAt = microtime(true) + max(0.0, $seconds);
            // Completion value: simply the requested number of seconds.
            $this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber, 'value' => $seconds];
            return;
        }

        // No request or an unknown one: resume right away.
        $this->scheduleReady($fiber, null);
    }

    /**
     * Runs until the ready queue and the timer list are both empty.
     *
     * An exception escaping a task is reported on STDERR and that task is
     * dropped; the remaining tasks keep running.
     */
    public function run(): void
    {
        while (!$this->ready->isEmpty() || $this->timers !== []) {
            $this->wakeDueTimers();

            if ($this->ready->isEmpty()) {
                // Nothing runnable: wait a little for the next timer rather
                // than spinning.
                $this->idleUntilNextTimer();
                continue;
            }

            /** @var array{0:Fiber,1:mixed} $item */
            $item = $this->ready->dequeue();
            [$fiber, $value] = $item;

            try {
                // start() is called without a value; only resume() delivers one.
                $request = $fiber->isStarted() ? $fiber->resume($value) : $fiber->start();
            } catch (Throwable $e) {
                fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL);
                continue;
            }

            if ($fiber->isTerminated()) {
                continue;
            }

            $this->handleRequest($fiber, $request);
        }
    }

    /**
     * Moves every fiber whose wake-up time has passed onto the ready queue,
     * earliest deadline first.
     */
    private function wakeDueTimers(): void
    {
        if ($this->timers === []) {
            return;
        }

        $now = microtime(true);
        $due = [];
        $pending = [];
        foreach ($this->timers as $timer) {
            if ($timer['time'] <= $now) {
                $due[] = $timer;
            } else {
                $pending[] = $timer;
            }
        }

        // Resume in deadline order, not registration order. usort() is stable
        // (PHP >= 8.0), so equal deadlines keep their registration order.
        usort($due, static fn (array $a, array $b): int => $a['time'] <=> $b['time']);
        foreach ($due as $timer) {
            $this->scheduleReady($timer['fiber'], $timer['value']);
        }

        $this->timers = $pending;
    }

    /**
     * Sleeps until the next timer is due, capped at 50 ms per call.
     *
     * Only called while at least one timer is pending.
     */
    private function idleUntilNextTimer(): void
    {
        $next = min(array_column($this->timers, 'time'));
        $remainingUs = ($next - microtime(true)) * 1_000_000;
        // Clamp while still a float so a huge or infinite delay is never cast
        // to int out of range.
        usleep((int) min(50_000.0, max(0.0, $remainingUs)));
    }
}
/**
 * Suspends the current Fiber, handing $request to whoever started or resumed
 * it, and returns the value the Fiber is later resumed with.
 * Must be called from inside a Fiber.
 *
 * @param array $request Request passed out through Fiber::suspend().
 * @return mixed The value supplied when the Fiber is resumed.
 */
function await(array $request): mixed
{
    $resumedWith = Fiber::suspend($request);

    return $resumedWith;
}
/**
 * Task-side API: issue a ['sleep', $seconds] request through await() and
 * return the value the Fiber is resumed with.
 *
 * @param float $seconds Requested sleep duration in seconds.
 * @return float The value await() returned for the sleep request.
 */
function sleep_async(float $seconds): float
{
    return await(['sleep', $seconds]);
}
/* =======================
 * Demo: two tasks that print the value each sleep resolves to
 * ======================= */
$scheduler = new Scheduler();

// Task A: two sleeps of different lengths.
$taskA = static function (): void {
    echo "[A] start\n";
    foreach ([0.25, 0.10] as $duration) {
        $slept = sleep_async($duration);
        echo "[A] resumed; slept={$slept}\n";
    }
    echo "[A] end\n";
};

// Task B: three ticks, 0.15 s apart.
$taskB = static function (): void {
    echo "[B] start\n";
    foreach (range(1, 3) as $i) {
        $slept = sleep_async(0.15);
        echo "[B] tick {$i}; slept={$slept}\n";
    }
    echo "[B] end\n";
};

$scheduler->newTask($taskA);
$scheduler->newTask($taskB);
$scheduler->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment