Skip to content

Instantly share code, notes, and snippets.

@faizanakram99
Last active September 5, 2024 19:24
Show Gist options
  • Save faizanakram99/7ac14f899b16adef36a9adf78d6c3e56 to your computer and use it in GitHub Desktop.
Save faizanakram99/7ac14f899b16adef36a9adf78d6c3e56 to your computer and use it in GitHub Desktop.
PHP async mysql
<?php
declare(strict_types=1);
namespace Xoxo;
class CrossTenantQueryRunner
{
private const EXCLUDED_TENANTS = ['demo', 'sandbox'];
/**
* @var array<string, \mysqli>
*/
private array $connections = [];
/**
* @var array<string, string>
*/
private array $errors = [];
public function __construct(
private TenantDatabaseUserConnectionFactory $connectionFactory,
private CloudInstallation $cloudInstallation,
private string $environment,
) {}
public function executeAsync(string $query): void
{
foreach ($this->cloudInstallation->getTenants() as $tenant) {
$tenantName = $tenant->name;
if (\in_array($tenantName, self::EXCLUDED_TENANTS)) {
continue;
}
try {
$params = $this->connectionFactory->getConnectionParams($tenantName);
$this->connections[$tenantName] = new \mysqli(
$params['host'] ?? '127.0.0.1',
$params['user'] ?? 'root',
$params['password'] ?? '',
$params['dbname'] ?? (\str_replace('-', '_', $tenant->name).DbNameNormalizer::suffix($this->environment)),
);
$this->connections[$tenantName]->query($query, \MYSQLI_ASYNC);
} catch (\Throwable $exception) {
$this->errors[$tenantName] = self::error($tenantName, $exception->getMessage());
}
}
}
public function writeResultsToCsv(string $file): void
{
$file = new \SplFileObject($file, 'w');
$headersWereAdded = false;
$processedResults = $this->reap(function (\mysqli_result $result, string $tenant) use (&$headersWereAdded, $file): void {
if (!$headersWereAdded) {
$headersWereAdded = true;
$file->fputcsv(
['tenant', ...\array_map(fn (object $field) => $field->name, $result->fetch_fields())],
escape: '',
);
}
while (null !== $row = $result->fetch_row()) {
if (false === $row) {
$this->errors[$tenant] = "Error: Failed to fetch row of query result for tenant {$tenant}";
continue;
}
$file->fputcsv([$tenant, ...$row], escape: '');
}
});
foreach ($processedResults as $_); // to invoke generator function
}
/**
* @return iterable<string, iterable<int, array<string, int|float|string|null>>>
*/
public function iterateAssociative(): iterable
{
yield from $this->reap(function (\mysqli_result $result, string $tenant): iterable {
while (null !== $row = $result->fetch_assoc()) {
if (false === $row) {
$this->errors[$tenant] = "Error: Failed to fetch row of query result for tenant {$tenant}";
continue;
}
yield $row;
}
});
}
/**
* @return iterable<string, array{columns: list<string>, rows: iterable<int, list<int|float|string|null>>}>
*/
public function iterateNumeric(): iterable
{
yield from $this->reap(function (\mysqli_result $result, string $tenant): iterable {
return [
'columns' => \array_map(fn (object $field) => $field->name, $result->fetch_fields()),
'rows' => (function () use ($result, $tenant) {
while (null !== $row = $result->fetch_row()) {
if (false === $row) {
$this->errors[$tenant] = "Error: Failed to fetch row of query result for tenant {$tenant}";
continue;
}
yield $row;
}
})(),
];
});
}
/**
* @template T
* @param \Closure(\mysqli_result, string): T $processResult
* @return \Generator<string, T>
*/
public function reap(\Closure $processResult): \Generator
{
do {
$read = $error = $reject = $this->connections;
$count = \mysqli::poll($read, $error, $reject, 1);
$read ??= [];
$error ??= [];
if (0 < $count) {
foreach ($read as $connection) {
\assert($connection instanceof \mysqli);
$tenant = \array_search($connection, $this->connections, true);
\assert(\is_string($tenant));
try {
/**
* @var \mysqli_result|bool
*/
$result = $connection->reap_async_query();
} catch (\Throwable $exception) {
$this->errors[$tenant] = self::error($tenant, $exception->getMessage());
continue;
}
if (false === $result) {
$this->errors[$tenant] = self::error($tenant, $connection->error);
continue;
}
if ($result instanceof \mysqli_result) {
yield $tenant => $processResult($result, $tenant);
$result->free();
}
}
foreach ($error as $failed) {
\assert($failed instanceof \mysqli);
$tenant = \array_search($failed, $this->connections, true);
\assert(\is_string($tenant));
$this->errors[$tenant] = self::error($tenant, $failed->error);
}
} else {
$this->errors['xoxo'] = 'Timeout - no results.';
}
} while (\count($this->connections) !== \count($read) + \count($error) + \count($reject));
}
/**
* @return array<string, string>
*/
public function getErrors(): array
{
return $this->errors;
}
private static function error(string $tenant, string $error): string
{
return "Error: Query failed for tenant {$tenant} with error: {$error}";
}
public function __destruct()
{
foreach ($this->connections as $connection) {
$connection->close();
}
$this->connections = [];
$this->errors = [];
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment