Skip to content

Instantly share code, notes, and snippets.

@SerafimArts
Created March 6, 2016 16:20
Show Gist options
  • Save SerafimArts/0c5317a0b2147b027b3b to your computer and use it in GitHub Desktop.
Save SerafimArts/0c5317a0b2147b027b3b to your computer and use it in GitHub Desktop.
Reactor.php
<?php
namespace Serafim\Reactor;
/**
* Class FileReader
* @package Serafim\Reactor
*/
class FileReader implements Reader
{
/**
* @var string
*/
protected $file;
/**
* FileReader constructor.
* @param $path
*/
public function __construct($path)
{
$this->file = $path;
}
/**
* @return \Generator
*/
public function read() : \Generator
{
$fp = fopen($this->file, 'r');
try {
while (!feof($fp)) {
yield fread($fp, 1);
}
} finally {
fclose($fp);
}
}
}
<?php
/**
* This file is part of Reactor package.
*
* @author Serafim <[email protected]>
* @date 06.03.2016 18:42
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Serafim\Reactor;
/**
* Class JoinedReader
* @package Serafim\Reactor
*/
class JoinedReader implements \Iterator
{
/**
* @var \Generator
*/
protected $stream;
/**
* @var Reader
*/
protected $reader;
/**
* @var array
*/
protected $callbacks = [];
/**
* @var int
*/
protected $timeout = 0;
/**
* JoinedReader constructor.
* @param Reader $reader
*/
public function __construct(Reader $reader)
{
$this->reader = $reader;
$this->stream = $reader->read();
}
/**
* @param \Closure $callback
* @return $this
*/
public function subscribe(\Closure $callback)
{
$this->callbacks[] = $callback;
return $this;
}
/**
* @return mixed
*/
public function current()
{
$result = $this->stream->current();
foreach ($this->callbacks as $callback) {
$callback($result, $this);
}
return $result;
}
/**
* @return int
*/
public function getTimeout()
{
return $this->timeout;
}
public function next()
{
$tsBefore = microtime(true);
$this->stream->next();
$this->timeout = microtime(true) - $tsBefore;
}
public function key()
{
return $this->stream->key();
}
public function valid()
{
return $this->stream->valid();
}
public function rewind()
{
$this->stream->rewind();
}
}
<?php
namespace Serafim\Reactor;
/**
* Class Reactor
* @package Serafim\Reactor
*/
class Reactor
{
/**
* @var array|\Iterator[]
*/
protected $readers = [];
/**
* @param Reader $reader
* @return JoinedReader
*/
public function join(Reader $reader)
{
$joined = new JoinedReader($reader);
$this->readers[] = $joined;
return $joined;
}
/**
* @param Reader $reader
* @param \Closure $closure
* @return $this
*/
public function subscribe(Reader $reader, \Closure $closure)
{
return $this->join($reader)->subscribe($closure);
}
/**
* @return \Generator
*/
public function start()
{
while (count($this->readers) > 0) {
$delay = 0;
/** @var JoinedReader $item */
foreach ($this->readers as $i => $item) {
if ($item->valid()) {
$item->current();
$item->next();
} else {
unset($this->readers[$i]);
}
$delay += $item->getTimeout();
}
}
}
}
<?php
namespace Serafim\Reactor;
/**
* Interface Reader
* @package Serafim\Reactor
*/
interface Reader
{
/**
* Reader constructor.
* @param $path
*/
public function __construct($path);
/**
* @return \Generator
*/
public function read() : \Generator;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment