Skip to content

Instantly share code, notes, and snippets.

@vinyvicente
Forked from fbrnc/consumer.php
Created February 13, 2023 11:25
Show Gist options
  • Save vinyvicente/8b222a57817cb6bc18c9d9f315757053 to your computer and use it in GitHub Desktop.
Save vinyvicente/8b222a57817cb6bc18c9d9f315757053 to your computer and use it in GitHub Desktop.
AWS Kinesis Example for PHP (using the AWS SDK for PHP)
<?php
// curl -sS https://getcomposer.org/installer | php
// php composer.phar require aws/aws-sdk-php
// export AWS_ACCESS_KEY_ID=...
// export AWS_SECRET_ACCESS_KEY=...
// Name of the Kinesis stream to read from (placeholder - replace before running).
$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
// Value passed as 'Limit' to every getRecords call below.
$numberOfRecordsPerBatch = 10000;
require_once 'vendor/autoload.php';
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis(['region' => 'eu-west-1', 'version' => '2013-12-02']);
// get all shard ids
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]);
// Fall back to an empty list so the loop below is skipped if the search yields nothing.
$shardIds = $res->search('StreamDescription.Shards[].ShardId') ?: [];
// Total number of records printed across all shards; read by the statistics at the end.
$count = 0;
$startTime = microtime(true);
// Shards are drained one after another, each from its oldest available record.
foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";
    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'TRIM_HORIZON', // 'AT_SEQUENCE_NUMBER|AFTER_SEQUENCE_NUMBER|TRIM_HORIZON|LATEST'
        // 'StartingSequenceNumber' => '<string>',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');
    do {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator
        ]);
        // Iterator for the next batch; null when the response carries no NextShardIterator.
        $shardIterator = $res->get('NextShardIterator');
        $localCount = 0;
        // Each search result is a [SequenceNumber, Data] pair; tolerate a missing Records key.
        $records = $res->search('Records[].[SequenceNumber, Data]') ?: [];
        foreach ($records as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        // Pause between batches before polling the shard again.
        sleep(1);
        // Stop on the first empty batch, or when there is no iterator left to continue with:
        // calling getRecords again with a null ShardIterator would fail.
    } while ($localCount > 0 && $shardIterator !== null);
}
// Wall-clock time spent since $startTime was taken, in seconds.
$duration = microtime(true) - $startTime;
// Average milliseconds per record. When no record was read at all there is nothing to
// average, and dividing by a zero $count would raise an error, so report 0 instead.
$timePerMessage = $count > 0 ? $duration*1000 / $count : 0.0;
echo "Total Duration: " . round($duration) . " seconds\n";
echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
<?php
// curl -sS https://getcomposer.org/installer | php
// php composer.phar require aws/aws-sdk-php
// export AWS_ACCESS_KEY_ID=...
// export AWS_SECRET_ACCESS_KEY=...
// Composer autoloader (makes the AWS SDK classes available).
require_once 'vendor/autoload.php';

// Name of the Kinesis stream to write to (placeholder - replace before running).
$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
// How many demo records the script generates in total.
$totalNumberOfRecords = 10000;

// Kinesis client for the region / API version used by this example.
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
    'region' => 'eu-west-1',
    'version' => '2013-12-02',
]);
/**
 * Collects items and hands them to a callback in batches.
 *
 * Items are appended with add(); as soon as the number of buffered items
 * reaches the configured size the callback is invoked with the whole batch
 * and the buffer is emptied. flush() does the same on demand for a partially
 * filled buffer.
 */
class Buffer {
    /** @var callable Invoked with the array of buffered items on every flush. */
    protected $callback;

    /** @var int Number of buffered items that triggers an automatic flush. */
    protected $size;

    /** @var array Items collected since the last flush/reset. */
    protected $data = [];

    /**
     * @param callable $callback Receives the buffered items as a single array argument.
     * @param int      $size     Batch size at which add() flushes automatically.
     */
    public function __construct($callback, $size=500) {
        $this->size = $size;
        $this->callback = $callback;
    }

    /**
     * Appends one item and flushes once the batch size has been reached.
     *
     * @param mixed $item
     */
    public function add($item) {
        array_push($this->data, $item);
        $batchIsFull = count($this->data) >= $this->size;
        if ($batchIsFull) {
            $this->flush();
        }
    }

    /**
     * Discards all buffered items without passing them to the callback.
     */
    public function reset() {
        $this->data = [];
    }

    /**
     * Passes the buffered items to the callback and empties the buffer.
     * Does nothing when the buffer is empty.
     */
    public function flush() {
        if (count($this->data) === 0) {
            return;
        }
        call_user_func($this->callback, $this->data);
        // Only reached when the callback returned normally.
        $this->reset();
    }
}
// Buffer whose flush callback sends each batch to Kinesis with a single putRecords call.
// Entries that Kinesis rejects are re-sent a limited number of times instead of being dropped.
$buffer = new Buffer(function(array $data) use ($kinesisClient, $streamName) {
    echo "Flushing\n";
    // One request entry per buffered item; the MD5 of the payload serves as partition key.
    $records = [];
    foreach ($data as $item) {
        $records[] = [
            'Data' => $item,
            'PartitionKey' => md5($item)
        ];
    }
    $maxAttempts = 3;
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $res = $kinesisClient->putRecords([ 'StreamName' => $streamName, 'Records' => $records ]);
        $failedCount = (int) $res->get('FailedRecordCount');
        echo "Failed records: {$failedCount}\n";
        if ($failedCount === 0) {
            return;
        }
        // The response entries are in request order; the ones carrying an ErrorCode
        // were not stored and are collected for the next attempt.
        $rejected = [];
        foreach ((array) $res->get('Records') as $index => $result) {
            if (!empty($result['ErrorCode']) && isset($records[$index])) {
                $rejected[] = $records[$index];
            }
        }
        if (count($rejected) === 0) {
            // Failures were reported but none can be identified, so there is nothing to re-send.
            return;
        }
        $records = $rejected;
        if ($attempt < $maxAttempts) {
            // Short, growing pause before the retry.
            usleep(100000 * $attempt);
        }
    }
    echo "Giving up on " . count($records) . " records after {$maxAttempts} attempts\n";
});
// Start the clock right before the first record is generated.
$startTime = microtime(true);
// Generate the demo records one by one; the buffer decides when a batch is sent.
$i = 0;
while ($i < $totalNumberOfRecords) {
    $buffer->add(
        json_encode([
            'id' => rand(0, 10000),
            'title' => 'Foo'
        ])
    );
    $i++;
}
// Send whatever is still buffered after the last full batch.
$buffer->flush();
$duration = microtime(true) - $startTime;
// Average milliseconds spent per generated record.
$timePerMessage = ($duration * 1000) / $totalNumberOfRecords;
echo "Total Duration: ", round($duration), " seconds\n";
echo "Time per message: ", round($timePerMessage, 2), " ms/message\n";
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment