Skip to content

Instantly share code, notes, and snippets.

@masakielastic
Created February 25, 2026 16:02
Show Gist options
  • Select an option

  • Save masakielastic/4ae7c885c40ba35852147223e4fede58 to your computer and use it in GitHub Desktop.

Select an option

Save masakielastic/4ae7c885c40ba35852147223e4fede58 to your computer and use it in GitHub Desktop.
HTTPS対応版:Fiberスケジューラ + https_get_async()
<?php
/**
* Fiber協調スケジューラ(sleep + readable + writable)
*/
final class Scheduler
{
/** @var SplQueue<array{0:Fiber,1:mixed}> */
private SplQueue $ready;
/** @var array<int, array{time: float, fiber: Fiber, value: mixed}> */
private array $timers = [];
/** @var array<int, array<int, array{fiber: Fiber, stream: mixed}>> */
private array $readWait = [];
/** @var array<int, array<int, array{fiber: Fiber, stream: mixed}>> */
private array $writeWait = [];
public function __construct()
{
$this->ready = new SplQueue();
}
public function newTask(callable $fn): void
{
$fiber = new Fiber(fn() => $fn());
$this->scheduleReady($fiber, null);
}
private function scheduleReady(Fiber $fiber, mixed $value = null): void
{
$this->ready->enqueue([$fiber, $value]);
}
private function addWait(array &$bucket, Fiber $fiber, mixed $stream): void
{
if (!is_resource($stream)) {
$this->scheduleReady($fiber, null);
return;
}
$id = (int)$stream;
$bucket[$id][] = ['fiber' => $fiber, 'stream' => $stream];
}
private function handleRequest(Fiber $fiber, mixed $request): void
{
if (!is_array($request) || ($request[0] ?? null) === null) {
$this->scheduleReady($fiber, null);
return;
}
$op = $request[0];
if ($op === 'sleep') {
$seconds = (float)($request[1] ?? 0.0);
$wakeAt = microtime(true) + max(0.0, $seconds);
$this->timers[] = ['time' => $wakeAt, 'fiber' => $fiber, 'value' => $seconds];
return;
}
if ($op === 'readable') {
$this->addWait($this->readWait, $fiber, $request[1] ?? null);
return;
}
if ($op === 'writable') {
$this->addWait($this->writeWait, $fiber, $request[1] ?? null);
return;
}
$this->scheduleReady($fiber, null);
}
private function pumpTimers(): void
{
if (empty($this->timers)) return;
$now = microtime(true);
$still = [];
foreach ($this->timers as $t) {
if ($t['time'] <= $now) {
$this->scheduleReady($t['fiber'], $t['value']);
} else {
$still[] = $t;
}
}
$this->timers = $still;
}
private function nextTimeoutUs(int $maxUs): int
{
if (empty($this->timers)) return $maxUs;
$next = min(array_column($this->timers, 'time'));
$delta = $next - microtime(true);
if ($delta <= 0) return 0;
$us = (int)($delta * 1_000_000);
return min($us, $maxUs);
}
private function pumpSelect(int $timeoutUs): void
{
$read = [];
foreach ($this->readWait as $id => $entries) {
$read[] = $entries[0]['stream'];
}
$write = [];
foreach ($this->writeWait as $id => $entries) {
$write[] = $entries[0]['stream'];
}
if (empty($read) && empty($write)) {
usleep($timeoutUs);
return;
}
$except = null;
$sec = intdiv($timeoutUs, 1_000_000);
$usec = $timeoutUs % 1_000_000;
$r = $read;
$w = $write;
$n = @stream_select($r, $w, $except, $sec, $usec);
if ($n === false || $n === 0) return;
foreach ($r as $stream) {
$id = (int)$stream;
if (!isset($this->readWait[$id])) continue;
$entries = $this->readWait[$id];
unset($this->readWait[$id]);
foreach ($entries as $e) {
$this->scheduleReady($e['fiber'], $stream);
}
}
foreach ($w as $stream) {
$id = (int)$stream;
if (!isset($this->writeWait[$id])) continue;
$entries = $this->writeWait[$id];
unset($this->writeWait[$id]);
foreach ($entries as $e) {
$this->scheduleReady($e['fiber'], $stream);
}
}
}
public function run(): void
{
while (
!$this->ready->isEmpty()
|| !empty($this->timers)
|| !empty($this->readWait)
|| !empty($this->writeWait)
) {
$this->pumpTimers();
if (!$this->ready->isEmpty()) {
/** @var array{0:Fiber,1:mixed} $item */
$item = $this->ready->dequeue();
[$fiber, $value] = $item;
try {
if (!$fiber->isStarted()) {
$request = $fiber->start();
} else {
$request = $fiber->resume($value);
}
} catch (Throwable $e) {
fwrite(STDERR, "Task error: " . $e->getMessage() . PHP_EOL);
continue;
}
if (!$fiber->isTerminated()) {
$this->handleRequest($fiber, $request);
}
continue;
}
$timeoutUs = $this->nextTimeoutUs(50_000);
$this->pumpSelect($timeoutUs);
}
}
}
/** await:要求を出し、resume(value) の value を受け取る */
function await(array $request): mixed
{
return Fiber::suspend($request);
}
function sleep_async(float $seconds): float
{
/** @var float $slept */
$slept = await(['sleep', $seconds]);
return $slept;
}
function wait_readable($stream): void
{
await(['readable', $stream]);
}
function wait_writable($stream): void
{
await(['writable', $stream]);
}
/**
* non-blocking TCP connect(context対応)
*/
function tcp_connect_async(string $host, int $port, float $timeoutSec = 5.0, $context = null)
{
$addr = "tcp://{$host}:{$port}";
$errno = 0;
$errstr = '';
$flags = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT;
$sock = @stream_socket_client($addr, $errno, $errstr, $timeoutSec, $flags, $context);
if ($sock === false) {
throw new RuntimeException("connect failed: {$errstr} ({$errno})");
}
stream_set_blocking($sock, false);
// writable になるまで待つ(接続確立 or 失敗)
wait_writable($sock);
return $sock;
}
function write_all_async($sock, string $data): void
{
$offset = 0;
$len = strlen($data);
while ($offset < $len) {
wait_writable($sock);
$n = @fwrite($sock, substr($data, $offset));
if ($n === false) {
throw new RuntimeException("write failed");
}
if ($n === 0) {
sleep_async(0.01);
continue;
}
$offset += $n;
}
}
function read_all_async($sock, int $maxBytes = 2_000_000): string
{
$buf = '';
while (!feof($sock)) {
wait_readable($sock);
$chunk = @fread($sock, 8192);
if ($chunk === false) {
throw new RuntimeException("read failed");
}
if ($chunk === '') {
sleep_async(0.01);
continue;
}
$buf .= $chunk;
if (strlen($buf) > $maxBytes) {
throw new RuntimeException("response too large");
}
}
return $buf;
}
/**
* TLSハンドシェイク(non-blocking)
* stream_socket_enable_crypto() が 0 を返す間は read/write 待ちが必要。
*/
function tls_handshake_async($sock, string $host, float $timeoutSec = 10.0): void
{
$deadline = microtime(true) + $timeoutSec;
// クライアントTLS(環境により定数の対応が違うので @ で抑制せず、ここは固定で)
$method = STREAM_CRYPTO_METHOD_TLS_CLIENT;
// 最小の戦略:enable_crypto を叩き、0なら readable/writable を交互に待ってリトライ
$turn = 0;
while (true) {
$ok = @stream_socket_enable_crypto($sock, true, $method);
if ($ok === true) {
return; // 完了
}
if ($ok === false) {
$meta = stream_get_meta_data($sock);
$timedOut = $meta['timed_out'] ?? false;
throw new RuntimeException("TLS handshake failed" . ($timedOut ? " (timed out)" : ""));
}
// $ok === 0 : まだ途中
if (microtime(true) >= $deadline) {
throw new RuntimeException("TLS handshake timed out");
}
// どっち待ちが必要かは環境差があるので、read/write を交互に待つ(最小版)
if (($turn++ % 2) === 0) {
wait_readable($sock);
} else {
wait_writable($sock);
}
}
}
/**
* HTTPS GET(HTTP/1.1 + Connection: close)
* - 証明書検証を有効にしたいので peer_name / verify_peer_name を設定
* - 検証が邪魔な環境用に $insecure を用意
*/
function https_get_async(string $host, string $path, bool $insecure = false): string
{
$ssl = [
'SNI_enabled' => true,
'peer_name' => $host,
// ふつうは true 推奨
'verify_peer' => !$insecure,
'verify_peer_name' => !$insecure,
// insecure時は自己署名等を許す(デモ用)
'allow_self_signed' => $insecure,
];
$ctx = stream_context_create(['ssl' => $ssl]);
$sock = tcp_connect_async($host, 443, 5.0, $ctx);
// TLSを開始(この時点で SNI/peer_name が効く)
tls_handshake_async($sock, $host, 10.0);
$req =
"GET {$path} HTTP/1.1\r\n" .
"Host: {$host}\r\n" .
"User-Agent: fiber-mini-https/0.1\r\n" .
"Accept: */*\r\n" .
"Connection: close\r\n" .
"\r\n";
write_all_async($sock, $req);
$raw = read_all_async($sock);
fclose($sock);
return $raw;
}
/* =======================
* デモ:tickしつつHTTPS GET
* ======================= */
$s = new Scheduler();
// 背景tick
$s->newTask(function () {
for ($i = 1; $i <= 50; $i++) {
sleep_async(0.1);
echo "[TICK] {$i}\n";
}
});
// HTTPS GET
$s->newTask(function () {
$raw = https_get_async("httpbin.org", "/get", false);
[$head, $body] = explode("\r\n\r\n", $raw, 2) + [null, null];
echo "----- RESPONSE HEAD -----\n";
echo $head . "\n";
echo "----- RESPONSE BODY (first 400 bytes) -----\n";
echo substr($body ?? '', 0, 400) . "\n";
});
$s->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment