Created
December 15, 2017 17:08
-
-
Save anapsix/7be3cc5dca634ccde15b503eee06fcd3 to your computer and use it in GitHub Desktop.
Kinesis Consumer in PHP with AWS SDK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
// if running in Alpine, install the following | |
// apk -U add php7 php7-mbstring php7-simplexml php7-json php7-phar php7-openssl curl | |
// curl -sS https://getcomposer.org/installer | php | |
// php composer.phar require aws/aws-sdk-php | |
// | |
// export AWS_ACCESS_KEY_ID=... | |
// export AWS_SECRET_ACCESS_KEY=... | |
if (getenv('KINESIS_STREAM')) { | |
$streamName = getenv('KINESIS_STREAM'); | |
} else { | |
$streamName = 'my-kinesis-stream'; | |
} | |
$numberOfRecordsPerBatch = 10; | |
//require_once 'vendor/autoload.php'; | |
echo "Getting messages from " . $streamName . " in batches of " . $numberOfRecordsPerBatch . "\n"; | |
use Aws\Credentials\CredentialProvider; | |
use Aws\Kinesis\KinesisClient; | |
use Aws\Sts\StsClient; | |
// see provider docs | |
// http://docs.aws.amazon.com/aws-sdk-php/v3/api/class-Aws.Credentials.CredentialProvider.html | |
$provider = CredentialProvider::memoize(CredentialProvider::chain( | |
CredentialProvider::ini(), | |
CredentialProvider::instanceProfile(), | |
CredentialProvider::assumeRole([ | |
'client' => new StsClient(['region' => 'us-east-1', 'version' => 'latest']), | |
'assume_role_params' => [ | |
'RoleArn' => 'arn:aws:iam::555555555555:role/Admin', | |
'RoleSessionName' => 'test_session', | |
] | |
]) | |
)); | |
//this works | |
// $kinesisClient = KinesisClient::factory(array( | |
// 'credentials' => $provider, | |
// 'version' => 'latest', | |
// 'region' => 'us-east-2' | |
// )); | |
// see doc for all the connection options | |
// https://github.com/aws/aws-sdk-php/blob/master/docs/guide/configuration.rst | |
// this also works | |
$kinesisClient = new KinesisClient([ | |
'region' => 'us-east-2', | |
'version' => 'latest', | |
'credentials' => $provider, | |
//'debug' => true, | |
'retries' => 10, | |
'delay' => 1000, | |
'synchronous' => true, | |
'http' => [ | |
'timeout' => 5, | |
'connect_timeout' => 5, | |
'verify' => false | |
] | |
]); | |
// and so does this | |
// $sdk = new \Aws\Sdk(); | |
// $kinesisClient = $sdk->createKinesis( | |
// [ | |
// // 'region' => 'us-east-1', | |
// // 'version' => 'latest', | |
// 'credentials' => $provider | |
// ] | |
// ); | |
// get all shard ids | |
$res = $kinesisClient->describeStream([ 'StreamName' => $streamName ]); | |
$shardIds = $res->search('StreamDescription.Shards[].ShardId'); | |
$count = 0; | |
$startTime = microtime(true); | |
foreach ($shardIds as $shardId) { | |
echo "ShardId: $shardId\n"; | |
// get initial shard iterator | |
$res = $kinesisClient->getShardIterator([ | |
'ShardId' => $shardId, | |
'ShardIteratorType' => 'TRIM_HORIZON', // 'AT_SEQUENCE_NUMBER|AFTER_SEQUENCE_NUMBER|TRIM_HORIZON|LATEST' | |
// 'StartingSequenceNumber' => '<string>', | |
'StreamName' => $streamName, | |
]); | |
$shardIterator = $res->get('ShardIterator'); | |
do { | |
echo "Get Records\n"; | |
$res = $kinesisClient->getRecords([ | |
'Limit' => $numberOfRecordsPerBatch, | |
'ShardIterator' => $shardIterator | |
]); | |
$shardIterator = $res->get('NextShardIterator'); | |
$behind = $res->get('MillisBehindLatest'); | |
$localCount = 0; | |
foreach ($res->search('Records[].[SequenceNumber, Data]') as $data) { | |
list($sequenceNumber, $item) = $data; | |
echo "- [$sequenceNumber]\n"; | |
$count++; | |
$localCount++; | |
} | |
echo "Processed $localCount records in this batch\n"; | |
//sleep(1); | |
} while ($behind>0 && $shardIterator != ''); | |
} | |
$duration = microtime(true) - $startTime; | |
$timePerMessage = $duration*1000 / $count; | |
echo "Total Messages: " . $count . "\n"; | |
echo "Total Duration: " . round($duration) . " seconds\n"; | |
echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n"; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment