Created
March 6, 2016 16:20
-
-
Save SerafimArts/0c5317a0b2147b027b3b to your computer and use it in GitHub Desktop.
Reactor.php
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Serafim\Reactor; | |
/** | |
* Class FileReader | |
* @package Serafim\Reactor | |
*/ | |
class FileReader implements Reader | |
{ | |
/** | |
* @var string | |
*/ | |
protected $file; | |
/** | |
* FileReader constructor. | |
* @param $path | |
*/ | |
public function __construct($path) | |
{ | |
$this->file = $path; | |
} | |
/** | |
* @return \Generator | |
*/ | |
public function read() : \Generator | |
{ | |
$fp = fopen($this->file, 'r'); | |
try { | |
while (!feof($fp)) { | |
yield fread($fp, 1); | |
} | |
} finally { | |
fclose($fp); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* This file is part of Reactor package. | |
* | |
* @author Serafim <[email protected]> | |
* @date 06.03.2016 18:42 | |
* | |
* For the full copyright and license information, please view the LICENSE | |
* file that was distributed with this source code. | |
*/ | |
namespace Serafim\Reactor; | |
/** | |
* Class JoinedReader | |
* @package Serafim\Reactor | |
*/ | |
class JoinedReader implements \Iterator | |
{ | |
/** | |
* @var \Generator | |
*/ | |
protected $stream; | |
/** | |
* @var Reader | |
*/ | |
protected $reader; | |
/** | |
* @var array | |
*/ | |
protected $callbacks = []; | |
/** | |
* @var int | |
*/ | |
protected $timeout = 0; | |
/** | |
* JoinedReader constructor. | |
* @param Reader $reader | |
*/ | |
public function __construct(Reader $reader) | |
{ | |
$this->reader = $reader; | |
$this->stream = $reader->read(); | |
} | |
/** | |
* @param \Closure $callback | |
* @return $this | |
*/ | |
public function subscribe(\Closure $callback) | |
{ | |
$this->callbacks[] = $callback; | |
return $this; | |
} | |
/** | |
* @return mixed | |
*/ | |
public function current() | |
{ | |
$result = $this->stream->current(); | |
foreach ($this->callbacks as $callback) { | |
$callback($result, $this); | |
} | |
return $result; | |
} | |
/** | |
* @return int | |
*/ | |
public function getTimeout() | |
{ | |
return $this->timeout; | |
} | |
public function next() | |
{ | |
$tsBefore = microtime(true); | |
$this->stream->next(); | |
$this->timeout = microtime(true) - $tsBefore; | |
} | |
public function key() | |
{ | |
return $this->stream->key(); | |
} | |
public function valid() | |
{ | |
return $this->stream->valid(); | |
} | |
public function rewind() | |
{ | |
$this->stream->rewind(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Serafim\Reactor; | |
/** | |
* Class Reactor | |
* @package Serafim\Reactor | |
*/ | |
class Reactor | |
{ | |
/** | |
* @var array|\Iterator[] | |
*/ | |
protected $readers = []; | |
/** | |
* @param Reader $reader | |
* @return JoinedReader | |
*/ | |
public function join(Reader $reader) | |
{ | |
$joined = new JoinedReader($reader); | |
$this->readers[] = $joined; | |
return $joined; | |
} | |
/** | |
* @param Reader $reader | |
* @param \Closure $closure | |
* @return $this | |
*/ | |
public function subscribe(Reader $reader, \Closure $closure) | |
{ | |
return $this->join($reader)->subscribe($closure); | |
} | |
/** | |
* @return \Generator | |
*/ | |
public function start() | |
{ | |
while (count($this->readers) > 0) { | |
$delay = 0; | |
/** @var JoinedReader $item */ | |
foreach ($this->readers as $i => $item) { | |
if ($item->valid()) { | |
$item->current(); | |
$item->next(); | |
} else { | |
unset($this->readers[$i]); | |
} | |
$delay += $item->getTimeout(); | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace Serafim\Reactor; | |
/** | |
* Interface Reader | |
* @package Serafim\Reactor | |
*/ | |
interface Reader | |
{ | |
/** | |
* Reader constructor. | |
* @param $path | |
*/ | |
public function __construct($path); | |
/** | |
* @return \Generator | |
*/ | |
public function read() : \Generator; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment