Last active
March 1, 2026 21:53
-
-
Save masakielastic/12cb14d2dc1f6682ae00a87d0058cda1 to your computer and use it in GitHub Desktop.
PHP RFC: Polling API の最小限の実装
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| declare(strict_types=1); | |
| require_once __DIR__ . '/src/Io/Poll.php'; | |
| use Io\Poll\{Context, Event, StreamPollHandle}; | |
| function sendAndReceive($poll, $client, $message, $timeoutSeconds = 5) { | |
| // Send message to server | |
| fwrite($client, $message); | |
| echo "Sent: $message"; | |
| // Wait for response | |
| $watchers = $poll->wait($timeoutSeconds); | |
| if (empty($watchers)) { | |
| echo "Timeout waiting for response\n"; | |
| return false; | |
| } | |
| foreach ($watchers as $watcher) { | |
| if ($watcher->hasTriggered(Event::Read)) { | |
| $data = fread($client, 8192); | |
| if ($data === false || $data === '') { | |
| echo "Server closed connection\n"; | |
| return false; | |
| } | |
| echo "Received: $data"; | |
| return true; | |
| } | |
| if ($watcher->hasTriggered(Event::Error) || $watcher->hasTriggered(Event::HangUp)) { | |
| echo "Connection error\n"; | |
| return false; | |
| } | |
| } | |
| return true; | |
| } | |
| // Create a poll context | |
| $poll = new Context(); | |
| // Connect to the echo server | |
| $address = $argv[1] ?? getenv('POLL_CLIENT_ADDRESS') ?: 'tcp://127.0.0.1:8080'; | |
| $client = stream_socket_client($address, $errno, $errstr, 30); | |
| if (!$client) { | |
| die("Failed to connect: $errstr\n"); | |
| } | |
| stream_set_blocking($client, false); | |
| // Create handle and watch for readable data | |
| $handle = new StreamPollHandle($client); | |
| $watcher = $poll->add($handle, [Event::Read]); | |
| // Send messages to the server | |
| $messages = ["Hello, Server!\n", "How are you?\n", "Goodbye!\n"]; | |
| foreach ($messages as $message) { | |
| if (!sendAndReceive($poll, $client, $message)) { | |
| break; | |
| } | |
| // Small delay between messages | |
| usleep(100000); // 100ms | |
| } | |
| fclose($client); | |
| echo "Client finished\n"; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| declare(strict_types=1); | |
| require_once __DIR__ . '/src/Io/Poll.php'; | |
| use Io\Poll\{Context, Event, StreamPollHandle}; | |
| $address = $argv[1] ?? getenv('POLL_SERVER_ADDRESS') ?: 'tcp://127.0.0.1:8080'; | |
| $poll = new Context(); | |
| $server = stream_socket_server($address, $errno, $errstr); | |
| if (!$server) { | |
| die("Failed to create server: $errstr\n"); | |
| } | |
| stream_set_blocking($server, false); | |
| $serverHandle = new StreamPollHandle($server); | |
| $poll->add($serverHandle, [Event::Read], ['type' => 'server']); | |
| echo "Server listening on {$address}\n"; | |
| while (true) { | |
| $watchers = $poll->wait(1); | |
| foreach ($watchers as $watcher) { | |
| $data = $watcher->getData(); | |
| $handle = $watcher->getHandle(); | |
| if (!$handle instanceof StreamPollHandle) { | |
| continue; | |
| } | |
| if ($data['type'] === 'server' && $watcher->hasTriggered(Event::Read)) { | |
| $client = stream_socket_accept($handle->getStream(), 0); | |
| if (!$client) { | |
| continue; | |
| } | |
| stream_set_blocking($client, false); | |
| $clientHandle = new StreamPollHandle($client); | |
| $poll->add($clientHandle, [Event::Read], ['type' => 'client']); | |
| echo "New client connected\n"; | |
| continue; | |
| } | |
| if ($data['type'] !== 'client') { | |
| continue; | |
| } | |
| $stream = $handle->getStream(); | |
| if ($watcher->hasTriggered(Event::Read)) { | |
| $buffer = fread($stream, 8192); | |
| if ($buffer === false || $buffer === '') { | |
| echo "Client disconnected\n"; | |
| $watcher->remove(); | |
| fclose($stream); | |
| continue; | |
| } | |
| echo "Received: $buffer"; | |
| fwrite($stream, "Echo: $buffer"); | |
| } | |
| if ($watcher->hasTriggered(Event::HangUp) || $watcher->hasTriggered(Event::Error)) { | |
| echo "Client connection error or hangup\n"; | |
| $watcher->remove(); | |
| fclose($stream); | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| declare(strict_types=1); | |
| namespace Io; | |
| class IoException extends \Exception | |
| { | |
| } | |
| namespace Io\Poll; | |
| enum Backend | |
| { | |
| case Auto; | |
| case Poll; | |
| case Epoll; | |
| case Kqueue; | |
| case EventPorts; | |
| case WSAPoll; | |
| public static function getAvailableBackends(): array | |
| { | |
| return [self::Poll]; | |
| } | |
| public function isAvailable(): bool | |
| { | |
| return match ($this) { | |
| self::Auto, self::Poll => true, | |
| default => false, | |
| }; | |
| } | |
| public function supportsEdgeTriggering(): bool | |
| { | |
| return false; | |
| } | |
| } | |
| enum Event | |
| { | |
| case Read; | |
| case Write; | |
| case Error; | |
| case HangUp; | |
| case ReadHangUp; | |
| case OneShot; | |
| case EdgeTriggered; | |
| } | |
| class PollException extends \Io\IoException | |
| { | |
| } | |
| abstract class FailedPollOperationException extends PollException | |
| { | |
| public const int ERROR_NONE = 0; | |
| public const int ERROR_SYSTEM = 1; | |
| public const int ERROR_NOMEM = 2; | |
| public const int ERROR_INVALID = 3; | |
| public const int ERROR_EXISTS = 4; | |
| public const int ERROR_NOTFOUND = 5; | |
| public const int ERROR_TIMEOUT = 6; | |
| public const int ERROR_INTERRUPTED = 7; | |
| public const int ERROR_PERMISSION = 8; | |
| public const int ERROR_TOOBIG = 9; | |
| public const int ERROR_AGAIN = 10; | |
| public const int ERROR_NOSUPPORT = 11; | |
| } | |
| class FailedContextInitializationException extends FailedPollOperationException | |
| { | |
| } | |
| class FailedHandleAddException extends FailedPollOperationException | |
| { | |
| } | |
| class FailedWatcherModificationException extends FailedPollOperationException | |
| { | |
| } | |
| class FailedPollWaitException extends FailedPollOperationException | |
| { | |
| } | |
| class BackendUnavailableException extends PollException | |
| { | |
| } | |
| class InactiveWatcherException extends PollException | |
| { | |
| } | |
| class HandleAlreadyWatchedException extends PollException | |
| { | |
| } | |
| class InvalidHandleException extends PollException | |
| { | |
| } | |
| abstract class Handle | |
| { | |
| abstract protected function getFileDescriptor(): int; | |
| } | |
| final class StreamPollHandle extends Handle | |
| { | |
| private mixed $stream; | |
| public function __construct($stream) | |
| { | |
| if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') { | |
| throw new InvalidHandleException('StreamPollHandle expects a valid stream resource'); | |
| } | |
| $this->stream = $stream; | |
| } | |
| public function getStream() | |
| { | |
| return $this->stream; | |
| } | |
| public function isValid(): bool | |
| { | |
| // Keep EOF streams valid so wait() can surface remote close as HangUp/Read. | |
| return \is_resource($this->stream) && \get_resource_type($this->stream) === 'stream'; | |
| } | |
| protected function getFileDescriptor(): int | |
| { | |
| return (int) $this->stream; | |
| } | |
| } | |
| final class Watcher | |
| { | |
| private array $watchedEvents; | |
| private array $triggeredEvents = []; | |
| private bool $active = true; | |
| public function __construct( | |
| private readonly Context $context, | |
| private readonly Handle $handle, | |
| array $events, | |
| private mixed $data = null, | |
| ) { | |
| $this->watchedEvents = self::normalizeEvents($events); | |
| } | |
| public function getHandle(): Handle | |
| { | |
| return $this->handle; | |
| } | |
| public function getWatchedEvents(): array | |
| { | |
| return $this->watchedEvents; | |
| } | |
| public function getTriggeredEvents(): array | |
| { | |
| return $this->triggeredEvents; | |
| } | |
| public function getData(): mixed | |
| { | |
| return $this->data; | |
| } | |
| public function hasTriggered(Event $event): bool | |
| { | |
| return \in_array($event, $this->triggeredEvents, true); | |
| } | |
| public function isActive(): bool | |
| { | |
| return $this->active; | |
| } | |
| public function modify(array $events, mixed $data = null): void | |
| { | |
| $this->assertActive(); | |
| $this->context->modifyWatcher($this, $events, $data); | |
| } | |
| public function modifyEvents(array $events): void | |
| { | |
| $this->assertActive(); | |
| $this->context->modifyWatcher($this, $events, $this->data); | |
| } | |
| public function modifyData(mixed $data): void | |
| { | |
| $this->assertActive(); | |
| $this->data = $data; | |
| } | |
| public function remove(): void | |
| { | |
| $this->assertActive(); | |
| $this->context->removeWatcher($this); | |
| } | |
| public function replaceConfiguration(array $events, mixed $data): void | |
| { | |
| $this->watchedEvents = self::normalizeEvents($events); | |
| $this->data = $data; | |
| } | |
| public function replaceTriggeredEvents(array $events): void | |
| { | |
| $this->triggeredEvents = self::normalizeEvents($events); | |
| } | |
| public function deactivate(): void | |
| { | |
| $this->active = false; | |
| $this->triggeredEvents = []; | |
| } | |
| private function assertActive(): void | |
| { | |
| if (!$this->active) { | |
| throw new InactiveWatcherException('Watcher is no longer active'); | |
| } | |
| } | |
| private static function normalizeEvents(array $events): array | |
| { | |
| $unique = []; | |
| foreach ($events as $event) { | |
| if (!$event instanceof Event) { | |
| throw new \TypeError('Events must be instances of ' . Event::class); | |
| } | |
| $unique[$event->name] = $event; | |
| } | |
| return \array_values($unique); | |
| } | |
| } | |
| final class Context | |
| { | |
| private Backend $backend; | |
| /** @var array<int, Watcher> */ | |
| private array $watchers = []; | |
| public function __construct(Backend $backend = Backend::Auto) | |
| { | |
| if (!$backend->isAvailable()) { | |
| throw new BackendUnavailableException( | |
| "Backend {$backend->name} is not available", | |
| ); | |
| } | |
| $this->backend = $backend === Backend::Auto ? Backend::Poll : $backend; | |
| } | |
| public function add(Handle $handle, array $events, mixed $data = null): Watcher | |
| { | |
| $this->assertHandleSupported($handle); | |
| $this->assertHandleValid($handle); | |
| $normalizedEvents = $this->normalizeInputEvents($events, FailedHandleAddException::class); | |
| $streamId = $this->streamId($handle); | |
| if (isset($this->watchers[$streamId])) { | |
| throw new HandleAlreadyWatchedException('Handle is already being watched'); | |
| } | |
| $watcher = new Watcher($this, $handle, $normalizedEvents, $data); | |
| $this->watchers[$streamId] = $watcher; | |
| return $watcher; | |
| } | |
| public function wait( | |
| int $timeoutSeconds = -1, | |
| int $timeoutMicroseconds = 0, | |
| int $maxEvents = -1, | |
| ): array | |
| { | |
| foreach ($this->watchers as $watcher) { | |
| $watcher->replaceTriggeredEvents([]); | |
| } | |
| if ($this->watchers === []) { | |
| if ($timeoutSeconds > 0 || $timeoutMicroseconds > 0) { | |
| [$seconds, $microseconds] = $this->normalizeTimeout($timeoutSeconds, $timeoutMicroseconds); | |
| if ($seconds !== null) { | |
| \usleep(($seconds * 1000000) + $microseconds); | |
| } | |
| } | |
| return []; | |
| } | |
| [$seconds, $microseconds] = $this->normalizeTimeout($timeoutSeconds, $timeoutMicroseconds); | |
| $read = []; | |
| $write = []; | |
| $except = []; | |
| $watchersByStream = []; | |
| foreach ($this->watchers as $streamId => $watcher) { | |
| $handle = $watcher->getHandle(); | |
| \assert($handle instanceof StreamPollHandle); | |
| if (!$handle->isValid()) { | |
| $watcher->replaceTriggeredEvents([Event::Error, Event::HangUp]); | |
| $this->removeWatcher($watcher); | |
| continue; | |
| } | |
| $stream = $handle->getStream(); | |
| $watchersByStream[$streamId] = $watcher; | |
| if ($this->watchesRead($watcher)) { | |
| $read[] = $stream; | |
| } | |
| if ($this->watchesWrite($watcher)) { | |
| $write[] = $stream; | |
| } | |
| $except[] = $stream; | |
| } | |
| if ($watchersByStream === []) { | |
| return []; | |
| } | |
| $selected = @\stream_select($read, $write, $except, $seconds, $microseconds); | |
| if ($selected === false) { | |
| throw new FailedPollWaitException( | |
| 'stream_select() failed', | |
| FailedPollOperationException::ERROR_SYSTEM, | |
| ); | |
| } | |
| if ($selected === 0) { | |
| return []; | |
| } | |
| $readyRead = $this->indexReadyStreams($read); | |
| $readyWrite = $this->indexReadyStreams($write); | |
| $readyExcept = $this->indexReadyStreams($except); | |
| $triggeredWatchers = []; | |
| foreach ($watchersByStream as $streamId => $watcher) { | |
| $triggeredEvents = []; | |
| if (isset($readyRead[$streamId])) { | |
| $triggeredEvents[] = Event::Read; | |
| if ($this->isReadHangUp($watcher)) { | |
| $triggeredEvents[] = Event::HangUp; | |
| if ($this->watchesReadHangUp($watcher)) { | |
| $triggeredEvents[] = Event::ReadHangUp; | |
| } | |
| } | |
| } | |
| if (isset($readyWrite[$streamId])) { | |
| $triggeredEvents[] = Event::Write; | |
| } | |
| if (isset($readyExcept[$streamId])) { | |
| $triggeredEvents[] = Event::Error; | |
| } | |
| if ($triggeredEvents === []) { | |
| continue; | |
| } | |
| $watcher->replaceTriggeredEvents($triggeredEvents); | |
| $triggeredWatchers[] = $watcher; | |
| if ($this->watchesOneShot($watcher)) { | |
| $this->removeWatcher($watcher); | |
| } | |
| if ($maxEvents > 0 && \count($triggeredWatchers) >= $maxEvents) { | |
| break; | |
| } | |
| } | |
| return $triggeredWatchers; | |
| } | |
| public function getBackend(): Backend | |
| { | |
| return $this->backend; | |
| } | |
| public function modifyWatcher(Watcher $watcher, array $events, mixed $data): void | |
| { | |
| $handle = $watcher->getHandle(); | |
| $this->assertHandleSupported($handle); | |
| $this->assertHandleValid($handle); | |
| try { | |
| $normalizedEvents = $this->normalizeInputEvents($events, FailedWatcherModificationException::class); | |
| } catch (PollException $exception) { | |
| throw $exception; | |
| } | |
| $watcher->replaceConfiguration($normalizedEvents, $data); | |
| } | |
| public function removeWatcher(Watcher $watcher): void | |
| { | |
| $streamId = $this->streamId($watcher->getHandle()); | |
| unset($this->watchers[$streamId]); | |
| $watcher->deactivate(); | |
| } | |
| private function normalizeInputEvents(array $events, string $exceptionClass): array | |
| { | |
| if ($events === []) { | |
| throw new $exceptionClass( | |
| 'At least one event must be specified', | |
| FailedPollOperationException::ERROR_INVALID, | |
| ); | |
| } | |
| $normalized = []; | |
| foreach ($events as $event) { | |
| if (!$event instanceof Event) { | |
| throw new \TypeError('Events must be instances of ' . Event::class); | |
| } | |
| if ($event === Event::Error || $event === Event::HangUp) { | |
| throw new $exceptionClass( | |
| "{$event->name} is an output-only event", | |
| FailedPollOperationException::ERROR_INVALID, | |
| ); | |
| } | |
| if ($event === Event::EdgeTriggered) { | |
| throw new $exceptionClass( | |
| 'Edge-triggered polling is not supported by the stream_select() backend', | |
| FailedPollOperationException::ERROR_NOSUPPORT, | |
| ); | |
| } | |
| $normalized[$event->name] = $event; | |
| } | |
| return \array_values($normalized); | |
| } | |
| private function assertHandleSupported(Handle $handle): void | |
| { | |
| if (!$handle instanceof StreamPollHandle) { | |
| throw new FailedHandleAddException( | |
| 'Only StreamPollHandle is supported by this implementation', | |
| FailedPollOperationException::ERROR_NOSUPPORT, | |
| ); | |
| } | |
| } | |
| private function assertHandleValid(Handle $handle): void | |
| { | |
| \assert($handle instanceof StreamPollHandle); | |
| if (!$handle->isValid()) { | |
| throw new InvalidHandleException('Handle is no longer valid for polling'); | |
| } | |
| } | |
| private function streamId(Handle $handle): int | |
| { | |
| \assert($handle instanceof StreamPollHandle); | |
| return (int) $handle->getStream(); | |
| } | |
| private function normalizeTimeout(int $timeoutSeconds, int $timeoutMicroseconds): array | |
| { | |
| if ($timeoutSeconds < -1) { | |
| throw new FailedPollWaitException( | |
| 'Timeout seconds must be -1 or greater', | |
| FailedPollOperationException::ERROR_INVALID, | |
| ); | |
| } | |
| if ($timeoutSeconds === -1) { | |
| return [null, null]; | |
| } | |
| if ($timeoutMicroseconds < 0) { | |
| throw new FailedPollWaitException( | |
| 'Timeout microseconds must be 0 or greater', | |
| FailedPollOperationException::ERROR_INVALID, | |
| ); | |
| } | |
| $timeoutSeconds += intdiv($timeoutMicroseconds, 1000000); | |
| $timeoutMicroseconds %= 1000000; | |
| return [$timeoutSeconds, $timeoutMicroseconds]; | |
| } | |
| private function indexReadyStreams(array $streams): array | |
| { | |
| $indexed = []; | |
| foreach ($streams as $stream) { | |
| $indexed[(int) $stream] = true; | |
| } | |
| return $indexed; | |
| } | |
| private function watchesRead(Watcher $watcher): bool | |
| { | |
| $events = $watcher->getWatchedEvents(); | |
| return \in_array(Event::Read, $events, true) || \in_array(Event::ReadHangUp, $events, true); | |
| } | |
| private function watchesWrite(Watcher $watcher): bool | |
| { | |
| return \in_array(Event::Write, $watcher->getWatchedEvents(), true); | |
| } | |
| private function watchesReadHangUp(Watcher $watcher): bool | |
| { | |
| return \in_array(Event::ReadHangUp, $watcher->getWatchedEvents(), true); | |
| } | |
| private function watchesOneShot(Watcher $watcher): bool | |
| { | |
| return \in_array(Event::OneShot, $watcher->getWatchedEvents(), true); | |
| } | |
| private function isReadHangUp(Watcher $watcher): bool | |
| { | |
| $handle = $watcher->getHandle(); | |
| \assert($handle instanceof StreamPollHandle); | |
| $stream = $handle->getStream(); | |
| return \is_resource($stream) && \feof($stream); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment