Created
September 15, 2014 12:18
-
-
Save ngyuki/8a1c57b4c2c4a2f97701 to your computer and use it in GitHub Desktop.
PHP のコルーチンを使ったなんかよくわからないもの
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| namespace ngyuki\Example\AsyncReceive; | |
/**
 * Single-slot value holder with a "filled" flag.
 *
 * A producer stores a value with set(); a consumer checks has() and
 * fetches it with get(); reset() empties the slot again. All mutators
 * return the instance so calls can be chained.
 */
class Await
{
    /**
     * @var bool true once set() has been called and until the next reset()
     */
    private $has = false;

    /**
     * @var mixed the stored value; false while the slot is empty
     */
    private $val = false;

    /**
     * Tell whether a value is currently stored.
     *
     * @return bool
     */
    public function has()
    {
        return $this->has;
    }

    /**
     * Return the stored value (false when the slot is empty).
     *
     * @return mixed
     */
    public function get()
    {
        return $this->val;
    }

    /**
     * Store a value and mark the slot as filled.
     *
     * @param mixed $val value to store; defaults to true
     * @return $this
     */
    public function set($val = true)
    {
        return $this->assign(true, $val);
    }

    /**
     * Empty the slot and clear the filled flag.
     *
     * @return $this
     */
    public function reset()
    {
        return $this->assign(false, false);
    }

    /**
     * Write both fields at once.
     *
     * @param bool  $filled new state of the filled flag
     * @param mixed $value  new stored value
     * @return $this
     */
    private function assign($filled, $value)
    {
        $this->has = $filled;
        $this->val = $value;

        return $this;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| namespace ngyuki\Example\AsyncReceive; | |
/**
 * Minimal round-robin scheduler for generator-based tasks.
 *
 * Each task is a generator. On every pass the loop looks at what a task
 * last yielded: if it is an Await that has no value yet, the task is
 * skipped for this pass; if it is an Await with a value, the task is
 * resumed with that value; anything else resumes the task with null.
 */
class Loop
{
    /**
     * Live tasks keyed by spl_object_hash() of the generator.
     *
     * @var \Generator[]
     */
    private $tasks = array();

    /**
     * Register a new task.
     *
     * @param callable $func factory that is invoked immediately and must
     *                       return the task's generator
     * @throws \InvalidArgumentException when $func does not return a Generator
     */
    public function add(callable $func)
    {
        $task = $func();

        // Explicit check instead of a string assert(): string assertions are
        // not evaluated on PHP 8 and assert() can be disabled altogether.
        if (!($task instanceof \Generator)) {
            throw new \InvalidArgumentException('Task callable must return a Generator');
        }

        $this->tasks[spl_object_hash($task)] = $task;
    }

    /**
     * Run until every registered task has finished.
     *
     * The loop does not sleep between passes; it only blocks if a task
     * itself blocks.
     *
     * @param callable $task optional extra task to register before running
     */
    public function run(callable $task = null)
    {
        if ($task !== null) {
            $this->add($task);
        }

        while (count($this->tasks) > 0) {
            // foreach iterates over a snapshot, so tasks added or removed
            // during a pass take effect on the next pass.
            foreach ($this->tasks as $key => $coroutine) {
                $data = null;

                // On a fresh generator this runs it up to its first yield.
                $await = $coroutine->current();

                if ($await instanceof Await) {
                    if (!$await->has()) {
                        // Value not delivered yet: leave the task suspended.
                        continue;
                    }
                    $data = $await->get();
                }

                $coroutine->send($data);

                if (!$coroutine->valid()) {
                    unset($this->tasks[$key]);
                }
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| namespace ngyuki\Example\AsyncReceive; | |
| /* | |
| 1. Open three terminals | |
| 2. Run one of the following commands in each terminal | |
| - `while :; do sleep 1; LANG=C date; done | nc -lk 7777` | |
| - `while :; do sleep 2; LANG=C date; done | nc -lk 8888` | |
| - `while :; do sleep 3; LANG=C date; done | nc -lk 9999` | |
| 3. Open another terminal | |
| 4. Run the following command | |
| - `php sample.php` | |
| */ | |
| require_once __DIR__ . '/Loop.php'; | |
| require_once __DIR__ . '/Await.php'; | |
/**
 * Wrap a string in an ANSI colour escape sequence.
 *
 * @param int|string $no  digit appended after "3" in the escape code,
 *                        selecting the foreground colour
 * @param string     $str text to colourise
 * @return string the text prefixed with ESC[0;3{no}m and suffixed with ESC[m
 */
function color($no, $str)
{
    $open = "\x1B[0;3" . $no . 'm';
    $close = "\x1B[m";

    return $open . $str . $close;
}
$loop = new Loop();

// Registry of open sockets shared by every task:
// (int) stream resource id => array(stream, Await slot of the reader task).
$list = array();

/**
 * Build a reader task for one TCP port.
 *
 * The returned closure is a generator: it connects to localhost:$port,
 * registers its stream in $list, then repeatedly suspends on its Await
 * slot and echoes every chunk it is resumed with, in the given colour.
 * It stops on a read error (false) or on an empty read.
 */
$task = function ($port, $color) use (&$list) {
    return function () use (&$list, $port, $color) {
        $await = new Await();
        $stream = fsockopen("localhost", $port);
        if ($stream === false) {
            echo color(1, "unable open port $port\n");
            return;
        }

        try {
            $list[(int)$stream] = array($stream, $await);

            for (;;) {
                // Empty the slot, then suspend until the poller fills it.
                $buf = (yield $await->reset());

                if ($buf === false) {
                    echo color(1, "unable read port $port\n");
                    break;
                }

                // An empty read ends this task.
                if (strlen($buf) === 0) {
                    break;
                }

                echo color($color, $buf);
            }
        } finally {
            // Always deregister and close, however the loop above ended.
            unset($list[(int)$stream]);
            fclose($stream);
        }
    };
};

$loop->add($task(7777, 2));
$loop->add($task(8888, 3));
$loop->add($task(9999, 4));

// Poller task: waits for readable sockets and hands each chunk to the
// matching reader through its Await slot. Ends once no sockets remain.
$loop->add(function () use (&$list) {
    while (count($list)) {
        $r = $w = $e = array();
        $pending = false;

        foreach ($list as $id => list($stream, $await)) {
            if ($await->has()) {
                // The reader has not consumed the previous chunk yet.
                // Reading again would overwrite it, so skip this stream.
                $pending = true;
                continue;
            }
            $r[$id] = $stream;
        }

        if (count($r) > 0) {
            // Block only when nothing is waiting to be delivered; otherwise
            // poll without waiting so the pending chunk reaches its reader
            // on the next pass.
            $timeout = $pending ? 0 : null;

            if (stream_select($r, $w, $e, $timeout) === false) {
                throw new \RuntimeException("Unable select stream");
            }

            foreach ($r as $stream) {
                // Look the entry up by resource id rather than relying on
                // stream_select() keeping the array keys.
                /* @var $await Await */
                list(, $await) = $list[(int)$stream];
                $await->set(fread($stream, 8192));
            }
        }

        yield;
    }
});

$loop->run();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment