Skip to content

Instantly share code, notes, and snippets.

@trowski
Last active July 14, 2017 03:03
Show Gist options
  • Save trowski/0985a54358a890b2090176420160af16 to your computer and use it in GitHub Desktop.
Save trowski/0985a54358a890b2090176420160af16 to your computer and use it in GitHub Desktop.
<?php
class Broadcaster {
/** @var \Amp\Emitter[] */
private $emitters;
/** @var bool */
private $complete = false;
/** @var \Throwable|null */
private $exception;
public function __construct(Amp\Iterator $iterator) {
Amp\Promise\rethrow(new Amp\Coroutine($this->iterate($iterator)));
}
private function iterate(Amp\Iterator $iterator): \Generator {
try {
while (yield $iterator->advance()) {
$promises = [];
$current = $iterator->getCurrent();
foreach ($this->emitters as $emitter) {
$promises[] = $emitter->emit($current);
}
yield Amp\Promise\all($promises);
}
} catch (Throwable $exception) {
$this->exception = $exception;
}
$emitters = $this->emitters;
$this->emitters = [];
$this->complete = true;
if ($this->exception) {
foreach ($emitters as $emitter) {
$emitter->fail($this->exception);
}
} else {
foreach ($emitters as $emitter) {
$emitter->complete();
}
}
}
public function getIterator(): Amp\Iterator {
$emitter = new Amp\Emitter;
$iterator = $emitter->iterate();
if ($this->complete) {
if ($this->exception) {
$emitter->fail($this->exception);
} else {
$emitter->complete();
}
} else {
$this->emitters[\spl_object_hash($iterator)] = $emitter;
}
return $iterator;
}
public function removeIterator(Amp\Iterator $iterator) {
$hash = \spl_object_hash($iterator);
if (!isset($this->emitters[$hash])) {
return;
}
$emitter = $this->emitters[$hash];
unset($this->emitters[$hash]);
$emitter->complete();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment