Skip to content

Instantly share code, notes, and snippets.

@TomZhuPlanetart
Last active October 6, 2024 16:24
Show Gist options
  • Save TomZhuPlanetart/69318f9f527d6a3564e4829a9289960d to your computer and use it in GitHub Desktop.
Save TomZhuPlanetart/69318f9f527d6a3564e4829a9289960d to your computer and use it in GitHub Desktop.
PHP Coroutine implementation with generator
<?php
/**
 * Lifecycle state of a Future.
 */
enum PullStatus
{
    /** The future has not produced its final value yet. */
    case PENDING;

    /** The future has a final value; `then()` callbacks fire immediately. */
    case RESOLVED;
}
/**
 * A unit of cooperative work driven by an executor.
 *
 * A Future wraps either a Generator (a coroutine) or a Closure. A Closure is
 * invoked on the first pull; if it returns a Generator that generator becomes
 * the coroutine, otherwise the returned value is the future's result.
 *
 * Each value the coroutine yields is treated as "something to await": it is
 * converted to a Future, and once that future resolves, its value is sent back
 * into the coroutine as the result of the `yield` expression.
 */
class Future
{
    /** @var list<callable> Callbacks waiting for the resolved value. */
    private array $callbacks = [];

    private PullStatus $status = PullStatus::PENDING;

    /** The resolved value; only meaningful once status is RESOLVED. */
    private mixed $value = null;

    /** The next step to run on pull(); null while waiting or once finished. */
    private ?Closure $next;

    public function __construct(private Generator|Closure $generator)
    {
        $this->next = $this->startup(...);
    }

    /**
     * First step: start the coroutine or evaluate the plain closure.
     *
     * @return mixed The first yielded value, or null when the future resolved
     *               immediately (status is then RESOLVED).
     */
    protected function startup()
    {
        if ($this->generator instanceof Closure) {
            $returnValue = ($this->generator)();
            if (!($returnValue instanceof Generator)) {
                // Plain closure: its return value (not the closure itself)
                // is the result of this future.
                $this->status = PullStatus::RESOLVED;
                $this->value = $returnValue;
                return null;
            }
            $this->generator = $returnValue;
        }

        // current() runs an unstarted generator up to its first yield.
        $yielded = $this->generator->current();
        if (!$this->generator->valid()) {
            // The coroutine returned without ever yielding.
            $this->status = PullStatus::RESOLVED;
            $this->value = $this->generator->getReturn();
            return null;
        }

        return $yielded;
    }

    /**
     * Register a callback for the resolved value.
     *
     * Called immediately when the future is already resolved; otherwise stored
     * until the future resolves.
     */
    public function then(callable $callback)
    {
        if ($this->status === PullStatus::RESOLVED) {
            $callback($this->value);
        } else {
            $this->callbacks[] = $callback;
        }
    }

    /**
     * Call every registered callback with $value and forget them.
     */
    protected function invokeCallbacks($value)
    {
        // Detach the list first so a callback that registers another callback
        // (or throws) cannot cause entries to be lost or run twice.
        $pending = $this->callbacks;
        $this->callbacks = [];
        foreach ($pending as $callback) {
            $callback($value);
        }
    }

    /**
     * Run the stored next step exactly once and return what it produced.
     */
    protected function callNext()
    {
        // Clear before calling so the step is consumed even if it throws.
        $step = $this->next;
        $this->next = null;

        return $step();
    }

    /**
     * Send $v into the coroutine as the result of its pending `yield`.
     *
     * @return mixed The next yielded value, or null when the coroutine finished.
     */
    protected function resume($v = null)
    {
        // No try/finally here: if send() throws, the coroutine's own exception
        // must propagate. Calling getReturn() on a generator that threw would
        // raise a different exception and hide the real failure.
        $yielded = $this->generator->send($v);
        if (!$this->generator->valid()) {
            $this->status = PullStatus::RESOLVED;
            $this->value = $this->generator->getReturn();
        }

        return $yielded;
    }

    /**
     * Normalise an awaited value into a Future.
     *
     * Generators become coroutine futures, existing futures are returned as-is
     * and any other value becomes a future that resolves to that value.
     * (The method name is kept as-is because subclasses call it.)
     */
    protected function toFeature($v)
    {
        if ($v instanceof Generator) {
            return new Future($v);
        }
        if ($v instanceof Future) {
            return $v;
        }

        return new Future(function () use ($v) {
            return $v;
        });
    }

    /**
     * Advance this future by one step.
     *
     * Does nothing when no step is pending. Otherwise runs the step; if that
     * resolved the future, the callbacks are fired. If not, the yielded value
     * is awaited: when it resolves, the resume step is stored and this future
     * is handed back to $wake.
     *
     * @param Closure $wake Called with a Future that should be pulled later.
     */
    public function pull(Closure $wake)
    {
        if ($this->next === null) {
            return;
        }

        $yielded = $this->callNext();
        if ($this->status === PullStatus::RESOLVED) {
            $this->invokeCallbacks($this->value);
            return;
        }

        $awaited = $this->toFeature($yielded);
        $awaited->then(function ($v) use ($wake) {
            $this->next = function () use ($v) {
                return $this->resume($v);
            };
            $wake($this);
        });
        $wake($awaited);
    }
}
/**
 * A future that completes once every wrapped future has resolved.
 *
 * Its value is an array of the individual results, keyed and ordered like the
 * constructor arguments.
 */
class WaitAllFuture extends Future
{
    /** @var array<int|string, Future> Awaited futures, keyed like the arguments. */
    private $futures = [];

    /** True once callbacks have been attached to the awaited futures. */
    private $bind = false;

    /** @var array<int|string, mixed> Results collected so far, by argument key. */
    private $result = [];

    /** True once all results are in and the callbacks have been fired. */
    private $done = false;

    /**
     * @param mixed ...$futures Futures, generators or plain values to await.
     */
    public function __construct(...$futures)
    {
        $this->futures = array_map($this->toFeature(...), $futures);
        // The parent's step machinery is not used; pull() is overridden.
        parent::__construct(function () {});
    }

    /**
     * Register a callback for the combined result.
     *
     * Fires immediately when all futures have already completed, so late
     * subscribers are not left waiting forever.
     */
    public function then(callable $callback)
    {
        if ($this->done) {
            $callback($this->result);
            return;
        }
        parent::then($callback);
    }

    /**
     * On the first call, subscribe to every awaited future and schedule it.
     * On every call, fire the callbacks once all results have arrived.
     *
     * @param Closure $wake Called with a Future that should be pulled later.
     */
    public function pull(Closure $wake)
    {
        if (!$this->bind) {
            $this->bind = true;
            foreach ($this->futures as $i => $future) {
                $future->then(function ($v) use ($wake, $i) {
                    $this->result[$i] = $v;
                    $wake($this);
                });
                $wake($future);
            }
        }

        // Also reached on the first pull, so an empty list (or futures that
        // were all resolved already) completes without waiting for a wake-up.
        if ($this->done || count($this->result) < count($this->futures)) {
            return;
        }

        // Results were stored in completion order; rebuild them in the order
        // the futures were passed in.
        $ordered = [];
        foreach (array_keys($this->futures) as $key) {
            $ordered[$key] = $this->result[$key];
        }
        $this->result = $ordered;
        $this->done = true;
        $this->invokeCallbacks($this->result);
    }
}
/**
 * A minimal single-threaded scheduler: a FIFO queue of futures that are
 * pulled one step at a time until the queue is empty.
 */
class Executor
{
    /** @var SplQueue Futures waiting for their next pull. */
    private $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    /**
     * Append a future to the run queue, wrapping closures and generators.
     *
     * @throws InvalidArgumentException When the value cannot be scheduled.
     */
    private function enqueue($future)
    {
        if ($future instanceof Closure || $future instanceof Generator) {
            $future = new Future($future);
        }
        if (!($future instanceof Future)) {
            // Fail here with a clear message rather than later inside run().
            throw new InvalidArgumentException(
                'Executor can only schedule Future, Closure or Generator values, got ' . get_debug_type($future)
            );
        }
        $this->queue->enqueue($future);
    }

    /**
     * Schedule a task for execution.
     *
     * @param callable $task Any callable; it is invoked when first pulled and
     *                       may return a Generator to run as a coroutine.
     */
    public function go(callable $task)
    {
        // String and array callables are valid `callable`s but are not
        // Closures; convert them so they get wrapped in a Future too.
        $this->enqueue($task instanceof Closure ? $task : Closure::fromCallable($task));
    }

    /**
     * Run the executor until all futures are resolved
     */
    public function run()
    {
        $wake = $this->enqueue(...);
        while (!$this->queue->isEmpty()) {
            $future = $this->queue->dequeue();
            $future->pull($wake);
        }
    }
}
/**
 * Combine several awaitables into a single WaitAllFuture.
 *
 * @param mixed ...$futures Values forwarded unchanged to WaitAllFuture.
 */
function waitAll(...$futures)
{
    $combined = new WaitAllFuture(...$futures);

    return $combined;
}
// -----------Userland code------------
/**
 * Demo entry coroutine: starts three tasks and waits for all of them.
 */
function async_main()
{
    echo "Start async main\n";

    // Calling task() only creates the generators; nothing runs yet.
    $first = task("task1");
    $second = task("task2");
    $third = task("task3");

    yield waitAll($first, $second, $third);

    echo "All tasks are done\n";
}
/**
 * Demo coroutine: reads ten files one after another and prints each content.
 *
 * @param string $name Label printed in front of every line.
 */
function task($name)
{
    for ($i = 1; $i <= 10; $i++) {
        $filename = "file$i.txt";
        // `yield` plays the role of `await`: execution continues here with the
        // value produced by the nested coroutine.
        $content = yield ioTask($filename);
        echo "{$name}: content {$filename}: {$content}\n";
    }
}
/**
 * Demo leaf coroutine: yields a single descriptive string and then finishes.
 *
 * @param string $act Text embedded in the yielded message.
 */
function sysCall($act)
{
    $message = "sysCall $act\n";

    yield $message;
}
/**
 * Demo coroutine: awaits one nested sysCall, then returns a fake file content.
 *
 * @param string $filename Name used in the syscall text and the result.
 * @return Generator Coroutine whose return value is "<filename> content".
 */
function ioTask($filename)
{
    // Yielding the nested coroutine suspends this one until it has finished.
    $pending = sysCall("init syscall $filename");
    yield $pending;

    return "$filename content";
}
// Schedule the entry coroutine, then drive the queue until it is empty.
$entryPoint = async_main(...);
$executor = new Executor();
$executor->go($entryPoint);
$executor->run();
Start async main
task1: content file1.txt: file1.txt content
task2: content file1.txt: file1.txt content
task3: content file1.txt: file1.txt content
task1: content file2.txt: file2.txt content
task2: content file2.txt: file2.txt content
task3: content file2.txt: file2.txt content
task1: content file3.txt: file3.txt content
task2: content file3.txt: file3.txt content
task3: content file3.txt: file3.txt content
task1: content file4.txt: file4.txt content
task2: content file4.txt: file4.txt content
task3: content file4.txt: file4.txt content
task1: content file5.txt: file5.txt content
task2: content file5.txt: file5.txt content
task3: content file5.txt: file5.txt content
task1: content file6.txt: file6.txt content
task2: content file6.txt: file6.txt content
task3: content file6.txt: file6.txt content
task1: content file7.txt: file7.txt content
task2: content file7.txt: file7.txt content
task3: content file7.txt: file7.txt content
task1: content file8.txt: file8.txt content
task2: content file8.txt: file8.txt content
task3: content file8.txt: file8.txt content
task1: content file9.txt: file9.txt content
task2: content file9.txt: file9.txt content
task3: content file9.txt: file9.txt content
task1: content file10.txt: file10.txt content
task2: content file10.txt: file10.txt content
task3: content file10.txt: file10.txt content
All tasks are done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment