Created
June 26, 2025 09:15
-
-
Save nandordudas/b33fe1772d44c87b9a42fdd457bcb653 to your computer and use it in GitHub Desktop.
sse
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// [TODO] no client code yet |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace DB; | |
use Countable; | |
use InvalidArgumentException; | |
use IteratorAggregate; | |
use LogicException; | |
use PDO; | |
use PDOStatement; | |
use RuntimeException; | |
use Throwable; | |
use Traversable; | |
/**
 * PDO driver identifiers known to this library.
 *
 * Each case is backed by the driver name that PDO uses as the DSN prefix.
 */
enum DatabaseDriver: string
{
    case MYSQL = 'mysql';
    case POSTGRESQL = 'pgsql';
    case SQLITE = 'sqlite';
    case SQLSERVER = 'sqlsrv';

    /**
     * Reports whether this driver may be used on the running PHP installation.
     *
     * SQLite is always reported as unsupported; every other driver is
     * supported exactly when PDO lists it among its available drivers.
     */
    public function isSupported(): bool
    {
        if ($this === self::SQLITE) {
            // Not supported yet
            return false;
        }

        $installedDrivers = PDO::getAvailableDrivers();

        return in_array($this->value, $installedDrivers, true);
    }
}
/**
 * Typed wrapper around the PDO::PARAM_* constants used when binding values.
 */
enum PDODataType: int
{
    case NULL = PDO::PARAM_NULL;
    case INT = PDO::PARAM_INT;
    case STR = PDO::PARAM_STR;
    case LOB = PDO::PARAM_LOB;
    case BOOL = PDO::PARAM_BOOL;

    /**
     * Picks the PDO parameter type matching the PHP type of $value.
     *
     * null, int, resource and bool map to their dedicated types; everything
     * else (strings, floats, ...) is bound as a string.
     */
    public static function fromValue(mixed $value): self
    {
        if (is_null($value)) {
            return self::NULL;
        }

        if (is_int($value)) {
            return self::INT;
        }

        if (is_resource($value)) {
            return self::LOB;
        }

        if (is_bool($value)) {
            return self::BOOL;
        }

        return self::STR;
    }
}
/**
 * Implemented by objects that hold something which can be released explicitly.
 */
interface Closable
{
    /**
     * Releases whatever the implementing object is holding on to.
     */
    public function close(): void;
}
/**
 * Contract for the result of an executed query: it can be iterated,
 * counted, closed, and read one row at a time.
 *
 * @extends IteratorAggregate<int, mixed>
 */
interface QueryResultContract extends IteratorAggregate, Countable, Closable
{
    /**
     * Reads the next row, or null when there is none.
     *
     * @return ?array<string, mixed>
     */
    public function next(): ?array;

    /**
     * Reads the first row, or null when there is none.
     *
     * @return ?array<string, mixed>
     */
    public function first(): ?array;
}
/**
 * Shared implementation of QueryResultContract on top of a PDOStatement.
 *
 * Every row-reading method fetches with PDO::FETCH_ASSOC so rows are always
 * keyed by column name, regardless of the connection's default fetch mode.
 * After close() the statement reference is dropped; reading methods then
 * return null / 0 / an empty iteration instead of throwing.
 */
abstract class QueryResultBase implements QueryResultContract
{
    protected ?PDOStatement $statement;

    private bool $closed = false;

    public function __construct(PDOStatement $statement)
    {
        $this->statement = $statement;
    }

    /**
     * Closes the statement cursor and drops the statement. Safe to call repeatedly.
     */
    public function close(): void
    {
        if (!$this->closed) {
            $this->statement?->closeCursor();
            $this->statement = null;
            $this->closed = true;
        }
    }

    /**
     * Guard for subclasses that want to reject use after close().
     *
     * @throws LogicException when the result has already been closed
     */
    protected function ensureNotClosed(): void
    {
        if ($this->closed) {
            throw new LogicException('Query result has been closed');
        }
    }

    /**
     * Fetches one row and then closes the result.
     *
     * @return ?array<string, mixed> the row keyed by column name, or null when no row is available
     */
    public function first(): ?array
    {
        // Explicit FETCH_ASSOC keeps the row shape in line with getIterator()
        // and with the documented array<string, mixed> return type.
        $row = $this->statement?->fetch(PDO::FETCH_ASSOC);
        $this->close();

        return is_array($row) ? $row : null;
    }

    /**
     * Fetches the next row without closing the result.
     *
     * @return ?array<string, mixed> the row keyed by column name, or null when no row is available
     */
    public function next(): ?array
    {
        $row = $this->statement?->fetch(PDO::FETCH_ASSOC);

        return is_array($row) ? $row : null;
    }

    /**
     * Lazily yields the remaining rows, then closes the cursor.
     *
     * @return Traversable<int, array<string, mixed>>
     */
    public function getIterator(): Traversable
    {
        // fetch() yields false at the end of the result set (and the nullsafe
        // call yields null once closed); both end the loop.
        while (is_array($row = $this->statement?->fetch(PDO::FETCH_ASSOC))) {
            yield $row;
        }

        $this->statement?->closeCursor();
    }

    /**
     * Row count as reported by the statement, or 0 once the result is closed.
     *
     * @warning May not work with SELECT in all database drivers
     */
    public function count(): int
    {
        return $this->statement?->rowCount() ?? 0;
    }

    public function __destruct()
    {
        $this->close();
    }
}
/**
 * Concrete query result with helpers for materialising rows.
 */
final class QueryResult extends QueryResultBase
{
    /**
     * Named constructor wrapping an executed statement.
     */
    public static function create(PDOStatement $statement): self
    {
        $result = new self($statement);

        return $result;
    }

    /**
     * Drains the iterator into an array, keeping the iterator's keys.
     *
     * @return array<int, mixed>
     */
    public function toArray(): array
    {
        $rows = [];

        foreach ($this as $position => $row) {
            $rows[$position] = $row;
        }

        return $rows;
    }

    /**
     * Collects the values of one column across all remaining rows.
     *
     * @return array<int, mixed>
     */
    public function column(string $column): array
    {
        $rows = $this->toArray();

        return array_column($rows, $column);
    }
}
/**
 * Immutable username/password pair, validated on construction.
 */
final readonly class Credentials
{
    // The D modifier makes `$` match only at the very end of the subject, so a
    // username with a trailing newline is rejected.
    private const USERNAME_PATTERN = '/^[a-zA-Z0-9_]{3,20}$/D';

    /**
     * @throws InvalidArgumentException when the username does not match the
     *                                  allowed pattern or the password is blank
     */
    public function __construct(
        public string $username,
        #[\SensitiveParameter]
        public string $password,
    ) {
        $this->validate();
    }

    /**
     * Builds credentials from a configuration array.
     *
     * @param array{username: string, password: string} $config
     *
     * @throws InvalidArgumentException when validation fails
     */
    public static function fromArray(array $config): self
    {
        return new self(
            username: $config['username'],
            password: $config['password'],
        );
    }

    /**
     * Enforces the username pattern and a non-blank password.
     */
    private function validate(): void
    {
        if (preg_match(self::USERNAME_PATTERN, $this->username) !== 1) {
            throw new InvalidArgumentException('Invalid username format');
        }

        // Compare against '' rather than using empty(): empty('0') is true,
        // which would wrongly reject the password "0".
        if (trim($this->password) === '') {
            throw new InvalidArgumentException('Password cannot be empty');
        }
    }
}
/**
 * Immutable connection settings, validated on construction and castable to a PDO DSN.
 */
final readonly class DatabaseConfig
{
    // The D modifier stops `$` from matching before a trailing newline.
    private const DATABASE_NAME_PATTERN = '/^[a-zA-Z0-9_\-]+$/D';

    private const MIN_PORT = 1;

    private const MAX_PORT = 65535;

    /**
     * @throws InvalidArgumentException when the driver is unsupported or the
     *                                  host, port or database name is invalid
     */
    public function __construct(
        public DatabaseDriver $driver,
        public string $host,
        public int $port,
        public string $database,
    ) {
        $this->validate();
    }

    /**
     * Builds a configuration from an array.
     *
     * @param array{driver: string, host: string, port: int, database: string} $config
     *
     * @throws InvalidArgumentException when validation fails
     * @throws \ValueError              when the driver string is not a DatabaseDriver value
     */
    public static function fromArray(array $config): self
    {
        return new self(
            driver: DatabaseDriver::from($config['driver']),
            host: $config['host'],
            port: $config['port'],
            database: $config['database'],
        );
    }

    /**
     * Renders the DSN in the syntax of the configured driver.
     *
     * The MySQL form is unchanged. PostgreSQL and SQL Server get their own
     * key names because the MySQL-style string (`host=`/`charset=`) is not a
     * valid DSN for those drivers.
     */
    public function __toString(): string
    {
        return match ($this->driver) {
            DatabaseDriver::MYSQL => sprintf(
                '%s:host=%s;port=%d;dbname=%s;charset=utf8mb4',
                $this->driver->value,
                $this->host,
                $this->port,
                $this->database,
            ),
            DatabaseDriver::POSTGRESQL => sprintf(
                '%s:host=%s;port=%d;dbname=%s',
                $this->driver->value,
                $this->host,
                $this->port,
                $this->database,
            ),
            DatabaseDriver::SQLSERVER => sprintf(
                '%s:Server=%s,%d;Database=%s',
                $this->driver->value,
                $this->host,
                $this->port,
                $this->database,
            ),
            // Currently rejected by validate(); listed so the match stays exhaustive.
            DatabaseDriver::SQLITE => sprintf('%s:%s', $this->driver->value, $this->database),
        };
    }

    /**
     * Checks driver availability, host, port range and database name, in that order.
     */
    private function validate(): void
    {
        if (!$this->driver->isSupported()) {
            $available = implode(', ', PDO::getAvailableDrivers());

            throw new InvalidArgumentException("Driver {$this->driver->value} is not supported. Available: {$available}");
        }

        if (!$this->isValidHost($this->host)) {
            throw new InvalidArgumentException('Invalid host format');
        }

        if (!$this->isValidPort($this->port)) {
            throw new InvalidArgumentException('Invalid port');
        }

        if (preg_match(self::DATABASE_NAME_PATTERN, $this->database) !== 1) {
            throw new InvalidArgumentException('Invalid database name');
        }
    }

    /**
     * Accepts either an IP address or a syntactically valid hostname.
     */
    private function isValidHost(string $host): bool
    {
        return filter_var($host, FILTER_VALIDATE_IP) !== false
            || filter_var($host, FILTER_VALIDATE_DOMAIN, FILTER_FLAG_HOSTNAME) !== false;
    }

    /**
     * Accepts ports within MIN_PORT..MAX_PORT inclusive.
     */
    private function isValidPort(int $port): bool
    {
        return $port >= self::MIN_PORT && $port <= self::MAX_PORT;
    }
}
/**
 * Thin wrapper around a PDO connection: prepares a query, binds named
 * parameters with a PDO type derived from each value, and executes it.
 *
 * Parameter keys may be written with or without the leading colon
 * ("id" and ":id" address the same placeholder). Positional ("?")
 * placeholders are not supported.
 */
final class Database implements Closable
{
    /** Shape of a colon-prefixed named placeholder; D forbids a trailing newline. */
    private const PARAMETER_NAME_PATTERN = '/^:[a-zA-Z_][a-zA-Z0-9_]*$/D';

    private ?PDOStatement $statement = null;

    /** @var array<string, mixed> values bound for the current statement, keyed by ":name" */
    private array $boundParameters = [];

    public function __construct(
        private ?PDO $pdo,
    ) {}

    public static function create(PDO $pdo): self
    {
        return new self($pdo);
    }

    /**
     * Returns the id generated by the last INSERT on this connection.
     *
     * @throws LogicException   when the connection has been closed
     * @throws RuntimeException when the driver cannot report an id
     */
    public function lastInsertId(): string
    {
        $id = $this->connection()->lastInsertId();

        // PDO::lastInsertId() may return false, which must not leak through
        // the string return type.
        if ($id === false) {
            throw new RuntimeException('Unable to determine the last insert id');
        }

        return $id;
    }

    /**
     * Runs $callback inside a transaction: commits on success, rolls back
     * and rethrows on any failure.
     *
     * @param callable(self): mixed $callback
     *
     * @return mixed whatever $callback returns
     *
     * @throws LogicException   when the connection has been closed
     * @throws RuntimeException when a transaction is already active
     * @throws Throwable        whatever $callback (or the commit) throws
     */
    public function useTransaction(callable $callback): mixed
    {
        $pdo = $this->connection();

        if ($pdo->inTransaction()) {
            throw new RuntimeException('Nested transactions not supported');
        }

        $pdo->beginTransaction();

        try {
            $result = $callback($this);
            $pdo->commit();

            return $result;
        } catch (Throwable $error) {
            // Only roll back while a transaction is still open; otherwise
            // rollBack() would throw and hide the original error.
            if ($pdo->inTransaction()) {
                $pdo->rollBack();
            }

            throw $error;
        }
    }

    /**
     * Prepares and executes $query, returning a QueryResult wrapper.
     *
     * @param array<string, mixed> $parameters named parameters, keys with or without ':'
     *
     * @throws InvalidArgumentException on an empty query, a closed connection,
     *                                  an invalid parameter name, or a prepare/execute failure
     * @throws RuntimeException         when a parameter is bound twice or cannot be bound
     */
    public function executeQuery(string $query, array $parameters = []): QueryResult
    {
        return $this->prepare($query)->execute($parameters);
    }

    /**
     * Same as executeQuery() but hands back the raw PDOStatement.
     *
     * @param array<string, mixed> $parameters named parameters, keys with or without ':'
     *
     * @throws InvalidArgumentException on an empty query, a closed connection,
     *                                  an invalid parameter name, or a prepare/execute failure
     * @throws RuntimeException         when a parameter is bound twice or cannot be bound
     */
    public function _executeQuery(string $query, array $parameters = []): ?PDOStatement
    {
        return $this->prepare($query)->_execute($parameters);
    }

    /**
     * Drops the current statement, the binding bookkeeping and the connection.
     */
    public function close(): void
    {
        $this->statement = null;
        $this->boundParameters = [];
        $this->pdo = null;
    }

    /**
     * Returns the live connection or fails when close() has been called.
     *
     * @throws LogicException
     */
    private function connection(): PDO
    {
        return $this->pdo ?? throw new LogicException('No PDO connection available');
    }

    /**
     * Prepares $query and stores the statement for the following execute call.
     *
     * @throws InvalidArgumentException on an empty query, a closed connection or a prepare failure
     */
    private function prepare(string $query): self
    {
        if (trim($query) === '') {
            throw new InvalidArgumentException('Query is required');
        }

        // Kept as InvalidArgumentException (a LogicException subtype) so callers
        // catching the exception type this method has always thrown keep working.
        if ($this->pdo === null) {
            throw new InvalidArgumentException('No PDO connection available');
        }

        // PDO::prepare() returns false on failure in silent/warning error mode;
        // check before assigning, since the property only accepts ?PDOStatement.
        $statement = $this->pdo->prepare($query);

        if ($statement === false) {
            $errorInfo = $this->pdo->errorInfo()[2] ?? 'Unknown database error';

            throw new InvalidArgumentException($errorInfo);
        }

        $this->statement = $statement;
        $this->boundParameters = [];

        return $this;
    }

    /**
     * @param array<string, mixed> $parameters
     */
    private function _execute(array $parameters = []): ?PDOStatement
    {
        return $this->run($parameters);
    }

    /**
     * @param array<string, mixed> $parameters
     */
    private function execute(array $parameters = []): QueryResult
    {
        return QueryResult::create($this->run($parameters));
    }

    /**
     * Binds $parameters to the prepared statement and executes it.
     *
     * @param array<string, mixed> $parameters
     *
     * @throws LogicException           when no statement has been prepared
     * @throws InvalidArgumentException on an invalid parameter name or execution failure
     * @throws RuntimeException         when a parameter is bound twice or cannot be bound
     */
    private function run(array $parameters): PDOStatement
    {
        $statement = $this->statement ?? throw new LogicException('Prepare statement first');

        try {
            $this->bindParameters($parameters);
        } finally {
            // Reset even when binding throws, so a failed call cannot make the
            // next query report "already been bound" for an unrelated statement.
            $this->boundParameters = [];
        }

        if ($statement->execute() === false) {
            // Execution errors are recorded on the statement handle; fall back
            // to the connection handle only if the statement has no message.
            $errorInfo = $statement->errorInfo()[2]
                ?? $this->pdo?->errorInfo()[2]
                ?? 'Unknown execution error';

            throw new InvalidArgumentException($errorInfo);
        }

        return $statement;
    }

    /**
     * @param array<string, mixed> $parameters
     *
     * @throws LogicException when no statement has been prepared
     */
    private function bindParameters(array $parameters): void
    {
        if ($this->statement === null) {
            throw new LogicException('Prepare statement first');
        }

        foreach ($parameters as $key => $value) {
            $this->bindParameter((string) $key, $value);
        }
    }

    /**
     * Binds one named parameter, choosing the PDO type from the value.
     *
     * @throws InvalidArgumentException when the name is not a valid placeholder name
     * @throws RuntimeException         when the name was already bound or binding fails
     */
    private function bindParameter(string $key, mixed $value): void
    {
        // Normalise first so "id" and ":id" are validated and tracked as the same name.
        $parameterKey = str_starts_with($key, ':') ? $key : ":{$key}";

        if (preg_match(self::PARAMETER_NAME_PATTERN, $parameterKey) !== 1) {
            throw new InvalidArgumentException("Invalid parameter name: {$key}");
        }

        if (array_key_exists($parameterKey, $this->boundParameters)) {
            throw new RuntimeException("Parameter {$key} has already been bound");
        }

        $this->boundParameters[$parameterKey] = $value;

        $dataType = PDODataType::fromValue($value);

        if ($this->statement?->bindValue($parameterKey, $value, $dataType->value) === false) {
            throw new RuntimeException("Failed to bind parameter {$key}");
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
use DB\Database; | |
use function DB\createPDO; | |
// Response headers for a Server-Sent Events stream.
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
header('Access-Control-Allow-Origin: *');

// Discard every active output buffer, not just the innermost one; any buffer
// left in place would hold events back instead of streaming them.
// ob_end_clean() returns false for a buffer that cannot be removed, which
// ends the loop instead of spinning forever.
while (ob_get_level() > 0 && ob_end_clean()) {
    // Intentionally empty: the condition does the work.
}

const ROOT = __DIR__;

require_once ROOT . '/lib/DB.php';
/**
 * Lazily yields the 50 most recently updated rows of test_sse.
 *
 * Rows are fetched one at a time as associative arrays; the cursor is closed
 * once the result set has been read to the end.
 *
 * @return Generator<int, array<string, mixed>>
 */
function fetchNames(Database $database): Generator
{
    $sql = <<<SQL
        SELECT
        test_sse.id,
        test_sse.name,
        test_sse.created_at,
        test_sse.updated_at
        FROM
        test_sse
        ORDER BY test_sse.updated_at DESC
        LIMIT 50
        SQL;

    $cursor = $database->_executeQuery($sql);

    if (!$cursor) {
        return;
    }

    while (true) {
        $record = $cursor->fetch(PDO::FETCH_ASSOC);

        if (!$record) {
            break;
        }

        yield $record;
    }

    $cursor->closeCursor();
}
/**
 * Detects changes in the test_sse table between successive polls.
 *
 * Two detectors run on every poll:
 *  - a cheap one comparing COUNT(*) and MAX(updated_at) with the last seen values;
 *  - a checksum over rows updated within the last hour, which can notice
 *    changes that leave both the count and the newest timestamp untouched.
 *
 * Both baselines advance on every poll, so one change is reported once.
 */
class DatabaseChangeDetector
{
    private string $lastModified = '';

    private int $lastCount = 0;

    private string $lastChecksum = '';

    public function __construct(private Database $database) {}

    /**
     * Polls the table once.
     *
     * @return ?array<string, mixed> a 'database_change' event, an 'error'
     *                               event when polling threw, or null when nothing changed
     */
    public function detectChanges(): ?array
    {
        try {
            // Run the checksum pass unconditionally. If it were skipped
            // whenever the quick check fires, its baseline would go stale and
            // the very same change would be reported a second time on the
            // next poll as a 'data_integrity_change'.
            $checksumCheck = $this->checksumChangeCheck();
            $quickCheck = $this->quickChangeCheck();

            // The quick check carries the more specific change type, so it wins.
            return $quickCheck ?? $checksumCheck;
        } catch (Throwable $e) {
            return [
                'type' => 'error',
                'message' => $e->getMessage(),
                'timestamp' => date('c'),
            ];
        }
    }

    /**
     * Compares row count and newest updated_at with the previous poll.
     *
     * @return ?array<string, mixed> change event, or null when both values are unchanged
     */
    private function quickChangeCheck(): ?array
    {
        $queryResult = $this->database->executeQuery(<<<SQL
            SELECT
            COUNT(*) as total_count,
            COALESCE(MAX(test_sse.updated_at), '') as last_modified
            FROM
            test_sse
            SQL);

        $state = $queryResult->first();

        if (!$state) {
            return null;
        }

        $currentModified = $state['last_modified'] ?? '';
        $currentCount = (int) ($state['total_count'] ?? 0);

        // The count is compared before the timestamp: an insert (and the
        // deletion of the newest row) also moves MAX(updated_at), so testing
        // the timestamp first would label those changes as plain updates.
        $changeType = null;

        if ($this->lastModified === '' && $currentCount > 0) {
            $changeType = 'initial';
        } elseif ($this->lastCount !== $currentCount && $this->lastCount !== 0) {
            $changeType = ($currentCount > $this->lastCount) ? 'insert' : 'delete';
        } elseif ($this->lastModified !== $currentModified && $this->lastModified !== '') {
            $changeType = 'update';
        }

        if ($changeType === null) {
            return null;
        }

        $this->lastModified = $currentModified;
        $this->lastCount = $currentCount;

        return [
            'type' => 'database_change',
            'change_type' => $changeType,
            'timestamp' => date('c'),
            'total_count' => $currentCount,
            'last_modified' => $currentModified,
            'detection_method' => 'timestamp_count',
        ];
    }

    /**
     * Compares an md5 over id/name/updated_at of recently updated rows with
     * the previous poll. The first call only records the baseline.
     *
     * NOTE(review): the WHERE clause is relative to NOW(), so a row ageing out
     * of the one-hour window also changes the checksum — confirm whether that
     * event is wanted. GROUP_CONCAT output is presumably capped by the
     * server's group_concat_max_len; verify for large tables.
     *
     * @return ?array<string, mixed> change event, or null when the checksum is unchanged
     */
    private function checksumChangeCheck(): ?array
    {
        // COALESCE the nullable name: CONCAT() returns NULL if any argument is
        // NULL and GROUP_CONCAT() skips NULL values, so rows without a name
        // would otherwise be left out of the checksum entirely.
        $queryResult = $this->database->executeQuery(<<<SQL
            SELECT
            GROUP_CONCAT(
            CONCAT(test_sse.id, ':', COALESCE(test_sse.name, ''), ':', test_sse.updated_at)
            ORDER BY test_sse.id
            ) as data_hash
            FROM
            test_sse
            WHERE
            test_sse.updated_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
            SQL);

        $result = $queryResult->first();
        $currentChecksum = md5($result['data_hash'] ?? '');

        $previousChecksum = $this->lastChecksum;
        $this->lastChecksum = $currentChecksum;

        if ($previousChecksum === '' || $previousChecksum === $currentChecksum) {
            return null;
        }

        return [
            'type' => 'database_change',
            'change_type' => 'data_integrity_change',
            'timestamp' => date('c'),
            'detection_method' => 'checksum',
            'checksum' => substr($currentChecksum, 0, 8), // Show partial for debugging
        ];
    }
}
/**
 * Polls the database for changes with an adaptive interval.
 *
 * Yields one value per poll: the change event array when something changed,
 * or null when nothing did. Yielding on idle polls hands control back to the
 * consumer after every poll; without it, advancing this generator would block
 * until the next database change and starve whatever else the consumer does
 * between polls (such as sending heartbeats).
 *
 * The interval drops to one second after a change and, after more than five
 * consecutive idle polls, grows by a factor of 1.5 per poll up to 30 seconds.
 * The generator ends when the client connection is reported as aborted.
 *
 * @return Generator<int, ?array<string, mixed>>
 */
function monitorDatabaseChanges(Database $database): Generator
{
    $detector = new DatabaseChangeDetector($database);
    $pollInterval = 2; // Start with 2 seconds
    $maxPollInterval = 30; // Max 30 seconds when no changes
    $minPollInterval = 1; // Min 1 second when active
    $noChangeCount = 0;

    while (true) {
        if (connection_aborted()) {
            return;
        }

        $change = $detector->detectChanges();

        if ($change) {
            // Reset polling interval on change
            $pollInterval = $minPollInterval;
            $noChangeCount = 0;

            // Attach the row data only for real changes; it stays a lazy
            // generator until the consumer iterates it.
            if ($change['type'] === 'database_change') {
                $change['names'] = fetchNames($database);
            }

            yield $change;
        } else {
            // Increase polling interval when no changes (adaptive polling)
            $noChangeCount++;

            if ($noChangeCount > 5) {
                $pollInterval = min($pollInterval * 1.5, $maxPollInterval);
            }

            // Idle tick: let the consumer run before the next sleep.
            yield null;
        }

        sleep((int) $pollInterval);
    }
}
/**
 * Endless generator producing heartbeat payloads at most once per interval.
 *
 * Each loop pass yields a heartbeat array when at least $intervalSeconds have
 * elapsed since the previous one (the very first pass always qualifies), and
 * then always yields null as the "nothing due" marker.
 *
 * @return Generator<int, ?array<string, mixed>>
 */
function heartbeatGenerator(int $intervalSeconds = 60): Generator
{
    $previousBeatAt = 0;
    $bootedAt = time();

    for (;;) {
        $now = time();
        $secondsSinceBeat = $now - $previousBeatAt;

        if ($secondsSinceBeat >= $intervalSeconds) {
            $beat = [
                'type' => 'heartbeat',
                'timestamp' => date('c'),
                'uptime' => $now - $bootedAt,
                'next_heartbeat_in' => $intervalSeconds,
            ];

            yield $beat;

            $previousBeatAt = $now;
        }

        // No heartbeat needed yet
        yield null;
    }
}
/**
 * Serialises one Server-Sent Event as a sequence of wire lines.
 *
 * Yields an optional "event:" line, an optional "id:" line and finally the
 * "data:" line carrying $data as JSON, terminated by a blank line.
 *
 * @param array<array-key, mixed> $data  payload, JSON-encoded onto a single line
 * @param ?string                 $event event name; omitted when null or ''
 * @param ?string                 $id    event id; omitted when null or ''
 *
 * @return Generator<int, string>
 */
function formatSSEEvent(array $data, ?string $event = null, ?string $id = null): Generator
{
    // A line break inside a field would end it early and start a new one, so
    // strip CR/LF from the single-line fields.
    $singleLine = static fn (string $field): string => str_replace(["\r", "\n"], '', $field);

    // Compare against null/'' instead of relying on truthiness, which would
    // silently drop the legitimate value "0".
    if ($event !== null && $event !== '') {
        yield 'event: ' . $singleLine($event) . "\n";
    }

    if ($id !== null && $id !== '') {
        yield 'id: ' . $singleLine($id) . "\n";
    }

    // json_encode() returns false on e.g. invalid UTF-8, which would put an
    // empty "data:" line on the wire. Substitute bad bytes / unencodable
    // values so the event still carries a valid JSON payload.
    $payload = json_encode($data, JSON_INVALID_UTF8_SUBSTITUTE | JSON_PARTIAL_OUTPUT_ON_ERROR);

    if ($payload === false) {
        $payload = 'null';
    }

    yield 'data: ' . $payload . "\n\n";
}
/**
 * Produces the full SSE wire stream for one client.
 *
 * Emits a "connection" event first, then loops until the client connection is
 * reported as aborted. Each pass forwards the database monitor's current
 * value (when truthy) as a "database_change" event and the heartbeat
 * generator's current value (when truthy) as a "heartbeat" event, advances
 * both generators, and pauses for 0.1 seconds.
 *
 * @return Generator<int, string>
 */
function streamSSEEvents(Database $database): Generator
{
    // Send initial connection event
    $greeting = [
        'type' => 'connected',
        'message' => 'SSE connection established with enhanced change detection',
        'timestamp' => date('c'),
        'features' => ['adaptive_polling', 'checksum_detection', 'efficient_generators'],
    ];

    yield from formatSSEEvent($greeting, 'connection', uniqid());

    $changeFeed = monitorDatabaseChanges($database);
    $pulse = heartbeatGenerator(60);

    while (!connection_aborted()) {
        // Check for database changes
        if ($changeFeed->valid()) {
            $payload = $changeFeed->current();

            if ($payload) {
                // The row data arrives as a lazy generator; materialise it
                // only now that it is actually going to be sent.
                if (isset($payload['names'])) {
                    $payload['names'] = iterator_to_array($payload['names'], false);
                }

                yield from formatSSEEvent($payload, 'database_change', uniqid());
            }

            $changeFeed->next();
        }

        // Check for heartbeat
        if ($pulse->valid()) {
            $beat = $pulse->current();

            if ($beat) {
                yield from formatSSEEvent($beat, 'heartbeat', uniqid());
            }

            $pulse->next();
        }

        usleep(100000); // 0.1 second micro-sleep
    }
}
/**
 * Wraps streamSSEEvents() so that a failure is reported to the client.
 *
 * When the stream throws, an "error" event describing the exception (message,
 * file basename, line) is emitted, followed by a "recovery" event; the
 * generator then ends.
 *
 * @return Generator<int, string>
 */
function handleSSEStream(Database $database): Generator
{
    try {
        yield from streamSSEEvents($database);
    } catch (Throwable $failure) {
        $errorPayload = [
            'type' => 'error',
            'message' => $failure->getMessage(),
            'timestamp' => date('c'),
            'file' => basename($failure->getFile()),
            'line' => $failure->getLine(),
        ];

        yield from formatSSEEvent($errorPayload, 'error', uniqid());

        // Try to recover with a simple reconnection message
        $recoveryPayload = [
            'type' => 'recovery_attempt',
            'message' => 'Attempting to recover connection...',
            'timestamp' => date('c'),
        ];

        yield from formatSSEEvent($recoveryPayload, 'recovery', uniqid());
    }
}
// Main execution: open the connection, stream events until the stream ends,
// and always release the connection afterwards.

// Defined before the try block so the finally clause can reference it even
// when creating the connection throws; otherwise $database would be an
// undefined variable there.
$database = null;

try {
    $database = Database::create(createPDO());

    foreach (handleSSEStream($database) as $chunk) {
        echo $chunk;
        flush();
    }
} catch (Throwable $e) {
    // Final fallback error
    $errorEvent = formatSSEEvent([
        'type' => 'fatal_error',
        'message' => $e->getMessage(),
        'timestamp' => date('c'),
    ], 'error', uniqid());

    foreach ($errorEvent as $chunk) {
        echo $chunk;
        flush();
    }
} finally {
    $database?->close();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Table polled by the SSE endpoint.
-- `updated_at` is refreshed automatically on every row update.
CREATE TABLE `test_sse` (
    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
    `created_at` TIMESTAMP NOT NULL DEFAULT current_timestamp(),
    `updated_at` TIMESTAMP NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp(),
    PRIMARY KEY (`id`) USING BTREE,
    -- The polling queries take MAX(updated_at), sort by updated_at and filter
    -- on an updated_at range; the index keeps those from scanning every row.
    INDEX `idx_test_sse_updated_at` (`updated_at`) USING BTREE
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment