Skip to content

Instantly share code, notes, and snippets.

@nandordudas
Created June 26, 2025 09:15
Show Gist options
  • Save nandordudas/b33fe1772d44c87b9a42fdd457bcb653 to your computer and use it in GitHub Desktop.
Save nandordudas/b33fe1772d44c87b9a42fdd457bcb653 to your computer and use it in GitHub Desktop.
sse
// [TODO] no client code yet
<?php
declare(strict_types=1);
namespace DB;
use Countable;
use InvalidArgumentException;
use IteratorAggregate;
use LogicException;
use PDO;
use PDOStatement;
use RuntimeException;
use Throwable;
use Traversable;
enum DatabaseDriver: string
{
case MYSQL = 'mysql';
case POSTGRESQL = 'pgsql';
case SQLITE = 'sqlite';
case SQLSERVER = 'sqlsrv';
public function isSupported(): bool
{
return match ($this) {
self::SQLITE => false, // Not supported yet
default => in_array($this->value, PDO::getAvailableDrivers(), true),
};
}
}
enum PDODataType: int
{
case NULL = PDO::PARAM_NULL;
case INT = PDO::PARAM_INT;
case STR = PDO::PARAM_STR;
case LOB = PDO::PARAM_LOB;
case BOOL = PDO::PARAM_BOOL;
public static function fromValue(mixed $value): self
{
return match (true) {
is_null($value) => self::NULL,
is_int($value) => self::INT,
is_resource($value) => self::LOB,
is_bool($value) => self::BOOL,
default => self::STR,
};
}
}
interface Closable
{
public function close(): void;
}
/** @extends IteratorAggregate<int, mixed> */
interface QueryResultContract extends IteratorAggregate, Countable, Closable
{
/** @return ?array<string, mixed> */
public function next(): ?array;
/** @return ?array<string, mixed> */
public function first(): ?array;
}
abstract class QueryResultBase implements QueryResultContract
{
protected ?PDOStatement $statement;
private bool $closed = false;
public function __construct(PDOStatement $statement)
{
$this->statement = $statement;
}
public function close(): void
{
if (!$this->closed) {
$this->statement?->closeCursor();
$this->statement = null;
$this->closed = true;
}
}
protected function ensureNotClosed(): void
{
if ($this->closed) {
throw new LogicException('Query result has been closed');
}
}
/** @return ?array<string, mixed> */
public function first(): ?array
{
$result = $this->statement?->fetch();
$this->close();
return $result ?: null;
}
/** @return ?array<string, mixed> */
public function next(): ?array
{
return $this->statement?->fetch() ?: null;
}
public function getIterator(): Traversable
{
while ($row = $this->statement?->fetch(PDO::FETCH_ASSOC)) {
yield $row;
}
$this->statement?->closeCursor();
}
/**
* @warning May not work with SELECT in all database drivers
*/
public function count(): int
{
return $this->statement?->rowCount() ?? 0;
}
public function __destruct()
{
$this->close();
}
}
final class QueryResult extends QueryResultBase
{
public static function create(PDOStatement $statement): self
{
return new self($statement);
}
/** @return array<int, mixed> */
public function toArray(): array
{
return iterator_to_array($this);
}
/** @return array<int, mixed> */
public function column(string $column): array
{
return array_column($this->toArray(), $column);
}
}
final readonly class Credentials
{
private const USERNAME_PATTERN = '/^[a-zA-Z0-9_]{3,20}$/';
public function __construct(
public string $username,
#[\SensitiveParameter]
public string $password,
) {
$this->validate();
}
/** @param array{username: string, password: string} $config */
public static function fromArray(array $config): self
{
return new self(
username: $config['username'],
password: $config['password'],
);
}
private function validate(): void
{
if (!preg_match(self::USERNAME_PATTERN, $this->username)) {
throw new InvalidArgumentException('Invalid username format');
}
if (empty(trim($this->password))) {
throw new InvalidArgumentException('Password cannot be empty');
}
}
}
final readonly class DatabaseConfig
{
private const DATABASE_NAME_PATTERN = '/^[a-zA-Z0-9_\-]+$/';
private const MIN_PORT = 1;
private const MAX_PORT = 65535;
public function __construct(
public DatabaseDriver $driver,
public string $host,
public int $port,
public string $database,
) {
$this->validate();
}
/** @param array{driver: string, host: string, port: int, database: string} $config */
public static function fromArray(array $config): self
{
return new self(
driver: DatabaseDriver::from($config['driver']),
host: $config['host'],
port: $config['port'],
database: $config['database'],
);
}
public function __toString(): string
{
return sprintf(
'%s:host=%s;port=%d;dbname=%s;charset=utf8mb4',
$this->driver->value,
$this->host,
$this->port,
$this->database
);
}
private function validate(): void
{
if (!$this->driver->isSupported()) {
$available = implode(', ', PDO::getAvailableDrivers());
throw new InvalidArgumentException("Driver {$this->driver->value} is not supported. Available: {$available}");
}
if (!$this->isValidHost($this->host)) {
throw new InvalidArgumentException('Invalid host format');
}
if (!$this->isValidPort($this->port)) {
throw new InvalidArgumentException('Invalid port');
}
if (!preg_match(self::DATABASE_NAME_PATTERN, $this->database)) {
throw new InvalidArgumentException('Invalid database name');
}
}
private function isValidHost(string $host): bool
{
return filter_var($host, FILTER_VALIDATE_IP) !== false ||
filter_var($host, FILTER_VALIDATE_DOMAIN, FILTER_FLAG_HOSTNAME) !== false;
}
private function isValidPort(int $port): bool
{
return $port >= self::MIN_PORT && $port <= self::MAX_PORT;
}
}
final class Database implements Closable
{
private const PARAMETER_NAME_PATTERN = '/^:[a-zA-Z_][a-zA-Z0-9_]*$/';
private const NAMED_PARAMETER_PATTERN = '/^:[a-zA-Z_][a-zA-Z0-9_]*$/';
private ?PDOStatement $statement = null;
/** @var array<string, mixed> */
private array $boundParameters = [];
public function __construct(
private ?PDO $pdo,
) {}
public static function create(PDO $pdo): self
{
return new self($pdo);
}
public function lastInsertId(): string
{
return $this->pdo?->lastInsertId() ?? throw new LogicException('No PDO connection available');
}
public function useTransaction(callable $callback): mixed
{
if ($this->pdo?->inTransaction()) {
throw new RuntimeException('Nested transactions not supported');
}
$this->pdo->beginTransaction();
try {
$result = $callback($this);
$this->pdo->commit();
return $result;
} catch (Throwable $error) {
$this->pdo->rollBack();
throw $error;
}
}
/** @param array<string, mixed> $parameters */
public function executeQuery(string $query, array $parameters = []): QueryResult
{
return $this->prepare($query)->execute($parameters);
}
public function _executeQuery(string $query, array $parameters = []): ?PDOStatement
{
return $this->prepare($query)->_execute($parameters);
}
public function close(): void
{
$this->statement = null;
$this->boundParameters = [];
$this->pdo = null;
}
private function prepare(string $query): self
{
if (empty(trim($query))) {
throw new InvalidArgumentException('Query is required');
}
$this->statement = $this->pdo?->prepare($query);
if ($this->statement === null) {
$errorInfo = $this->pdo?->errorInfo()[2] ?? 'Unknown database error';
throw new InvalidArgumentException($errorInfo);
}
return $this;
}
private function _execute(array $parameters = []): ?PDOStatement
{
if ($this->statement === null) {
throw new LogicException('Prepare statement first');
}
if (!empty($parameters)) {
$this->bindParameters($parameters);
}
$this->boundParameters = [];
if ($this->statement->execute() === false) {
$errorInfo = $this->pdo?->errorInfo()[2] ?? 'Unknown execution error';
throw new InvalidArgumentException($errorInfo);
}
return $this->statement;
}
/** @param array<string, mixed> $parameters */
private function execute(array $parameters = []): QueryResult
{
if ($this->statement === null) {
throw new LogicException('Prepare statement first');
}
if (!empty($parameters)) {
$this->bindParameters($parameters);
}
$this->boundParameters = [];
if ($this->statement->execute() === false) {
$errorInfo = $this->pdo?->errorInfo()[2] ?? 'Unknown execution error';
throw new InvalidArgumentException($errorInfo);
}
return QueryResult::create($this->statement);
}
/** @param array<string, mixed> $parameters */
private function bindParameters(array $parameters): void
{
if ($this->statement === null) {
throw new LogicException('Prepare statement first');
}
foreach ($parameters as $key => $value) {
$this->bindParameter((string) $key, $value);
}
}
private function bindParameter(string $key, mixed $value): void
{
$isNamed = str_starts_with($key, ':');
$isPositional = $key === '?';
if (!$isNamed && !$isPositional && !preg_match(self::NAMED_PARAMETER_PATTERN, ":$key")) {
throw new InvalidArgumentException("Invalid parameter name: {$key}");
}
if (!preg_match(self::PARAMETER_NAME_PATTERN, $key)) {
throw new InvalidArgumentException("Invalid parameter name: {$key}");
}
if (array_key_exists($key, $this->boundParameters)) {
throw new RuntimeException("Parameter {$key} has already been bound");
}
$this->boundParameters[$key] = $value;
$parameterKey = ':' . ltrim($key, ':');
$dataType = PDODataType::fromValue($value);
if ($this->statement?->bindValue($parameterKey, $value, $dataType->value) === false) {
throw new RuntimeException("Failed to bind parameter {$key}");
}
}
}
<?php
declare(strict_types=1);
use DB\Database;
use function DB\createPDO;
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
header('Access-Control-Allow-Origin: *');
if (ob_get_level()) {
ob_end_clean();
}
const ROOT = __DIR__;
require_once ROOT . '/lib/DB.php';
// Generator for fetching names efficiently
function fetchNames(Database $database): Generator
{
$statement = $database->_executeQuery(<<<SQL
SELECT
test_sse.id,
test_sse.name,
test_sse.created_at,
test_sse.updated_at
FROM
test_sse
ORDER BY test_sse.updated_at DESC
LIMIT 50
SQL);
if ($statement) {
while ($row = $statement->fetch(PDO::FETCH_ASSOC)) {
yield $row;
}
$statement->closeCursor();
}
}
// Enhanced change detection with multiple strategies
class DatabaseChangeDetector
{
private string $lastModified = '';
private int $lastCount = 0;
private string $lastChecksum = '';
public function __construct(private Database $database) {}
public function detectChanges(): ?array
{
try {
// Strategy 1: Use timestamp and count (fastest)
$quickCheck = $this->quickChangeCheck();
if ($quickCheck) {
return $quickCheck;
}
// Strategy 2: Use checksum for data integrity (more thorough)
$checksumCheck = $this->checksumChangeCheck();
if ($checksumCheck) {
return $checksumCheck;
}
return null;
} catch (Throwable $e) {
return [
'type' => 'error',
'message' => $e->getMessage(),
'timestamp' => date('c'),
];
}
}
private function quickChangeCheck(): ?array
{
$queryResult = $this->database->executeQuery(<<<SQL
SELECT
COUNT(*) as total_count,
COALESCE(MAX(test_sse.updated_at), '') as last_modified
FROM
test_sse
SQL);
$state = $queryResult->first(); // Fix: get first row
if (!$state) {
return null;
}
$currentModified = $state['last_modified'] ?? '';
$currentCount = (int)($state['total_count'] ?? 0);
$hasChanges = false;
$changeType = '';
// Detect change type
if ($this->lastModified === '' && $currentCount > 0) {
$hasChanges = true;
$changeType = 'initial';
} elseif ($this->lastModified !== $currentModified && $this->lastModified !== '') {
$hasChanges = true;
$changeType = 'update';
} elseif ($this->lastCount !== $currentCount && $this->lastCount !== 0) {
$hasChanges = true;
$changeType = ($currentCount > $this->lastCount) ? 'insert' : 'delete';
}
if ($hasChanges) {
$this->lastModified = $currentModified;
$this->lastCount = $currentCount;
return [
'type' => 'database_change',
'change_type' => $changeType,
'timestamp' => date('c'),
'total_count' => $currentCount,
'last_modified' => $currentModified,
'detection_method' => 'timestamp_count'
];
}
return null;
}
private function checksumChangeCheck(): ?array
{
// Use a hash of recent data to detect subtle changes
$queryResult = $this->database->executeQuery(<<<SQL
SELECT
GROUP_CONCAT(
CONCAT(test_sse.id, ':', test_sse.name, ':', test_sse.updated_at)
ORDER BY test_sse.id
) as data_hash
FROM
test_sse
WHERE
test_sse.updated_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
SQL);
$result = $queryResult->first();
$currentChecksum = md5($result['data_hash'] ?? '');
if ($this->lastChecksum !== '' && $this->lastChecksum !== $currentChecksum) {
$this->lastChecksum = $currentChecksum;
return [
'type' => 'database_change',
'change_type' => 'data_integrity_change',
'timestamp' => date('c'),
'detection_method' => 'checksum',
'checksum' => substr($currentChecksum, 0, 8) // Show partial for debugging
];
}
$this->lastChecksum = $currentChecksum;
return null;
}
}
// Efficient database monitoring with adaptive polling
function monitorDatabaseChanges(Database $database): Generator
{
$detector = new DatabaseChangeDetector($database);
$pollInterval = 2; // Start with 2 seconds
$maxPollInterval = 30; // Max 30 seconds when no changes
$minPollInterval = 1; // Min 1 second when active
$noChangeCount = 0;
while (true) {
if (connection_aborted()) {
return;
}
$change = $detector->detectChanges();
if ($change) {
// Reset polling interval on change
$pollInterval = $minPollInterval;
$noChangeCount = 0;
// Add the actual data only when there's a change
if ($change['type'] === 'database_change') {
$change['names'] = fetchNames($database);
}
yield $change;
} else {
// Increase polling interval when no changes (adaptive polling)
$noChangeCount++;
if ($noChangeCount > 5) {
$pollInterval = min($pollInterval * 1.5, $maxPollInterval);
}
}
sleep((int)$pollInterval);
}
}
// Heartbeat generator for connection health
function heartbeatGenerator(int $intervalSeconds = 60): Generator
{
$lastHeartbeat = 0;
$startTime = time();
while (true) {
$currentTime = time();
if ($currentTime - $lastHeartbeat >= $intervalSeconds) {
yield [
'type' => 'heartbeat',
'timestamp' => date('c'),
'uptime' => $currentTime - $startTime,
'next_heartbeat_in' => $intervalSeconds
];
$lastHeartbeat = $currentTime;
}
yield null; // No heartbeat needed yet
}
}
function formatSSEEvent(array $data, ?string $event = null, ?string $id = null): Generator
{
if ($event) {
yield "event: $event\n";
}
if ($id) {
yield "id: $id\n";
}
yield "data: " . json_encode($data) . "\n\n";
}
function streamSSEEvents(Database $database): Generator
{
// Send initial connection event
yield from formatSSEEvent([
'type' => 'connected',
'message' => 'SSE connection established with enhanced change detection',
'timestamp' => date('c'),
'features' => ['adaptive_polling', 'checksum_detection', 'efficient_generators']
], 'connection', uniqid());
$dbMonitor = monitorDatabaseChanges($database);
$heartbeat = heartbeatGenerator(60);
while (true) {
if (connection_aborted()) {
break;
}
// Check for database changes
if ($dbMonitor->valid()) {
$change = $dbMonitor->current();
if ($change) {
// Convert generator to array only when we need to send data
if (isset($change['names'])) {
$names = [];
foreach ($change['names'] as $name) {
$names[] = $name;
}
$change['names'] = $names;
}
yield from formatSSEEvent($change, 'database_change', uniqid());
}
$dbMonitor->next();
}
// Check for heartbeat
if ($heartbeat->valid()) {
$heartbeatData = $heartbeat->current();
if ($heartbeatData) {
yield from formatSSEEvent($heartbeatData, 'heartbeat', uniqid());
}
$heartbeat->next();
}
usleep(100000); // 0.1 second micro-sleep
}
}
// Graceful error handling wrapper
function handleSSEStream(Database $database): Generator
{
try {
yield from streamSSEEvents($database);
} catch (Throwable $e) {
yield from formatSSEEvent([
'type' => 'error',
'message' => $e->getMessage(),
'timestamp' => date('c'),
'file' => basename($e->getFile()),
'line' => $e->getLine()
], 'error', uniqid());
// Try to recover with a simple reconnection message
yield from formatSSEEvent([
'type' => 'recovery_attempt',
'message' => 'Attempting to recover connection...',
'timestamp' => date('c')
], 'recovery', uniqid());
}
}
// Main execution
try {
$database = Database::create(createPDO());
foreach (handleSSEStream($database) as $chunk) {
echo $chunk;
flush();
}
} catch (Throwable $e) {
// Final fallback error
$errorEvent = formatSSEEvent([
'type' => 'fatal_error',
'message' => $e->getMessage(),
'timestamp' => date('c')
], 'error', uniqid());
foreach ($errorEvent as $chunk) {
echo $chunk;
flush();
}
} finally {
$database?->close();
}
CREATE TABLE `test_sse` (
`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
`name` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
`created_at` TIMESTAMP NOT NULL DEFAULT current_timestamp(),
`updated_at` TIMESTAMP NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
PRIMARY KEY (`id`) USING BTREE
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment