Skip to content

Instantly share code, notes, and snippets.

@masakielastic
Last active March 1, 2026 21:53
Show Gist options
  • Select an option

  • Save masakielastic/12cb14d2dc1f6682ae00a87d0058cda1 to your computer and use it in GitHub Desktop.

Select an option

Save masakielastic/12cb14d2dc1f6682ae00a87d0058cda1 to your computer and use it in GitHub Desktop.
PHP RFC: Polling API の最小限の実装

PHP RFC: Polling API の最小限の実装

<?php
declare(strict_types=1);
require_once __DIR__ . '/src/Io/Poll.php';
use Io\Poll\{Context, Event, StreamPollHandle};
function sendAndReceive($poll, $client, $message, $timeoutSeconds = 5) {
// Send message to server
fwrite($client, $message);
echo "Sent: $message";
// Wait for response
$watchers = $poll->wait($timeoutSeconds);
if (empty($watchers)) {
echo "Timeout waiting for response\n";
return false;
}
foreach ($watchers as $watcher) {
if ($watcher->hasTriggered(Event::Read)) {
$data = fread($client, 8192);
if ($data === false || $data === '') {
echo "Server closed connection\n";
return false;
}
echo "Received: $data";
return true;
}
if ($watcher->hasTriggered(Event::Error) || $watcher->hasTriggered(Event::HangUp)) {
echo "Connection error\n";
return false;
}
}
return true;
}
// Create a poll context
$poll = new Context();
// Connect to the echo server
$address = $argv[1] ?? getenv('POLL_CLIENT_ADDRESS') ?: 'tcp://127.0.0.1:8080';
$client = stream_socket_client($address, $errno, $errstr, 30);
if (!$client) {
die("Failed to connect: $errstr\n");
}
stream_set_blocking($client, false);
// Create handle and watch for readable data
$handle = new StreamPollHandle($client);
$watcher = $poll->add($handle, [Event::Read]);
// Send messages to the server
$messages = ["Hello, Server!\n", "How are you?\n", "Goodbye!\n"];
foreach ($messages as $message) {
if (!sendAndReceive($poll, $client, $message)) {
break;
}
// Small delay between messages
usleep(100000); // 100ms
}
fclose($client);
echo "Client finished\n";
<?php
declare(strict_types=1);
require_once __DIR__ . '/src/Io/Poll.php';
use Io\Poll\{Context, Event, StreamPollHandle};
$address = $argv[1] ?? getenv('POLL_SERVER_ADDRESS') ?: 'tcp://127.0.0.1:8080';
$poll = new Context();
$server = stream_socket_server($address, $errno, $errstr);
if (!$server) {
die("Failed to create server: $errstr\n");
}
stream_set_blocking($server, false);
$serverHandle = new StreamPollHandle($server);
$poll->add($serverHandle, [Event::Read], ['type' => 'server']);
echo "Server listening on {$address}\n";
while (true) {
$watchers = $poll->wait(1);
foreach ($watchers as $watcher) {
$data = $watcher->getData();
$handle = $watcher->getHandle();
if (!$handle instanceof StreamPollHandle) {
continue;
}
if ($data['type'] === 'server' && $watcher->hasTriggered(Event::Read)) {
$client = stream_socket_accept($handle->getStream(), 0);
if (!$client) {
continue;
}
stream_set_blocking($client, false);
$clientHandle = new StreamPollHandle($client);
$poll->add($clientHandle, [Event::Read], ['type' => 'client']);
echo "New client connected\n";
continue;
}
if ($data['type'] !== 'client') {
continue;
}
$stream = $handle->getStream();
if ($watcher->hasTriggered(Event::Read)) {
$buffer = fread($stream, 8192);
if ($buffer === false || $buffer === '') {
echo "Client disconnected\n";
$watcher->remove();
fclose($stream);
continue;
}
echo "Received: $buffer";
fwrite($stream, "Echo: $buffer");
}
if ($watcher->hasTriggered(Event::HangUp) || $watcher->hasTriggered(Event::Error)) {
echo "Client connection error or hangup\n";
$watcher->remove();
fclose($stream);
}
}
}
<?php
declare(strict_types=1);
namespace Io;
class IoException extends \Exception
{
}
namespace Io\Poll;
enum Backend
{
case Auto;
case Poll;
case Epoll;
case Kqueue;
case EventPorts;
case WSAPoll;
public static function getAvailableBackends(): array
{
return [self::Poll];
}
public function isAvailable(): bool
{
return match ($this) {
self::Auto, self::Poll => true,
default => false,
};
}
public function supportsEdgeTriggering(): bool
{
return false;
}
}
enum Event
{
case Read;
case Write;
case Error;
case HangUp;
case ReadHangUp;
case OneShot;
case EdgeTriggered;
}
class PollException extends \Io\IoException
{
}
abstract class FailedPollOperationException extends PollException
{
public const int ERROR_NONE = 0;
public const int ERROR_SYSTEM = 1;
public const int ERROR_NOMEM = 2;
public const int ERROR_INVALID = 3;
public const int ERROR_EXISTS = 4;
public const int ERROR_NOTFOUND = 5;
public const int ERROR_TIMEOUT = 6;
public const int ERROR_INTERRUPTED = 7;
public const int ERROR_PERMISSION = 8;
public const int ERROR_TOOBIG = 9;
public const int ERROR_AGAIN = 10;
public const int ERROR_NOSUPPORT = 11;
}
class FailedContextInitializationException extends FailedPollOperationException
{
}
class FailedHandleAddException extends FailedPollOperationException
{
}
class FailedWatcherModificationException extends FailedPollOperationException
{
}
class FailedPollWaitException extends FailedPollOperationException
{
}
class BackendUnavailableException extends PollException
{
}
class InactiveWatcherException extends PollException
{
}
class HandleAlreadyWatchedException extends PollException
{
}
class InvalidHandleException extends PollException
{
}
abstract class Handle
{
abstract protected function getFileDescriptor(): int;
}
final class StreamPollHandle extends Handle
{
private mixed $stream;
public function __construct($stream)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
throw new InvalidHandleException('StreamPollHandle expects a valid stream resource');
}
$this->stream = $stream;
}
public function getStream()
{
return $this->stream;
}
public function isValid(): bool
{
// Keep EOF streams valid so wait() can surface remote close as HangUp/Read.
return \is_resource($this->stream) && \get_resource_type($this->stream) === 'stream';
}
protected function getFileDescriptor(): int
{
return (int) $this->stream;
}
}
final class Watcher
{
private array $watchedEvents;
private array $triggeredEvents = [];
private bool $active = true;
public function __construct(
private readonly Context $context,
private readonly Handle $handle,
array $events,
private mixed $data = null,
) {
$this->watchedEvents = self::normalizeEvents($events);
}
public function getHandle(): Handle
{
return $this->handle;
}
public function getWatchedEvents(): array
{
return $this->watchedEvents;
}
public function getTriggeredEvents(): array
{
return $this->triggeredEvents;
}
public function getData(): mixed
{
return $this->data;
}
public function hasTriggered(Event $event): bool
{
return \in_array($event, $this->triggeredEvents, true);
}
public function isActive(): bool
{
return $this->active;
}
public function modify(array $events, mixed $data = null): void
{
$this->assertActive();
$this->context->modifyWatcher($this, $events, $data);
}
public function modifyEvents(array $events): void
{
$this->assertActive();
$this->context->modifyWatcher($this, $events, $this->data);
}
public function modifyData(mixed $data): void
{
$this->assertActive();
$this->data = $data;
}
public function remove(): void
{
$this->assertActive();
$this->context->removeWatcher($this);
}
public function replaceConfiguration(array $events, mixed $data): void
{
$this->watchedEvents = self::normalizeEvents($events);
$this->data = $data;
}
public function replaceTriggeredEvents(array $events): void
{
$this->triggeredEvents = self::normalizeEvents($events);
}
public function deactivate(): void
{
$this->active = false;
$this->triggeredEvents = [];
}
private function assertActive(): void
{
if (!$this->active) {
throw new InactiveWatcherException('Watcher is no longer active');
}
}
private static function normalizeEvents(array $events): array
{
$unique = [];
foreach ($events as $event) {
if (!$event instanceof Event) {
throw new \TypeError('Events must be instances of ' . Event::class);
}
$unique[$event->name] = $event;
}
return \array_values($unique);
}
}
final class Context
{
private Backend $backend;
/** @var array<int, Watcher> */
private array $watchers = [];
public function __construct(Backend $backend = Backend::Auto)
{
if (!$backend->isAvailable()) {
throw new BackendUnavailableException(
"Backend {$backend->name} is not available",
);
}
$this->backend = $backend === Backend::Auto ? Backend::Poll : $backend;
}
public function add(Handle $handle, array $events, mixed $data = null): Watcher
{
$this->assertHandleSupported($handle);
$this->assertHandleValid($handle);
$normalizedEvents = $this->normalizeInputEvents($events, FailedHandleAddException::class);
$streamId = $this->streamId($handle);
if (isset($this->watchers[$streamId])) {
throw new HandleAlreadyWatchedException('Handle is already being watched');
}
$watcher = new Watcher($this, $handle, $normalizedEvents, $data);
$this->watchers[$streamId] = $watcher;
return $watcher;
}
public function wait(
int $timeoutSeconds = -1,
int $timeoutMicroseconds = 0,
int $maxEvents = -1,
): array
{
foreach ($this->watchers as $watcher) {
$watcher->replaceTriggeredEvents([]);
}
if ($this->watchers === []) {
if ($timeoutSeconds > 0 || $timeoutMicroseconds > 0) {
[$seconds, $microseconds] = $this->normalizeTimeout($timeoutSeconds, $timeoutMicroseconds);
if ($seconds !== null) {
\usleep(($seconds * 1000000) + $microseconds);
}
}
return [];
}
[$seconds, $microseconds] = $this->normalizeTimeout($timeoutSeconds, $timeoutMicroseconds);
$read = [];
$write = [];
$except = [];
$watchersByStream = [];
foreach ($this->watchers as $streamId => $watcher) {
$handle = $watcher->getHandle();
\assert($handle instanceof StreamPollHandle);
if (!$handle->isValid()) {
$watcher->replaceTriggeredEvents([Event::Error, Event::HangUp]);
$this->removeWatcher($watcher);
continue;
}
$stream = $handle->getStream();
$watchersByStream[$streamId] = $watcher;
if ($this->watchesRead($watcher)) {
$read[] = $stream;
}
if ($this->watchesWrite($watcher)) {
$write[] = $stream;
}
$except[] = $stream;
}
if ($watchersByStream === []) {
return [];
}
$selected = @\stream_select($read, $write, $except, $seconds, $microseconds);
if ($selected === false) {
throw new FailedPollWaitException(
'stream_select() failed',
FailedPollOperationException::ERROR_SYSTEM,
);
}
if ($selected === 0) {
return [];
}
$readyRead = $this->indexReadyStreams($read);
$readyWrite = $this->indexReadyStreams($write);
$readyExcept = $this->indexReadyStreams($except);
$triggeredWatchers = [];
foreach ($watchersByStream as $streamId => $watcher) {
$triggeredEvents = [];
if (isset($readyRead[$streamId])) {
$triggeredEvents[] = Event::Read;
if ($this->isReadHangUp($watcher)) {
$triggeredEvents[] = Event::HangUp;
if ($this->watchesReadHangUp($watcher)) {
$triggeredEvents[] = Event::ReadHangUp;
}
}
}
if (isset($readyWrite[$streamId])) {
$triggeredEvents[] = Event::Write;
}
if (isset($readyExcept[$streamId])) {
$triggeredEvents[] = Event::Error;
}
if ($triggeredEvents === []) {
continue;
}
$watcher->replaceTriggeredEvents($triggeredEvents);
$triggeredWatchers[] = $watcher;
if ($this->watchesOneShot($watcher)) {
$this->removeWatcher($watcher);
}
if ($maxEvents > 0 && \count($triggeredWatchers) >= $maxEvents) {
break;
}
}
return $triggeredWatchers;
}
public function getBackend(): Backend
{
return $this->backend;
}
public function modifyWatcher(Watcher $watcher, array $events, mixed $data): void
{
$handle = $watcher->getHandle();
$this->assertHandleSupported($handle);
$this->assertHandleValid($handle);
try {
$normalizedEvents = $this->normalizeInputEvents($events, FailedWatcherModificationException::class);
} catch (PollException $exception) {
throw $exception;
}
$watcher->replaceConfiguration($normalizedEvents, $data);
}
public function removeWatcher(Watcher $watcher): void
{
$streamId = $this->streamId($watcher->getHandle());
unset($this->watchers[$streamId]);
$watcher->deactivate();
}
private function normalizeInputEvents(array $events, string $exceptionClass): array
{
if ($events === []) {
throw new $exceptionClass(
'At least one event must be specified',
FailedPollOperationException::ERROR_INVALID,
);
}
$normalized = [];
foreach ($events as $event) {
if (!$event instanceof Event) {
throw new \TypeError('Events must be instances of ' . Event::class);
}
if ($event === Event::Error || $event === Event::HangUp) {
throw new $exceptionClass(
"{$event->name} is an output-only event",
FailedPollOperationException::ERROR_INVALID,
);
}
if ($event === Event::EdgeTriggered) {
throw new $exceptionClass(
'Edge-triggered polling is not supported by the stream_select() backend',
FailedPollOperationException::ERROR_NOSUPPORT,
);
}
$normalized[$event->name] = $event;
}
return \array_values($normalized);
}
private function assertHandleSupported(Handle $handle): void
{
if (!$handle instanceof StreamPollHandle) {
throw new FailedHandleAddException(
'Only StreamPollHandle is supported by this implementation',
FailedPollOperationException::ERROR_NOSUPPORT,
);
}
}
private function assertHandleValid(Handle $handle): void
{
\assert($handle instanceof StreamPollHandle);
if (!$handle->isValid()) {
throw new InvalidHandleException('Handle is no longer valid for polling');
}
}
private function streamId(Handle $handle): int
{
\assert($handle instanceof StreamPollHandle);
return (int) $handle->getStream();
}
private function normalizeTimeout(int $timeoutSeconds, int $timeoutMicroseconds): array
{
if ($timeoutSeconds < -1) {
throw new FailedPollWaitException(
'Timeout seconds must be -1 or greater',
FailedPollOperationException::ERROR_INVALID,
);
}
if ($timeoutSeconds === -1) {
return [null, null];
}
if ($timeoutMicroseconds < 0) {
throw new FailedPollWaitException(
'Timeout microseconds must be 0 or greater',
FailedPollOperationException::ERROR_INVALID,
);
}
$timeoutSeconds += intdiv($timeoutMicroseconds, 1000000);
$timeoutMicroseconds %= 1000000;
return [$timeoutSeconds, $timeoutMicroseconds];
}
private function indexReadyStreams(array $streams): array
{
$indexed = [];
foreach ($streams as $stream) {
$indexed[(int) $stream] = true;
}
return $indexed;
}
private function watchesRead(Watcher $watcher): bool
{
$events = $watcher->getWatchedEvents();
return \in_array(Event::Read, $events, true) || \in_array(Event::ReadHangUp, $events, true);
}
private function watchesWrite(Watcher $watcher): bool
{
return \in_array(Event::Write, $watcher->getWatchedEvents(), true);
}
private function watchesReadHangUp(Watcher $watcher): bool
{
return \in_array(Event::ReadHangUp, $watcher->getWatchedEvents(), true);
}
private function watchesOneShot(Watcher $watcher): bool
{
return \in_array(Event::OneShot, $watcher->getWatchedEvents(), true);
}
private function isReadHangUp(Watcher $watcher): bool
{
$handle = $watcher->getHandle();
\assert($handle instanceof StreamPollHandle);
$stream = $handle->getStream();
return \is_resource($stream) && \feof($stream);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment