Skip to content

Instantly share code, notes, and snippets.

@Shaked
Last active September 8, 2016 11:58
Show Gist options
  • Select an option

  • Save Shaked/bf260a1defc517c3fd92000bf1cc25b6 to your computer and use it in GitHub Desktop.

Select an option

Save Shaked/bf260a1defc517c3fd92000bf1cc25b6 to your computer and use it in GitHub Desktop.
Basic Guzzle Async Idea

Usage

$ composer install
$ php -S localhost:9992 server.php &
$ php client.php

Output

$ php client.php
Ticking...1473335751.8866
touched was touched: 1473335752.3957
Ticked!1473335753.1588
I completed at 1473335753.1589! {"request":[],"microtime":1473335752.396}
Server's microtime: 1473335752.3957
Touched - Before: -763.15999031067
<?php
require_once 'vendor/autoload.php';
$serverUrl = 'http://localhost:9992';
$curl = new GuzzleHttp\Handler\CurlMultiHandler();
$handler = GuzzleHttp\HandlerStack::create($curl);
$client = new GuzzleHttp\Client(['handler' => $handler]);
$request = new \GuzzleHttp\Psr7\Request('GET', $serverUrl);
$promise = $client->sendAsync($request)->then(function ($response) {
echo 'I completed at ' . microtime(true) . '! ' . $response->getBody() . PHP_EOL;
});
echo 'Ticking...' . microtime(true) . PHP_EOL;
$curl->tick();
echo 'Ticked!' . microtime(true) . PHP_EOL;
sleep(2);
$beforeTime = microtime(true);
// echo 'Client microtime before `wait` is: ' . $beforeTime . PHP_EOL;
// $r = $promise->wait(true);
$afterTime = microtime(true);
// echo 'Client microtime after `wait` is: ' . $afterTime . PHP_EOL;
// print_r($r);
$touchedTime = file_get_contents('./touched');
echo 'Server\'s microtime: ' . $touchedTime . PHP_EOL;
echo 'Touched - Before: ' . ($touchedTime - $beforeTime) * 1000 . PHP_EOL;
{
"require": {
"php":">=7.0.0",
"guzzlehttp/guzzle": "~6.0",
"psr/http-message": "1.0.1"
}
}
<?php
namespace GuzzleHttp\Handler;
use GuzzleHttp\Promise as P;
use GuzzleHttp\Promise\Promise;
use GuzzleHttp\Psr7;
use Psr\Http\Message\RequestInterface;
/**
* Returns an asynchronous response using curl_multi_* functions.
*
* When using the CurlMultiHandler, custom curl options can be specified as an
* associative array of curl option constants mapping to values in the
* **curl** key of the provided request options.
*
* @property resource $_mh Internal use only. Lazy loaded multi-handle.
*/
class CurlMultiHandler {
/** @var CurlFactoryInterface */
private $factory;
/**
* @var mixed
*/
private $selectTimeout;
/**
* @var mixed
*/
private $active;
/**
* @var array
*/
private $handles = [];
/**
* @var array
*/
private $delays = [];
/**
* This handler accepts the following options:
*
* - handle_factory: An optional factory used to create curl handles
* - select_timeout: Optional timeout (in seconds) to block before timing
* out while selecting curl handles. Defaults to 1 second.
*
* @param array $options
*/
public function __construct(array $options = []) {
$this->factory = isset($options['handle_factory'])
? $options['handle_factory'] : new CurlFactory(50);
$this->selectTimeout = isset($options['select_timeout'])
? $options['select_timeout'] : 1;
}
/**
* @param $name
* @return mixed
*/
public function __get($name) {
if ($name === '_mh') {
return $this->_mh = curl_multi_init();
}
throw new \BadMethodCallException();
}
public function __destruct() {
if (isset($this->_mh)) {
curl_multi_close($this->_mh);
unset($this->_mh);
}
}
/**
* @param RequestInterface $request
* @param array $options
* @return mixed
*/
public function __invoke(RequestInterface $request, array $options) {
$easy = $this->factory->create($request, $options);
$id = (int) $easy->handle;
$promise = new Promise(
[$this, 'execute'],
function () use ($id) {return $this->cancel($id);}
);
$this->addRequest(['easy' => $easy, 'deferred' => $promise]);
return $promise;
}
/**
* Ticks the curl event loop.
*/
public function tick() {
// Add any delayed handles if needed.
if ($this->delays) {
$currentTime = microtime(true);
foreach ($this->delays as $id => $delay) {
if ($currentTime >= $delay) {
unset($this->delays[$id]);
curl_multi_add_handle(
$this->_mh,
$this->handles[$id]['easy']->handle
);
}
}
}
// Step through the task queue which may add additional requests.
P\queue()->run();
if ($this->active &&
curl_multi_select($this->_mh, $this->selectTimeout) === -1
) {
// Perform a usleep if a select returns -1.
// See: https://bugs.php.net/bug.php?id=61141
usleep(250);
}
do {
curl_multi_exec($this->_mh, $this->active);
// added a usleep for 0.25 seconds to reduce load
usleep(250000);
} while ($this->active > 0);
//while ($x = curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);
$this->processMessages();
}
/**
* Runs until all outstanding connections have completed.
*/
public function execute() {
$queue = P\queue();
while ($this->handles || !$queue->isEmpty()) {
// If there are no transfers, then sleep for the next delay
if (!$this->active && $this->delays) {
usleep($this->timeToNext());
}
$this->tick();
}
}
/**
* @param array $entry
*/
private function addRequest(array $entry) {
$easy = $entry['easy'];
$id = (int) $easy->handle;
$this->handles[$id] = $entry;
if (empty($easy->options['delay'])) {
curl_multi_add_handle($this->_mh, $easy->handle);
} else {
$this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);
}
}
/**
* Cancels a handle from sending and removes references to it.
*
* @param int $id Handle ID to cancel and remove.
*
* @return bool True on success, false on failure.
*/
private function cancel($id) {
// Cannot cancel if it has been processed.
if (!isset($this->handles[$id])) {
return false;
}
$handle = $this->handles[$id]['easy']->handle;
unset($this->delays[$id], $this->handles[$id]);
curl_multi_remove_handle($this->_mh, $handle);
curl_close($handle);
return true;
}
private function processMessages() {
while ($done = curl_multi_info_read($this->_mh)) {
$id = (int) $done['handle'];
curl_multi_remove_handle($this->_mh, $done['handle']);
if (!isset($this->handles[$id])) {
// Probably was cancelled.
continue;
}
$entry = $this->handles[$id];
unset($this->handles[$id], $this->delays[$id]);
$entry['easy']->errno = $done['result'];
$entry['deferred']->resolve(
CurlFactory::finish(
$this,
$entry['easy'],
$this->factory
)
);
}
}
private function timeToNext() {
$currentTime = microtime(true);
$nextTime = PHP_INT_MAX;
foreach ($this->delays as $time) {
if ($time < $nextTime) {
$nextTime = $time;
}
}
return max(0, $nextTime - $currentTime) * 1000000;
}
}
<?php
$fh = fopen('php://stderr', 'a');
fwrite($fh, 'touched was touched: ' . microtime(true) . PHP_EOL);
fclose($fh);
file_put_contents('./touched', microtime(true));
echo json_encode(['request' => $_REQUEST, 'microtime' => microtime(true)]);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment