Last active
September 5, 2024 19:24
-
-
Save faizanakram99/7ac14f899b16adef36a9adf78d6c3e56 to your computer and use it in GitHub Desktop.
PHP async mysql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace Xoxo; | |
class CrossTenantQueryRunner | |
{ | |
private const EXCLUDED_TENANTS = ['demo', 'sandbox']; | |
/** | |
* @var array<string, \mysqli> | |
*/ | |
private array $connections = []; | |
/** | |
* @var array<string, string> | |
*/ | |
private array $errors = []; | |
public function __construct( | |
private TenantDatabaseUserConnectionFactory $connectionFactory, | |
private CloudInstallation $cloudInstallation, | |
private string $environment, | |
) {} | |
public function executeAsync(string $query): void | |
{ | |
foreach ($this->cloudInstallation->getTenants() as $tenant) { | |
$tenantName = $tenant->name; | |
if (\in_array($tenantName, self::EXCLUDED_TENANTS)) { | |
continue; | |
} | |
try { | |
$params = $this->connectionFactory->getConnectionParams($tenantName); | |
$this->connections[$tenantName] = new \mysqli( | |
$params['host'] ?? '127.0.0.1', | |
$params['user'] ?? 'root', | |
$params['password'] ?? '', | |
$params['dbname'] ?? (\str_replace('-', '_', $tenant->name).DbNameNormalizer::suffix($this->environment)), | |
); | |
$this->connections[$tenantName]->query($query, \MYSQLI_ASYNC); | |
} catch (\Throwable $exception) { | |
$this->errors[$tenantName] = self::error($tenantName, $exception->getMessage()); | |
} | |
} | |
} | |
public function writeResultsToCsv(string $file): void | |
{ | |
$file = new \SplFileObject($file, 'w'); | |
$headersWereAdded = false; | |
$processedResults = $this->reap(function (\mysqli_result $result, string $tenant) use (&$headersWereAdded, $file): void { | |
if (!$headersWereAdded) { | |
$headersWereAdded = true; | |
$file->fputcsv( | |
['tenant', ...\array_map(fn (object $field) => $field->name, $result->fetch_fields())], | |
escape: '', | |
); | |
} | |
while (null !== $row = $result->fetch_row()) { | |
if (false === $row) { | |
$this->errors[$tenant] = "Error: Failed to fetch row of query result for tenant {$tenant}"; | |
continue; | |
} | |
$file->fputcsv([$tenant, ...$row], escape: ''); | |
} | |
}); | |
foreach ($processedResults as $_); // to invoke generator function | |
} | |
/** | |
* @return iterable<string, iterable<int, array<string, int|float|string|null>>> | |
*/ | |
public function iterateAssociative(): iterable | |
{ | |
yield from $this->reap(function (\mysqli_result $result, string $tenant): iterable { | |
while (null !== $row = $result->fetch_assoc()) { | |
if (false === $row) { | |
$this->errors[$tenant] = "Error: Failed to fetch row of query result for tenant {$tenant}"; | |
continue; | |
} | |
yield $row; | |
} | |
}); | |
} | |
/** | |
* @return iterable<string, array{columns: list<string>, rows: iterable<int, list<int|float|string|null>>}> | |
*/ | |
public function iterateNumeric(): iterable | |
{ | |
yield from $this->reap(function (\mysqli_result $result, string $tenant): iterable { | |
return [ | |
'columns' => \array_map(fn (object $field) => $field->name, $result->fetch_fields()), | |
'rows' => (function () use ($result, $tenant) { | |
while (null !== $row = $result->fetch_row()) { | |
if (false === $row) { | |
$this->errors[$tenant] = "Error: Failed to fetch row of query result for tenant {$tenant}"; | |
continue; | |
} | |
yield $row; | |
} | |
})(), | |
]; | |
}); | |
} | |
/** | |
* @template T | |
* @param \Closure(\mysqli_result, string): T $processResult | |
* @return \Generator<string, T> | |
*/ | |
public function reap(\Closure $processResult): \Generator | |
{ | |
do { | |
$read = $error = $reject = $this->connections; | |
$count = \mysqli::poll($read, $error, $reject, 1); | |
$read ??= []; | |
$error ??= []; | |
if (0 < $count) { | |
foreach ($read as $connection) { | |
\assert($connection instanceof \mysqli); | |
$tenant = \array_search($connection, $this->connections, true); | |
\assert(\is_string($tenant)); | |
try { | |
/** | |
* @var \mysqli_result|bool | |
*/ | |
$result = $connection->reap_async_query(); | |
} catch (\Throwable $exception) { | |
$this->errors[$tenant] = self::error($tenant, $exception->getMessage()); | |
continue; | |
} | |
if (false === $result) { | |
$this->errors[$tenant] = self::error($tenant, $connection->error); | |
continue; | |
} | |
if ($result instanceof \mysqli_result) { | |
yield $tenant => $processResult($result, $tenant); | |
$result->free(); | |
} | |
} | |
foreach ($error as $failed) { | |
\assert($failed instanceof \mysqli); | |
$tenant = \array_search($failed, $this->connections, true); | |
\assert(\is_string($tenant)); | |
$this->errors[$tenant] = self::error($tenant, $failed->error); | |
} | |
} else { | |
$this->errors['xoxo'] = 'Timeout - no results.'; | |
} | |
} while (\count($this->connections) !== \count($read) + \count($error) + \count($reject)); | |
} | |
/** | |
* @return array<string, string> | |
*/ | |
public function getErrors(): array | |
{ | |
return $this->errors; | |
} | |
private static function error(string $tenant, string $error): string | |
{ | |
return "Error: Query failed for tenant {$tenant} with error: {$error}"; | |
} | |
public function __destruct() | |
{ | |
foreach ($this->connections as $connection) { | |
$connection->close(); | |
} | |
$this->connections = []; | |
$this->errors = []; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment