Skip to content

Instantly share code, notes, and snippets.

@masakielastic
Created February 25, 2026 15:56
Show Gist options
  • Select an option

  • Save masakielastic/104fc9255bf407b81e6aa51b30d3b248 to your computer and use it in GitHub Desktop.

Select an option

Save masakielastic/104fc9255bf407b81e6aa51b30d3b248 to your computer and use it in GitHub Desktop.
スケジューラーで非同期 HTTP GET
<?php
/**
 * Cooperative Fiber scheduler (sleep + readable + writable).
 *
 * A task suspends itself with Fiber::suspend($request); the request decides when, and with
 * which value, the task is resumed:
 * - ['sleep', seconds]   -> resumed with the requested seconds once the delay has elapsed
 * - ['readable', stream] -> resumed with the stream once stream_select() reports it readable
 * - ['writable', stream] -> resumed with the stream once stream_select() reports it writable
 * Any other request, and a wait on something that is not (or is no longer) an open stream
 * resource, resumes the task with null.
 */
final class Scheduler
{
    /** Upper bound, in microseconds, for a single idle sleep / stream_select() call. */
    private const MAX_IDLE_US = 50_000;

    /** @var SplQueue<array{0:Fiber,1:mixed}> Runnable tasks paired with the value to resume them with. */
    private SplQueue $ready;

    /** @var array<int, array{time: float, fiber: Fiber, value: mixed}> Pending sleeps; `time` is a microtime(true) deadline. */
    private array $timers = [];

    /** @var array<int, array<int, array{fiber: Fiber, stream: mixed}>> Fibers waiting for readability, keyed by resource id. */
    private array $readWait = [];

    /** @var array<int, array<int, array{fiber: Fiber, stream: mixed}>> Fibers waiting for writability, keyed by resource id. */
    private array $writeWait = [];

    public function __construct()
    {
        $this->ready = new SplQueue();
    }

    /**
     * Registers a new task. The callable is invoked without arguments inside its own Fiber
     * the first time the task is picked from the ready queue.
     */
    public function newTask(callable $fn): void
    {
        $this->scheduleReady(new Fiber(static fn() => $fn()), null);
    }

    /** Appends a fiber to the ready queue together with the value it will be resumed with. */
    private function scheduleReady(Fiber $fiber, mixed $value = null): void
    {
        $this->ready->enqueue([$fiber, $value]);
    }

    /**
     * Parks a fiber until $stream shows up in a stream_select() result.
     * Anything that is not an open resource cannot be selected on, so the fiber is made
     * runnable again immediately (resumed with null).
     *
     * @param array<int, array<int, array{fiber: Fiber, stream: mixed}>> $bucket readWait or writeWait
     */
    private function addWait(array &$bucket, Fiber $fiber, mixed $stream): void
    {
        if (!is_resource($stream)) {
            $this->scheduleReady($fiber, null);
            return;
        }
        $bucket[(int)$stream][] = ['fiber' => $fiber, 'stream' => $stream];
    }

    /** Records a sleep request; negative durations are treated as "wake on the next timer pass". */
    private function addTimer(Fiber $fiber, mixed $rawSeconds): void
    {
        $seconds = (float)$rawSeconds;
        $this->timers[] = [
            'time' => microtime(true) + max(0.0, $seconds),
            'fiber' => $fiber,
            'value' => $seconds,
        ];
    }

    /** Interprets the value a fiber suspended with and files the fiber accordingly. */
    private function handleRequest(Fiber $fiber, mixed $request): void
    {
        $op = is_array($request) ? ($request[0] ?? null) : null;

        match ($op) {
            'sleep' => $this->addTimer($fiber, $request[1] ?? 0.0),
            'readable' => $this->addWait($this->readWait, $fiber, $request[1] ?? null),
            'writable' => $this->addWait($this->writeWait, $fiber, $request[1] ?? null),
            // Malformed or unknown request: just hand control back later.
            default => $this->scheduleReady($fiber, null),
        };
    }

    /** Moves every timer whose deadline has passed to the ready queue. */
    private function pumpTimers(): void
    {
        if (empty($this->timers)) {
            return;
        }
        $now = microtime(true);
        $still = [];
        foreach ($this->timers as $t) {
            if ($t['time'] <= $now) {
                $this->scheduleReady($t['fiber'], $t['value']);
            } else {
                $still[] = $t;
            }
        }
        $this->timers = $still;
    }

    /** Returns how long (in microseconds, capped at $maxUs) the loop may idle before the next timer is due. */
    private function nextTimeoutUs(int $maxUs): int
    {
        if (empty($this->timers)) {
            return $maxUs;
        }
        $delta = min(array_column($this->timers, 'time')) - microtime(true);
        if ($delta <= 0) {
            return 0;
        }
        return min((int)($delta * 1_000_000), $maxUs);
    }

    /**
     * Wakes (with null) every fiber in $bucket whose stream is no longer an open resource.
     * Such streams must not reach stream_select(): it rejects them, and their waiters would
     * otherwise never be resumed.
     *
     * @param array<int, array<int, array{fiber: Fiber, stream: mixed}>> $bucket
     * @return bool true when at least one fiber was woken
     */
    private function dropClosed(array &$bucket): bool
    {
        $woke = false;
        foreach ($bucket as $id => $entries) {
            if (is_resource($entries[0]['stream'])) {
                continue;
            }
            unset($bucket[$id]);
            foreach ($entries as $e) {
                $this->scheduleReady($e['fiber'], null);
            }
            $woke = true;
        }
        return $woke;
    }

    /**
     * Resumes (with the stream itself) all fibers in $bucket waiting on one of $streams.
     *
     * @param array<int, array<int, array{fiber: Fiber, stream: mixed}>> $bucket
     * @param array<int|string, resource> $streams streams reported ready by stream_select()
     */
    private function wakeWaiters(array &$bucket, array $streams): void
    {
        foreach ($streams as $stream) {
            $id = (int)$stream;
            if (!isset($bucket[$id])) {
                continue;
            }
            $entries = $bucket[$id];
            unset($bucket[$id]);
            foreach ($entries as $e) {
                $this->scheduleReady($e['fiber'], $stream);
            }
        }
    }

    /**
     * Idles for at most $timeoutUs microseconds: polls the waited-on streams with
     * stream_select(), or plainly sleeps when no stream is being waited on.
     */
    private function pumpSelect(int $timeoutUs): void
    {
        // Both buckets must be pruned, hence two separate statements (no short-circuit).
        $wokeReaders = $this->dropClosed($this->readWait);
        $wokeWriters = $this->dropClosed($this->writeWait);
        if ($wokeReaders || $wokeWriters) {
            return; // there is runnable work now; do not idle
        }

        $firstStream = static fn(array $entries): mixed => $entries[0]['stream'];
        $r = array_values(array_map($firstStream, $this->readWait));
        $w = array_values(array_map($firstStream, $this->writeWait));

        if (empty($r) && empty($w)) {
            usleep($timeoutUs);
            return;
        }

        $except = null;
        $sec = intdiv($timeoutUs, 1_000_000);
        $usec = $timeoutUs % 1_000_000;
        // stream_select() rewrites $r / $w so that only the ready streams remain.
        // Warnings are silenced as in the original; a false result is simply retried later.
        $n = @stream_select($r, $w, $except, $sec, $usec);
        if ($n === false || $n === 0) {
            return;
        }

        $this->wakeWaiters($this->readWait, $r);
        $this->wakeWaiters($this->writeWait, $w);
    }

    /** True while any task is runnable, sleeping, or waiting on a stream. */
    private function hasPendingWork(): bool
    {
        return !$this->ready->isEmpty()
            || !empty($this->timers)
            || !empty($this->readWait)
            || !empty($this->writeWait);
    }

    /**
     * Runs the event loop until no task is left.
     *
     * Each iteration first promotes due timers, then either advances exactly one ready task
     * or, when nothing is runnable, idles in pumpSelect(). An exception escaping a task is
     * reported on STDERR and ends that task only.
     */
    public function run(): void
    {
        while ($this->hasPendingWork()) {
            $this->pumpTimers();

            if ($this->ready->isEmpty()) {
                $this->pumpSelect($this->nextTimeoutUs(self::MAX_IDLE_US));
                continue;
            }

            /** @var array{0:Fiber,1:mixed} $item */
            $item = $this->ready->dequeue();
            [$fiber, $value] = $item;
            try {
                $request = $fiber->isStarted() ? $fiber->resume($value) : $fiber->start();
            } catch (Throwable $e) {
                fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL);
                continue;
            }
            if (!$fiber->isTerminated()) {
                $this->handleRequest($fiber, $request);
            }
        }
    }
}
/**
 * await: hands $request to whoever drives the current Fiber and returns the value
 * passed to resume() when the Fiber is continued.
 */
function await(array $request): mixed
{
    $resumedWith = Fiber::suspend($request);

    return $resumedWith;
}
/**
 * Suspends the current Fiber with a ['sleep', $seconds] request.
 *
 * @return float the value the Fiber is resumed with
 */
function sleep_async(float $seconds): float
{
    return await(['sleep', $seconds]);
}
/**
 * Suspends the current Fiber with a ['readable', $stream] request; the resume value is discarded.
 */
function wait_readable($stream): void
{
    $request = ['readable', $stream];
    await($request);
}
/**
 * Suspends the current Fiber with a ['writable', $stream] request; the resume value is discarded.
 */
function wait_writable($stream): void
{
    $request = ['writable', $stream];
    await($request);
}
/**
 * Asynchronous connect (plain TCP).
 * - Starts the connection with STREAM_CLIENT_ASYNC_CONNECT.
 * - Suspends the current Fiber until the socket becomes writable, which happens both when
 *   the connection is established and when it has failed.
 * - Tells the two apart by asking for the peer name: a socket that never connected has none.
 *
 * Per the PHP manual, the timeout argument of stream_socket_client() does not apply to
 * asynchronous connection attempts, so $timeoutSec is passed through but not enforced here.
 *
 * @param string $host        host name or IP address; a bare IPv6 literal is bracketed automatically
 * @param int    $port        TCP port
 * @param float  $timeoutSec  forwarded to stream_socket_client() (see note above)
 * @return resource connected, non-blocking stream socket
 * @throws RuntimeException when the connection cannot be started or is not established
 */
function tcp_connect_async(string $host, int $port, float $timeoutSec = 5.0)
{
    // stream_socket_client() requires numeric IPv6 addresses in square brackets.
    $needsBrackets = str_contains($host, ':') && !str_starts_with($host, '[');
    $authority = $needsBrackets ? "[{$host}]" : $host;
    $addr = "tcp://{$authority}:{$port}";

    $errno = 0;
    $errstr = '';
    $flags = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT;
    $sock = @stream_socket_client($addr, $errno, $errstr, $timeoutSec, $flags);
    if ($sock === false) {
        throw new RuntimeException("connect failed: {$errstr} ({$errno})");
    }
    stream_set_blocking($sock, false);

    // Wait until writable (connection established OR failed).
    wait_writable($sock);

    // A failed connect also reports "writable"; without a remote peer name the socket is
    // not connected, so release it and report the failure instead of returning a dead socket.
    if (stream_socket_get_name($sock, true) === false) {
        fclose($sock);
        throw new RuntimeException("connect failed: could not connect to {$addr}");
    }

    return $sock;
}
/**
 * Writes all of $data to $sock, suspending the current Fiber whenever the socket is not
 * ready instead of blocking.
 *
 * @param resource $sock stream to write to
 * @throws RuntimeException when fwrite() reports a failure
 */
function write_all_async($sock, string $data): void
{
    $total = strlen($data);

    for ($sent = 0; $sent < $total;) {
        wait_writable($sock);
        $written = @fwrite($sock, substr($data, $sent));

        if ($written === false) {
            throw new RuntimeException("write failed");
        }
        if ($written !== 0) {
            $sent += $written;
            continue;
        }
        // No progress was made: yield briefly before trying again.
        sleep_async(0.01);
    }
}
/**
 * Reads from $sock until end-of-file, suspending the current Fiber while no data is available.
 *
 * @param resource $sock     stream to read from
 * @param int      $maxBytes the accumulated data may not exceed this many bytes
 * @return string everything that was read
 * @throws RuntimeException when fread() fails or more than $maxBytes bytes were received
 */
function read_all_async($sock, int $maxBytes = 2_000_000): string
{
    $received = '';

    for (;;) {
        if (feof($sock)) {
            return $received;
        }

        wait_readable($sock);
        $piece = @fread($sock, 8192);

        if ($piece === false) {
            throw new RuntimeException("read failed");
        }
        if ($piece === '') {
            // Nothing to read yet / transient condition: yield and retry.
            sleep_async(0.01);
            continue;
        }

        $received .= $piece;
        if (strlen($received) > $maxBytes) {
            throw new RuntimeException("response too large");
        }
    }
}
/**
 * Minimal HTTP/1.1 GET (plain text, no TLS).
 * - Sends "Connection: close", so the response is complete once the server closes the connection.
 * - The socket is closed on every exit path, including when writing or reading throws.
 *
 * @param string $host server host name; also used for the Host header
 * @param int    $port server TCP port
 * @param string $path request target, e.g. "/get"
 * @return string raw response (status line, headers and body) exactly as received
 * @throws RuntimeException propagated from connecting, writing or reading
 */
function http_get_async(string $host, int $port, string $path): string
{
    $sock = tcp_connect_async($host, $port);

    // The Host header carries the port unless it is the default for plain HTTP (80).
    $hostHeader = $port === 80 ? $host : "{$host}:{$port}";
    $req =
        "GET {$path} HTTP/1.1\r\n" .
        "Host: {$hostHeader}\r\n" .
        "User-Agent: fiber-mini-client/0.1\r\n" .
        "Accept: */*\r\n" .
        "Connection: close\r\n" .
        "\r\n";

    try {
        write_all_async($sock, $req);
        return read_all_async($sock);
    } finally {
        // Runs on success and on failure, so the socket never leaks.
        fclose($sock);
    }
}
/* =======================
 * Demo: keep ticking while an HTTP GET is in flight
 * ======================= */
$s = new Scheduler();

// 1) Background ticker (shows that the loop is not blocked)
$s->newTask(function () {
    foreach (range(1, 20) as $tick) {
        sleep_async(0.1);
        echo '[TICK] ' . $tick . "\n";
    }
});

// 2) HTTP GET task (plain HTTP)
$s->newTask(function () {
    // httpbin.org is mainly served over HTTPS, but it also answers plain HTTP on port 80
    // (this may depend on the environment).
    $response = http_get_async("httpbin.org", 80, "/get");

    // Split head and body at the first blank line (minimal parsing).
    $parts = explode("\r\n\r\n", $response, 2);
    $head = $parts[0];
    $body = $parts[1] ?? null;

    echo "----- RESPONSE HEAD -----\n";
    echo $head, "\n";
    echo "----- RESPONSE BODY (first 400 bytes) -----\n";
    echo substr($body ?? '', 0, 400), "\n";
});

$s->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment