Skip to content

Instantly share code, notes, and snippets.

@dchekmarev
Last active October 20, 2016 04:20
Show Gist options
  • Save dchekmarev/dd4514b34320f41df30c966c04dacb21 to your computer and use it in GitHub Desktop.
Save dchekmarev/dd4514b34320f41df30c966c04dacb21 to your computer and use it in GitHub Desktop.
<?php
/**
 * Contract for objects that must be told when the caller is done with them.
 */
interface Closeable
{
    /**
     * Signals that no further work will be sent to this object.
     */
    public function close();
}
/**
 * Contract for a pull-style source: every call hands out the next product.
 */
interface Producer
{
    /**
     * Returns the next product.
     *
     * Processor::run() keeps calling this until it returns null, so null
     * acts as the end-of-stream marker for producers driven that way.
     */
    public function produce();
}
/**
 * Contract for a sink that accepts products one call at a time and is
 * closed (see Closeable) once the stream is finished.
 */
interface Consumer extends Closeable
{
    /**
     * Accepts one product.
     *
     * @param mixed $product In this file every product handed over is an
     *                       array of items.
     */
    public function consume($product);
}
/**
 * Drains a Producer into a Consumer.
 */
class Processor
{
    private $producer;
    private $consumer;

    /**
     * @param Producer $producer Source that is polled until it returns null.
     * @param Consumer $consumer Sink that receives every product and is
     *                           closed when the run ends.
     */
    public function __construct(Producer $producer, Consumer $consumer)
    {
        $this->producer = $producer;
        $this->consumer = $consumer;
    }

    /**
     * Pulls products until the producer returns null, forwarding each one
     * to the consumer.
     *
     * The consumer is closed even when producing or consuming throws, so
     * it is not left open on failure; the exception still propagates to
     * the caller.
     */
    public function run()
    {
        try {
            while (null !== ($product = $this->producer->produce())) {
                $this->consumer->consume($product);
            }
        } finally {
            $this->consumer->close();
        }
    }
}
/**
 * Consumer that re-slices incoming arrays into chunks of at most
 * $chunkSize items and hands each chunk to a fresh sub-consumer.
 */
class ChunkedConsumer implements Consumer
{
    private $consumerProducer;
    private $chunkSize;
    private $buffer = [];

    /**
     * @param Producer $consumerProducer Factory whose produce() result is used
     *                                   as a consumer: consume() and close()
     *                                   are called on it, once per chunk.
     * @param int      $chunkSize        Maximum number of items per chunk;
     *                                   must be at least 1.
     *
     * @throws \InvalidArgumentException When $chunkSize is not a positive
     *                                   number. A size below 1 would make
     *                                   pushChunk() remove nothing from the
     *                                   buffer and loop forever.
     */
    public function __construct(Producer $consumerProducer, $chunkSize)
    {
        if (!is_numeric($chunkSize) || (int) $chunkSize < 1) {
            throw new \InvalidArgumentException('chunkSize must be a positive integer');
        }
        $this->consumerProducer = $consumerProducer;
        $this->chunkSize = (int) $chunkSize;
    }

    /**
     * Appends the items of $product to the buffer and flushes full chunks.
     *
     * A buffer holding exactly $chunkSize items is kept until more items
     * arrive or close() is called (the comparison is strict).
     *
     * @param array $product Items to append.
     */
    public function consume($product)
    {
        $this->buffer = array_merge($this->buffer, $product);
        while (count($this->buffer) > $this->chunkSize) {
            $this->pushChunk();
        }
    }

    /**
     * Sends the first (up to) $chunkSize buffered items to a newly produced
     * sub-consumer, then drops them from the buffer.
     */
    private function pushChunk()
    {
        $chunk = array_slice($this->buffer, 0, $this->chunkSize);
        $subConsumer = $this->consumerProducer->produce();
        try {
            $subConsumer->consume($chunk);
        } finally {
            // Close the sub-consumer even if it failed mid-chunk.
            $subConsumer->close();
        }
        // Only reached on success: a failed chunk stays in the buffer.
        $this->buffer = array_slice($this->buffer, $this->chunkSize);
    }

    /**
     * Flushes whatever is still buffered, chunk by chunk.
     */
    public function close()
    {
        while (!empty($this->buffer)) {
            $this->pushChunk();
        }
    }
}
/**
 * Contract for turning one product into another representation.
 */
interface Converter
{
    /**
     * Converts a product.
     *
     * @param mixed $product The converters in this file expect an array of
     *                       items and return a one-element array holding
     *                       the rendered string.
     */
    public function convert($product);
}
/**
 * Consumer decorator: runs every product through a Converter before
 * passing it on to the wrapped consumer.
 */
class ConvertingConsumer implements Consumer
{
    private $subConsumer;
    private $converter;

    /**
     * @param Consumer  $subConsumer Consumer that receives converted products.
     * @param Converter $converter   Conversion applied to each product.
     */
    public function __construct(Consumer $subConsumer, Converter $converter)
    {
        $this->subConsumer = $subConsumer;
        $this->converter = $converter;
    }

    /**
     * Converts $product and forwards the result to the wrapped consumer.
     */
    public function consume($product)
    {
        $converted = $this->converter->convert($product);
        $this->subConsumer->consume($converted);
    }

    /**
     * Closes the wrapped consumer.
     */
    public function close()
    {
        $this->subConsumer->close();
    }
}
/**
 * Test producer that emits the integers 0 .. $total - 1 in batches.
 */
class ProducerStub implements Producer
{
    private $idx = 0;
    private $totalRecords;
    private $batchSize;

    /**
     * @param int $total Number of records to emit overall.
     * @param int $batch Maximum number of records per produce() call;
     *                   must be at least 1.
     *
     * @throws \InvalidArgumentException When $batch is below 1. Such a batch
     *                                   size would never advance the cursor,
     *                                   so produce() would never return null.
     */
    public function __construct($total, $batch)
    {
        if ((int) $batch < 1) {
            throw new \InvalidArgumentException('batch must be a positive integer');
        }
        $this->totalRecords = (int) $total;
        $this->batchSize = (int) $batch;
    }

    /**
     * Returns the next batch of consecutive integers, or null once $total
     * records have been emitted.
     *
     * The final batch is shortened so that exactly $total records are
     * produced. Each batch is also echoed.
     *
     * @return int[]|null
     */
    public function produce()
    {
        // ">=" rather than ">": indices run from 0 to totalRecords - 1.
        if ($this->idx >= $this->totalRecords) {
            return null;
        }
        // Cap the batch at the total so the last batch cannot overshoot.
        $end = min($this->idx + $this->batchSize, $this->totalRecords);
        $res = range($this->idx, $end - 1);
        $this->idx = $end;
        echo "produced items: [" . implode(", ", $res) . "]\n";
        return $res;
    }
}
/**
 * Factory-style Producer: each produce() call instantiates the configured
 * class, passing a counter that starts at 1 and grows by one per call.
 */
class ConsumerProducer implements Producer
{
    private $sitemapIndex = 1;
    private $consumerClass;

    /**
     * @param string $consumerClass Name of the class to instantiate; its
     *                              constructor receives the running counter.
     */
    public function __construct($consumerClass)
    {
        $this->consumerClass = $consumerClass;
    }

    /**
     * Creates the next instance and advances the counter.
     */
    public function produce()
    {
        $class = $this->consumerClass;
        $index = $this->sitemapIndex;
        $this->sitemapIndex = $index + 1;

        return new $class($index);
    }
}
/**
 * Test consumer that only prints what it is given.
 */
class ConsumerStub implements Consumer
{
    private $filename;
    // Not read or written anywhere in this class.
    private $partFiles = [];

    /**
     * @param mixed $filename Label printed in front of every message.
     */
    public function __construct($filename)
    {
        $this->filename = $filename;
    }

    /**
     * Prints the label followed by the comma-separated items.
     *
     * @param array $product Items to print.
     */
    public function consume($product)
    {
        echo "consuming part for ", $this->filename, ": ",
            "items: [", implode(", ", $product), "]\n";
    }

    /**
     * Prints a closing message for this label.
     */
    public function close()
    {
        echo "closing ", $this->filename, "\n";
    }
}
/**
 * Renders a list of items as a single XML string.
 */
class XmlConverter implements Converter
{
    /**
     * Wraps every item in an <item> element inside one <xml> root.
     *
     * Item values are XML-escaped so that characters such as "&" or "<"
     * cannot break the markup; plain integers and other values without
     * special characters render unchanged.
     *
     * @param array $product Items to render.
     *
     * @return string[] One-element array holding the XML string.
     */
    public function convert($product)
    {
        $items = array_map(function ($x) {
            return '<item>' . htmlspecialchars((string) $x, ENT_QUOTES | ENT_XML1, 'UTF-8') . '</item>';
        }, $product);

        return [
            '<xml>' . implode('', $items) . '</xml>'
        ];
    }
}
/**
 * Renders a list of items as one JSON-like string.
 *
 * NOTE(review): the output has an unquoted key and a bare comma-separated
 * list, so it is not valid JSON; confirm whether that is intentional for
 * this demo.
 */
class JSONConverter implements Converter
{
    /**
     * Joins the items with ", " and wraps them as "{json: ...}".
     *
     * @param array $product Items to render.
     *
     * @return string[] One-element array holding the rendered string.
     */
    public function convert($product)
    {
        $joined = implode(', ', $product);

        return ['{json: ' . $joined . '}'];
    }
}
/**
 * Producer that wraps every consumer obtained from another producer in a
 * ConvertingConsumer.
 */
class ConvertingConsumerProducer implements Producer
{
    // Not read or written anywhere in this class.
    private $sitemapIndex = 1;
    private $subConsumerProducer;
    private $converter;

    /**
     * @param mixed $subConsumerProducer Object whose produce() result is passed
     *                                   to ConvertingConsumer as its Consumer.
     * @param mixed $converter           Passed to ConvertingConsumer as its
     *                                   Converter.
     */
    public function __construct($subConsumerProducer, $converter)
    {
        $this->converter = $converter;
        $this->subConsumerProducer = $subConsumerProducer;
    }

    /**
     * Produces a new inner consumer and returns it wrapped with the converter.
     */
    public function produce()
    {
        $inner = $this->subConsumerProducer->produce();

        return new ConvertingConsumer($inner, $this->converter);
    }
}
/**
 * Runs one demo pipeline: ProducerStub -> ChunkedConsumer -> XML-converting
 * ConsumerStub instances, printing everything that happens.
 *
 * Other wirings that fit the same Processor: pass JSONConverter instead of
 * XmlConverter, or skip chunking and use a single
 * ConvertingConsumer(new ConsumerStub('bigfile'), new XmlConverter()).
 *
 * @param int $total Passed to ProducerStub as the record total.
 * @param int $batch Passed to ProducerStub as the batch size.
 * @param int $chunk Passed to ChunkedConsumer as the chunk size.
 */
function test($total, $batch, $chunk)
{
    echo "testing, total = $total, batchSize = $batch, chunkSize = $chunk\n";

    $source = new ProducerStub($total, $batch);
    $stubFactory = new ConsumerProducer('ConsumerStub');
    $xmlFactory = new ConvertingConsumerProducer($stubFactory, new XmlConverter());
    $sink = new ChunkedConsumer($xmlFactory, $chunk);

    (new Processor($source, $sink))->run();

    echo "\n";
}
// Demo runs as [total, batchSize, chunkSize]: once with batches larger than
// chunks, once with chunks larger than batches.
foreach ([[20, 5, 3], [20, 3, 5]] as $scenario) {
    call_user_func_array('test', $scenario);
}
unset($scenario);
@christophe-ddproperty
Copy link

Thanks.

There are a few interesting things in this design (they will probably be incorporated in one way or another into my upcoming PR):

  • Sub-producer concept
  • Names :)

However, you missed some parts that are important for solving the problem, especially:

  • Multiple storage engine capabilities (we need both files and S3, and you cannot easily append content to resources stored on the latter)
  • Wrapping capabilities, to produce valid XML resources
  • Resource formatting and byte-level storage (we need to know the current chunk size in bytes to comply with Google limits)

My design has such provisions, but I agree that it's not the simplest thing I've seen.
Feel free to suggest ideas to overcome these issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment