Skip to content

Instantly share code, notes, and snippets.

@vladaman
Created October 14, 2022 07:20
Show Gist options
  • Save vladaman/07391e7de0cc73daf20da67b433ec77c to your computer and use it in GitHub Desktop.
Save vladaman/07391e7de0cc73daf20da67b433ec77c to your computer and use it in GitHub Desktop.
Data to Amazon Kinesis from PHP without AWS library dependency
<?php
/*
POST / HTTP/1.1
Host: kinesis.<region>.<domain>
x-amz-Date: <Date>
Authorization: AWS4-HMAC-SHA256 Credential=<Credential>, SignedHeaders=content-type;date;host;user-agent;x-amz-date;x-amz-target;x-amzn-requestid, Signature=<Signature>
User-Agent: <UserAgentString>
Content-Type: application/x-amz-json-1.1
Content-Length: <PayloadSizeBytes>
Connection: Keep-Alive
X-Amz-Target: Kinesis_20131202.PutRecord
// PutRecord
{
"StreamName": "exampleStreamName",
"Data": "XzxkYXRhPl8x",
"PartitionKey": "partitionKey"
}
// PutRecords (X-Amz-Target: Kinesis_20131202.PutRecords)
{
"Records": [
{
"Data": blob,
"ExplicitHashKey": "string",
"PartitionKey": "string"
}
],
"StreamName": "string"
}
*/
/**
 * Dependency-free Amazon Kinesis client.
 *
 * Sends PutRecords requests to the regional Kinesis endpoint over cURL and
 * signs them with AWS Signature Version 4, so no AWS SDK is required.
 */
class AmazonKinesis
{
    /** Kinesis API version used to build the X-Amz-Target header. */
    const API_VERSION = '20131202';

    /** @var \DateTimeZone|null Lazily created UTC timezone, shared by all instances. */
    static $utc_tz;

    /** @var string AWS access key id. */
    private $ACCESS_KEY;

    /** @var string AWS secret access key. */
    private $SECRET_KEY;

    /** @var string AWS region the requests are sent to and signed for. */
    private $REGION = 'eu-west-1';

    /**
     * @param string $access_key AWS access key id.
     * @param string $secret_key AWS secret access key.
     * @param string $region     AWS region of the stream.
     */
    public function __construct($access_key, $secret_key, $region = 'eu-west-1')
    {
        // Credentials are deliberately not validated here (same as before);
        // a request made with empty keys fails in kinesis() with a KinesisException.
        $this->ACCESS_KEY = $access_key;
        $this->SECRET_KEY = $secret_key;
        $this->REGION = $region;
    }

    /**
     * Sends a batch of records to a stream with a single PutRecords call.
     *
     * Each record is JSON-encoded and then base64-encoded into the "Data"
     * field of its entry.
     *
     * @param string $streamName   Name of the target stream.
     * @param array  $records      Records to send; each must be JSON-encodable.
     * @param string $partitionKey Partition key applied to every record
     *                             (defaults to the previously hard-coded "part").
     *
     * @return bool True when the service accepted every record.
     *
     * @throws KinesisException On encoding errors, transport errors, a non-200
     *                          response, or a response reporting failed records.
     */
    public function kinesis($streamName, $records, $partitionKey = 'part')
    {
        if (!self::$utc_tz) {
            self::$utc_tz = new \DateTimeZone('UTC');
        }

        $now = new \DateTime('now', self::$utc_tz);
        $longdate = $now->format('Ymd\THis\Z');
        $shortdate = $now->format('Ymd');

        $host = $this->endpointHost();
        $payload = $this->buildPayload($streamName, $records, $partitionKey);

        // Only headers that are guaranteed to reach the service unchanged are
        // signed; hop-by-hop headers (connection, content-length) are left out.
        $signedHeaders = [
            'content-type' => 'application/x-amz-json-1.1',
            'host' => $host,
            'x-amz-date' => $longdate,
            'x-amz-target' => 'Kinesis_' . self::API_VERSION . '.PutRecords',
        ];

        $scope = $shortdate . '/' . $this->REGION . '/kinesis/aws4_request';
        $canonicalRequest = $this->createCanonicalRequest($signedHeaders, $payload);
        $stringToSign = "AWS4-HMAC-SHA256\n" . $longdate . "\n" . $scope . "\n"
            . hash('sha256', $canonicalRequest);
        $signature = hash_hmac('sha256', $stringToSign, $this->signingKey($shortdate));

        // SignedHeaders must list exactly the headers of the canonical request,
        // in the same (sorted, lower-case) order.
        $signedHeaderList = implode(';', array_keys($this->canonicalHeaders($signedHeaders)));

        $requestHeaders = $signedHeaders + [
            'authorization' => 'AWS4-HMAC-SHA256 Credential=' . $this->ACCESS_KEY . '/' . $scope
                . ', SignedHeaders=' . $signedHeaderList
                . ', Signature=' . $signature,
            'user-agent' => 'PHPKinesisLib',
            'connection' => 'keep-alive',
        ];

        $headerLines = [];
        foreach ($requestHeaders as $name => $value) {
            $headerLines[] = $name . ': ' . $value;
        }

        $ch = curl_init();
        if ($ch === false) {
            throw new KinesisException('Unable to initialise cURL');
        }

        curl_setopt_array($ch, [
            CURLOPT_URL => 'https://' . $host,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $payload,
            CURLOPT_HTTPHEADER => $headerLines,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TCP_NODELAY => true,
            // The request carries AWS credentials material: always verify TLS.
            CURLOPT_SSL_VERIFYPEER => true,
            CURLOPT_SSL_VERIFYHOST => 2,
        ]);

        $result = curl_exec($ch);
        $rescode = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);

        if ($result === false) {
            throw new KinesisException('Invalid server response ' . $rescode . ' ' . curl_error($ch));
        }

        if ($rescode !== 200) {
            throw new KinesisException("Invalid server response " . $rescode . $result);
        }

        // PutRecords answers 200 even when individual records were rejected.
        $decoded = json_decode($result, true);
        if (is_array($decoded) && !empty($decoded['FailedRecordCount'])) {
            throw new KinesisException(
                'Kinesis rejected ' . $decoded['FailedRecordCount'] . ' record(s): ' . $result
            );
        }

        return true;
    }

    /**
     * Returns the host name of the regional Kinesis endpoint.
     *
     * @return string
     */
    private function endpointHost()
    {
        return 'kinesis.' . $this->REGION . '.amazonaws.com';
    }

    /**
     * Builds the JSON body of a PutRecords request.
     *
     * @param string $streamName
     * @param array  $records
     * @param string $partitionKey
     *
     * @return string JSON document with "Records" and "StreamName".
     *
     * @throws KinesisException When a record or the request cannot be JSON-encoded.
     */
    private function buildPayload($streamName, $records, $partitionKey)
    {
        $entries = [];
        foreach ($records as $record) {
            $data = json_encode($record);
            if ($data === false) {
                throw new KinesisException('Unable to JSON-encode record: ' . json_last_error_msg());
            }
            $entries[] = [
                'Data' => base64_encode($data),
                'PartitionKey' => (string) $partitionKey,
            ];
        }

        $payload = json_encode(['Records' => $entries, 'StreamName' => $streamName]);
        if ($payload === false) {
            throw new KinesisException('Unable to JSON-encode request: ' . json_last_error_msg());
        }

        return $payload;
    }

    /**
     * Derives the Signature Version 4 signing key for the given day.
     *
     * @param string $shortdate Date in Ymd format (UTC).
     *
     * @return string Raw binary signing key.
     */
    private function signingKey($shortdate)
    {
        $kdate = hash_hmac('sha256', $shortdate, 'AWS4' . $this->SECRET_KEY, true);
        $kregion = hash_hmac('sha256', $this->REGION, $kdate, true);
        $kservice = hash_hmac('sha256', 'kinesis', $kregion, true);

        return hash_hmac('sha256', 'aws4_request', $kservice, true);
    }

    /**
     * Normalises headers for signing: lower-case names, trimmed values,
     * sorted by name. A "host" entry is always present.
     *
     * @param array $params Header name => value.
     *
     * @return array Normalised header name => value.
     */
    private function canonicalHeaders(array $params)
    {
        $headers = ['host' => $this->endpointHost()];
        foreach ($params as $name => $value) {
            $headers[strtolower($name)] = trim((string) $value);
        }
        ksort($headers, SORT_STRING);

        return $headers;
    }

    /**
     * Builds the canonical request string for a POST to "/" with no query string.
     *
     * @param array  $params  Headers to sign (name => value).
     * @param string $payload Request body.
     *
     * @return string Newline-joined canonical request.
     */
    private function createCanonicalRequest(array $params, $payload)
    {
        $headers = $this->canonicalHeaders($params);

        // Method, path and (empty) query string.
        $lines = ['POST', '/', ''];
        foreach ($headers as $name => $value) {
            $lines[] = $name . ':' . $value;
        }
        // Blank line terminates the header section.
        $lines[] = '';
        $lines[] = implode(';', array_keys($headers));
        $lines[] = hash('sha256', $payload);

        return implode("\n", $lines);
    }
}
/**
 * Thrown by AmazonKinesis::kinesis() when a request to the Kinesis
 * endpoint does not succeed.
 */
class KinesisException extends \Exception
{
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment