Created
November 22, 2023 11:36
-
-
Save kodmanyagha/3416a9ed184d573fb9f8154431729795 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Services\Common\React; | |
use Evenement\EventEmitter; | |
use React\MySQL\ConnectionInterface; | |
use React\MySQL\Factory; | |
use React\MySQL\Io\LazyConnection; | |
use React\Stream\Util; | |
use function React\Promise\resolve; | |
class LazyConnectionPool extends EventEmitter implements ConnectionInterface | |
{ | |
const CS_ROUND_ROBIN = 'round-robin'; | |
const CS_BY_LOAD = 'load'; | |
/** | |
* @var LazyConnection[] | |
*/ | |
protected array $pool = []; | |
protected int $poolSize; | |
protected int $poolPointer = 0; // current connection in pool - RoundRobin | |
protected array $requestCounter = []; // count requests per connection | |
protected string $connectionSelector; | |
public function __construct(Factory $factory, string $connectionURI, int $poolSize = 10, string $connectionSelector = self::CS_ROUND_ROBIN) | |
{ | |
$this->connectionSelector = $connectionSelector; | |
$this->poolSize = $poolSize; | |
for ($i = 0; $i < $poolSize; $i++) { | |
// Why lazy connection? Because evenement is throwing exception when connection closed. | |
//$this->pool[$i] = $connection = await($factory->createConnection($connectionURI)); | |
$this->pool[$i] = $connection = $factory->createLazyConnection($connectionURI); | |
$this->requestCounter[$i] = 0; | |
Util::forwardEvents($connection, $this, ['error', 'close']); | |
} | |
} | |
/** | |
* set the internal pool-pointer to the next valid connection on depending on the connectionSelector | |
* @return int | |
*/ | |
protected function shiftPoolPointer(): int | |
{ | |
switch ($this->connectionSelector) { | |
case self::CS_ROUND_ROBIN: | |
$this->poolPointer = ($this->poolPointer + 1) % $this->poolSize; | |
break; | |
case self::CS_BY_LOAD: | |
$rcList = $this->requestCounter; // copy | |
asort($rcList, SORT_NUMERIC); | |
$this->poolPointer = key($rcList); | |
break; | |
} | |
return $this->poolPointer; | |
} | |
/** | |
* @param callable $callback received an ConnectionInterface as parameter | |
* @return mixed | |
*/ | |
protected function pooledCallback(callable $callback): mixed | |
{ | |
$pointer = $this->shiftPoolPointer(); | |
$this->requestCounter[$pointer]++; | |
$connection = $this->pool[$pointer]; | |
return $callback($connection) | |
->then(function ($result) use ($pointer) { | |
$this->requestCounter[$pointer]--; | |
return $result; | |
}); | |
} | |
protected function escapeFromMemoryLeak(ConnectionInterface $connection): void | |
{ | |
//https://github.com/friends-of-reactphp/mysql/issues/179 | |
$connection->removeAllListeners(); | |
} | |
public function query($sql, array $params = []): \React\Promise\PromiseInterface | |
{ | |
return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) { | |
$result = $connection->query($sql, $params); | |
$this->escapeFromMemoryLeak($connection); | |
return $result; | |
}); | |
} | |
public function queryStream($sql, $params = []): \React\Stream\ReadableStreamInterface | |
{ | |
return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) { | |
$result = $connection->queryStream($sql, $params); | |
$this->escapeFromMemoryLeak($connection); | |
return $result; | |
}); | |
} | |
public function ping(): \React\Promise\PromiseInterface | |
{ | |
return $this->pooledCallback(function (ConnectionInterface $connection) { | |
return $connection->ping(); | |
}); | |
} | |
public function quit(): \React\Promise\PromiseInterface | |
{ | |
return resolve(array_map(function ($connection) { | |
$connection->removeAllListeners(); | |
$connection->quit(); | |
return $connection; | |
}, $this->pool)); | |
} | |
public function close(): \React\Promise\PromiseInterface | |
{ | |
return resolve(array_map(function ($connection) { | |
$connection->close(); | |
return $connection; | |
}, $this->pool)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment