Skip to content

Instantly share code, notes, and snippets.

@kodmanyagha
Created November 22, 2023 11:36
Show Gist options
  • Save kodmanyagha/3416a9ed184d573fb9f8154431729795 to your computer and use it in GitHub Desktop.
Save kodmanyagha/3416a9ed184d573fb9f8154431729795 to your computer and use it in GitHub Desktop.
<?php
declare(strict_types=1);
namespace App\Services\Common\React;
use Evenement\EventEmitter;
use React\MySQL\ConnectionInterface;
use React\MySQL\Factory;
use React\MySQL\Io\LazyConnection;
use React\Stream\Util;
use function React\Promise\resolve;
class LazyConnectionPool extends EventEmitter implements ConnectionInterface
{
const CS_ROUND_ROBIN = 'round-robin';
const CS_BY_LOAD = 'load';
/**
* @var LazyConnection[]
*/
protected array $pool = [];
protected int $poolSize;
protected int $poolPointer = 0; // current connection in pool - RoundRobin
protected array $requestCounter = []; // count requests per connection
protected string $connectionSelector;
public function __construct(Factory $factory, string $connectionURI, int $poolSize = 10, string $connectionSelector = self::CS_ROUND_ROBIN)
{
$this->connectionSelector = $connectionSelector;
$this->poolSize = $poolSize;
for ($i = 0; $i < $poolSize; $i++) {
// Why lazy connection? Because evenement is throwing exception when connection closed.
//$this->pool[$i] = $connection = await($factory->createConnection($connectionURI));
$this->pool[$i] = $connection = $factory->createLazyConnection($connectionURI);
$this->requestCounter[$i] = 0;
Util::forwardEvents($connection, $this, ['error', 'close']);
}
}
/**
* set the internal pool-pointer to the next valid connection on depending on the connectionSelector
* @return int
*/
protected function shiftPoolPointer(): int
{
switch ($this->connectionSelector) {
case self::CS_ROUND_ROBIN:
$this->poolPointer = ($this->poolPointer + 1) % $this->poolSize;
break;
case self::CS_BY_LOAD:
$rcList = $this->requestCounter; // copy
asort($rcList, SORT_NUMERIC);
$this->poolPointer = key($rcList);
break;
}
return $this->poolPointer;
}
/**
* @param callable $callback received an ConnectionInterface as parameter
* @return mixed
*/
protected function pooledCallback(callable $callback): mixed
{
$pointer = $this->shiftPoolPointer();
$this->requestCounter[$pointer]++;
$connection = $this->pool[$pointer];
return $callback($connection)
->then(function ($result) use ($pointer) {
$this->requestCounter[$pointer]--;
return $result;
});
}
protected function escapeFromMemoryLeak(ConnectionInterface $connection): void
{
//https://github.com/friends-of-reactphp/mysql/issues/179
$connection->removeAllListeners();
}
public function query($sql, array $params = []): \React\Promise\PromiseInterface
{
return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) {
$result = $connection->query($sql, $params);
$this->escapeFromMemoryLeak($connection);
return $result;
});
}
public function queryStream($sql, $params = []): \React\Stream\ReadableStreamInterface
{
return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) {
$result = $connection->queryStream($sql, $params);
$this->escapeFromMemoryLeak($connection);
return $result;
});
}
public function ping(): \React\Promise\PromiseInterface
{
return $this->pooledCallback(function (ConnectionInterface $connection) {
return $connection->ping();
});
}
public function quit(): \React\Promise\PromiseInterface
{
return resolve(array_map(function ($connection) {
$connection->removeAllListeners();
$connection->quit();
return $connection;
}, $this->pool));
}
public function close(): \React\Promise\PromiseInterface
{
return resolve(array_map(function ($connection) {
$connection->close();
return $connection;
}, $this->pool));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment