-
-
Save vinyvicente/8b222a57817cb6bc18c9d9f315757053 to your computer and use it in GitHub Desktop.
AWS Kinesis Example for PHP (using the AWS SDK for PHP)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
// Reads every record currently stored in a Kinesis stream, shard by shard,
// starting from the oldest available record (TRIM_HORIZON), prints each one
// and reports the total duration and time per message.
//
// Setup:
//   curl -sS https://getcomposer.org/installer | php
//   php composer.phar require aws/aws-sdk-php
//   export AWS_ACCESS_KEY_ID=...
//   export AWS_SECRET_ACCESS_KEY=...

$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
// Maximum number of records requested per GetRecords call (10000 is the API maximum).
$numberOfRecordsPerBatch = 10000;

require_once 'vendor/autoload.php';

$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis(['region' => 'eu-west-1', 'version' => '2013-12-02']);

// Collect all shard ids. DescribeStream returns shards in pages; keep asking,
// starting after the last shard seen, while the response reports more shards.
$shardIds = [];
$describeParams = ['StreamName' => $streamName];
do {
    $res = $kinesisClient->describeStream($describeParams);
    $page = $res->search('StreamDescription.Shards[].ShardId') ?: [];
    $shardIds = array_merge($shardIds, $page);
    $hasMoreShards = (bool) $res->search('StreamDescription.HasMoreShards');
    if ($page !== []) {
        $describeParams['ExclusiveStartShardId'] = end($page);
    }
} while ($hasMoreShards && $page !== []);

$count = 0;
$startTime = microtime(true);
foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";

    // Get the initial shard iterator.
    // Other types: AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | LATEST; the
    // *_SEQUENCE_NUMBER types also need 'StartingSequenceNumber' => '<string>'.
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'TRIM_HORIZON',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');

    // A null iterator means the shard is closed and has been read completely.
    while ($shardIterator !== null) {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator,
        ]);
        $shardIterator = $res->get('NextShardIterator');

        $localCount = 0;
        foreach ($res->search('Records[].[SequenceNumber, Data]') ?: [] as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";

        // An empty batch alone does not mean the end of the shard: GetRecords may
        // return no records while the iterator is still behind the tip. Stop only
        // once the response also reports it is caught up. A missing
        // MillisBehindLatest is treated as caught up.
        if ($localCount === 0 && (int) $res->get('MillisBehindLatest') === 0) {
            break;
        }

        // Pause between calls; GetRecords is rate-limited per shard.
        sleep(1);
    }
}

$duration = microtime(true) - $startTime;
echo "Total Duration: " . round($duration) . " seconds\n";
// Avoid dividing by zero when the stream held no records.
if ($count > 0) {
    $timePerMessage = $duration*1000 / $count;
    echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
// One-time setup:
//   curl -sS https://getcomposer.org/installer | php
//   php composer.phar require aws/aws-sdk-php
// Credentials are expected in the environment:
//   export AWS_ACCESS_KEY_ID=...
//   export AWS_SECRET_ACCESS_KEY=...

require_once 'vendor/autoload.php';

// Target stream and the number of test records this script will produce.
$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';
$totalNumberOfRecords = 10000;

// Kinesis client pinned to a region and API version.
$clientConfig = [
    'region'  => 'eu-west-1',
    'version' => '2013-12-02',
];
$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis($clientConfig);
/**
 * Accumulates items and hands them to a callback in batches.
 *
 * Items are held in memory until the batch reaches the configured size. At
 * that point the whole batch is passed to the callback and the buffer is
 * emptied. Call flush() at the end to deliver a final, partially filled batch.
 */
class Buffer
{
    /** Callable invoked with the array of buffered items on every flush. */
    protected $callback;

    /** Number of buffered items at which add() flushes automatically. */
    protected $size;

    /** Items waiting to be flushed, in insertion order. */
    protected $data = [];

    /**
     * @param callable $callback Receives one argument: the array of buffered items.
     * @param int      $size     Batch size that triggers an automatic flush (default 500).
     */
    public function __construct($callback, $size=500)
    {
        $this->size = $size;
        $this->callback = $callback;
    }

    /**
     * Appends an item, flushing right away once the batch size is reached.
     */
    public function add($item)
    {
        $this->data[] = $item;
        if ($this->size <= count($this->data)) {
            $this->flush();
        }
    }

    /**
     * Drops all buffered items without invoking the callback.
     */
    public function reset()
    {
        $this->data = [];
    }

    /**
     * Passes the buffered items to the callback, then empties the buffer.
     * Does nothing when the buffer is empty. If the callback throws, the
     * items stay buffered.
     */
    public function flush()
    {
        if (empty($this->data)) {
            return;
        }
        call_user_func($this->callback, $this->data);
        $this->reset();
    }
}
// Each flush sends one batch through PutRecords (at most 500 records per call).
// PutRecords is not atomic: single records can fail (e.g. when throttled) while
// the rest of the batch succeeds. Only the failed records are resent, with
// exponential backoff, up to a fixed number of attempts.
$buffer = new Buffer(function(array $data) use ($kinesisClient, $streamName) {
    echo "Flushing\n";

    $records = [];
    foreach ($data as $item) {
        $records[] = [
            'Data' => $item,
            // Hashing the payload gives varied partition keys, which spreads
            // records across shards.
            'PartitionKey' => md5($item),
        ];
    }

    $maxAttempts = 5;
    $attempt = 0;
    while ($records !== []) {
        $attempt++;
        $res = $kinesisClient->putRecords(['StreamName' => $streamName, 'Records' => $records]);
        $failedCount = (int) $res->get('FailedRecordCount');
        echo "Failed records: {$failedCount}\n";
        if ($failedCount === 0) {
            return;
        }
        if ($attempt >= $maxAttempts) {
            echo "Giving up on {$failedCount} records after {$attempt} attempts\n";
            return;
        }

        // Response entries line up with request entries by position; a failed
        // entry carries an ErrorCode.
        $retry = [];
        foreach ($res->get('Records') ?: [] as $index => $result) {
            if (isset($result['ErrorCode'])) {
                $retry[] = $records[$index];
            }
        }
        $records = $retry;

        // Back off 100ms, 200ms, 400ms, ... before resending.
        usleep(100000 * (1 << ($attempt - 1)));
    }
});

$startTime = microtime(true);
for ($i = 0; $i < $totalNumberOfRecords; $i++) {
    $buffer->add(json_encode([
        'id' => rand(0, 10000),
        'title' => 'Foo',
    ]));
}
// Send whatever is left in the last, partially filled batch.
$buffer->flush();

$duration = microtime(true) - $startTime;
echo "Total Duration: " . round($duration) . " seconds\n";
// Avoid dividing by zero when no records were requested.
if ($totalNumberOfRecords > 0) {
    $timePerMessage = $duration*1000 / $totalNumberOfRecords;
    echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment