Skip to content

Instantly share code, notes, and snippets.

@ngyuki
Created September 15, 2014 12:18
Show Gist options
  • Save ngyuki/8a1c57b4c2c4a2f97701 to your computer and use it in GitHub Desktop.
Save ngyuki/8a1c57b4c2c4a2f97701 to your computer and use it in GitHub Desktop.
PHP のコルーチンを使ったなんかよくわからないもの
<?php
namespace ngyuki\Example\AsyncReceive;
class Await
{
private $has = false;
private $val = false;
public function reset()
{
$this->has = false;
$this->val = false;
return $this;
}
public function set($val = true)
{
$this->has = true;
$this->val = $val;
return $this;
}
public function has()
{
return $this->has;
}
public function get()
{
return $this->val;
}
}
<?php
namespace ngyuki\Example\AsyncReceive;
class Loop
{
/**
* @var \Generator[]
*/
private $tasks = array();
/**
* @param $func callable
*/
public function add(callable $func)
{
$task = $func();
assert('$task instanceof \Generator');
$this->tasks[spl_object_hash($task)] = $task;
}
/**
* @param $task callable
*/
public function run(callable $task = null)
{
if ($task) {
$this->add($task);
}
while (count($this->tasks)) {
foreach ($this->tasks as $key => $task) {
$data = null;
$await = $task->current();
if ($await instanceof Await) {
if ($await->has() == false) {
continue;
}
$data = $await->get();
}
$task->send($data);
if ($task->valid() == false) {
unset($this->tasks[$key]);
}
}
}
}
}
<?php
namespace ngyuki\Example\AsyncReceive;
/*
1. Open three terminal
2. Run the following command each
- `while :; do sleep 1; LANG=C date; done | nc -lk 7777`
- `while :; do sleep 2; LANG=C date; done | nc -lk 8888`
- `while :; do sleep 3; LANG=C date; done | nc -lk 9999`
3. Open another terminal
4. Run the following command
- `php sample.php`
*/
require_once __DIR__ . '/Loop.php';
require_once __DIR__ . '/Await.php';
function color($no, $str)
{
return "\x1B[0;3{$no}m{$str}\x1B[m";
}
$loop = new Loop();
$list = array();
$task = function ($port, $color) use (&$list) {
return function () use (&$list, $port, $color) {
$await = new Await();
$stream = fsockopen("localhost", $port);
if ($stream === false) {
echo color(1, "unable open port $port\n");
return;
}
try {
$list[(int)$stream] = array($stream, $await);
for (;;) {
$buf = (yield $await->reset());
if ($buf === false) {
echo color(1, "unable read port $port\n");
break;
}
$len = strlen($buf);
if ($len === 0) {
break;
}
echo color($color, $buf);
}
} finally {
unset($list[(int)$stream]);
fclose($stream);
}
};
};
$loop->add($task(7777, 2));
$loop->add($task(8888, 3));
$loop->add($task(9999, 4));
$loop->add(function () use (&$list) {
while (count($list)) {
$r = $w = $e = array();
foreach ($list as $id => list($stream)) {
$r[$id] = $stream;
}
if (stream_select($r, $w, $e, null) === false) {
throw new \RuntimeException("Unable select stream");
}
foreach ($r as $id => $stream) {
/* @var $await Await */
list (, $await) = $list[$id];
$buf = fread($stream, 8192);
$await->set($buf);
}
yield;
}
});
$loop->run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment