|
<?php |
|
|
|
namespace App; |
|
|
|
use Psr\Http\Message\ResponseInterface; |
|
use React\EventLoop\Loop; |
|
use React\EventLoop\TimerInterface; |
|
use React\Filesystem\AdapterInterface; |
|
use React\Http\Browser; |
|
use React\Stream\ReadableStreamInterface; |
|
use React\Stream\ThroughStream; |
|
|
|
class StreamedFile |
|
{ |
|
const END_STREAM_TOKEN = '---END---'; |
|
private AdapterInterface $filesystem; |
|
private Browser $browser; |
|
|
|
public function __construct( |
|
AdapterInterface $filesystem, |
|
Browser $browser |
|
) |
|
{ |
|
$this->filesystem = $filesystem; |
|
$this->browser = $browser; |
|
} |
|
|
|
|
|
public function get(string $url) : ReadableStreamInterface |
|
{ |
|
$socketFileName = '/tmp/file-middleware-' . rand(1, 999999999999999). '.sock'; |
|
touch($socketFileName); |
|
$fileForWriting = $this->filesystem->file($socketFileName); |
|
$fileForReading = $this->filesystem->file($socketFileName); |
|
$stream = new ThroughStream(); |
|
|
|
$this |
|
->browser |
|
->requestStreaming('GET', $url) |
|
->then(function(ResponseInterface $response) use ($fileForWriting, &$contentToRead) { |
|
$stream = $response->getBody(); |
|
$stream->on('data', function(string $data) use ($fileForWriting, &$contentToRead) { |
|
$contentToRead = $contentToRead + strlen($data); |
|
$fileForWriting->putContents($data, \FILE_APPEND); |
|
}); |
|
|
|
$stream->on('close', function() use ($fileForWriting, &$contentToRead) { |
|
$contentToRead = $contentToRead + strlen(self::END_STREAM_TOKEN); |
|
$fileForWriting->putContents(self::END_STREAM_TOKEN, \FILE_APPEND); |
|
}); |
|
}, function (\Throwable $e) { |
|
// echo 'Error: ' . $e->getMessage() . PHP_EOL; |
|
}); |
|
|
|
$offset = 0; |
|
Loop::addPeriodicTimer(.1, function (TimerInterface $timer) use ($fileForReading, &$offset, &$contentToRead, $stream, $socketFileName): void { |
|
if ($contentToRead === 0) { |
|
return; |
|
} |
|
|
|
// echo '[Con] ' . $contentToRead . PHP_EOL; |
|
$fileForReading->getContents($offset, 8192)->then(function (string $contents) use (&$offset, $timer, &$contentToRead, $stream, $socketFileName): void { |
|
// echo '[Dat] ' . strlen($contents) . PHP_EOL; |
|
// echo '[Off] ' . $offset . PHP_EOL; |
|
|
|
// That emulates 1ms blocking work (for transformations, for example) |
|
$stream->write($contents); |
|
$contentToRead = $contentToRead - strlen($contents); |
|
$offset = $offset + strlen($contents); |
|
$isLast = str_ends_with($contents, self::END_STREAM_TOKEN); |
|
|
|
if ($isLast) { |
|
var_dump('LAST'); |
|
Loop::cancelTimer($timer); |
|
$stream->end(); |
|
$stream->close(); |
|
unlink($socketFileName); |
|
} |
|
}); |
|
}); |
|
|
|
return $stream; |
|
} |
|
} |