Created
October 14, 2022 07:20
-
-
Save vladaman/07391e7de0cc73daf20da67b433ec77c to your computer and use it in GitHub Desktop.
Data to Amazon Kinesis from PHP without AWS library dependency
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/* | |
POST / HTTP/1.1 | |
Host: kinesis.<region>.<domain> | |
x-amz-Date: <Date> | |
Authorization: AWS4-HMAC-SHA256 Credential=<Credential>, SignedHeaders=content-type;date;host;user-agent;x-amz-date;x-amz-target;x-amzn-requestid, Signature=<Signature> | |
User-Agent: <UserAgentString> | |
Content-Type: application/x-amz-json-1.1 | |
Content-Length: <PayloadSizeBytes> | |
Connection: Keep-Alive | |
X-Amz-Target: Kinesis_20130901.PutRecord | |
// PutRecord | |
{ | |
"StreamName": "exampleStreamName", | |
"Data": "XzxkYXRhPl8x", | |
"PartitionKey": "partitionKey" | |
} | |
// PutRecords | |
"Records": [ | |
{ | |
"Data": blob, | |
"ExplicitHashKey": "string", | |
"PartitionKey": "string" | |
} | |
], | |
*/ | |
/** | |
* | |
*/ | |
class AmazonKinesis { | |
static $utc_tz; | |
/** @var string $access_key */ | |
private $ACCESS_KEY; | |
/** @var string $secret_key */ | |
private $SECRET_KEY; | |
private $REGION = 'eu-west-1'; | |
public function __construct($access_key, $secret_key, $region = 'eu-west-1') { | |
$this->ACCESS_KEY = $access_key; | |
$this->SECRET_KEY = $secret_key; | |
if(empty($this->ACCESS_KEY) || empty($this->SECRET_KEY)) { | |
// throw new InvalidArgumentException('Must define Amazon access key and secret key'); | |
} | |
$this->REGION = $region; | |
} | |
function kinesis( $streamName, $records ){ | |
if( !self::$utc_tz ) | |
self::$utc_tz = new \DateTimeZone( 'UTC' ); | |
$APIVERSION = '20131202'; | |
$datestamp = new \DateTime( "now", self::$utc_tz ); | |
$longdate = $datestamp->format( "Ymd\\THis\\Z"); | |
$shortdate = $datestamp->format( "Ymd" ); | |
$out = array(); | |
foreach($records as $record){ | |
$arr = array( | |
"Data" => base64_encode(json_encode($record)), | |
"PartitionKey" => "part", | |
); | |
array_push($out, $arr); | |
} | |
$payload = json_encode(array( "Records" => $records, "StreamName" => $streamName )); | |
// print $payload; | |
// establish the signing key | |
{ | |
$ksecret = 'AWS4' . $this->SECRET_KEY; | |
$kdate = hash_hmac( 'sha256', $shortdate, $ksecret, true ); | |
$kregion = hash_hmac( 'sha256', $this->REGION, $kdate, true ); | |
$kservice = hash_hmac( 'sha256', 'kinesis', $kregion, true ); | |
$ksigning = hash_hmac( 'sha256', 'aws4_request', $kservice, true ); | |
} | |
// command parameters | |
$params = array( | |
'host' => 'kinesis.' + $this->REGION + '.amazonaws.com', | |
'content-type' => 'application/x-amz-json-1.1', | |
'x-amz-date' => $longdate, | |
'x-amz-target' => 'Kinesis_'. $APIVERSION . '.PutRecords', | |
'content-length' => strlen( $payload ), | |
'user-agent' => 'PHPKinesisLib', | |
'connection' => 'keep-alive', | |
); | |
$canonical_request = $this->createCanonicalRequest( $params, $payload ); | |
$signed_request = hash( 'sha256', $canonical_request ); | |
$sign_string = "AWS4-HMAC-SHA256\n{$longdate}\n$shortdate/" . $this->REGION . "/kinesis/aws4_request\n" . $signed_request; | |
$signature = hash_hmac( 'sha256', $sign_string, $ksigning ); | |
$params['authorization'] = "AWS4-HMAC-SHA256 Credential=" . $this->ACCESS_KEY . "/$shortdate/" . $this->REGION . "/kinesis/aws4_request, " . | |
"SignedHeaders=" . implode( ";", array_keys( $params ) ) . ", " . | |
"Signature=$signature"; | |
/* | |
* Execute Crafted Request | |
*/ | |
$url = "https://kinesis." . $this->REGION . ".amazonaws.com"; | |
$ch = curl_init(); | |
$curl_headers = array(); | |
foreach( $params as $p => $k ) | |
$curl_headers[] = $p . ": " . $k; | |
curl_setopt($ch, CURLOPT_URL,$url); | |
curl_setopt($ch, CURLOPT_POST,1); | |
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload); | |
curl_setopt($ch, CURLOPT_HTTPHEADER, $curl_headers); | |
curl_setopt($ch, CURLOPT_RETURNTRANSFER,1); | |
curl_setopt($ch, CURLOPT_TCP_NODELAY, true); | |
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false ); | |
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false ); | |
// debug opts | |
{ | |
curl_setopt($ch, CURLOPT_VERBOSE, true); | |
$verbose = fopen('php://temp', 'rw+'); | |
curl_setopt($ch, CURLOPT_STDERR, $verbose); | |
$result = curl_exec($ch); // raw result | |
rewind($verbose); | |
$verboseLog = stream_get_contents($verbose); | |
} | |
$rescode = curl_getinfo($ch, CURLINFO_HTTP_CODE); | |
if ($rescode == '200'){ | |
return true; | |
}else{ | |
throw new KinesisException("Invalid server response " . $rescode . $result); | |
} | |
} | |
private function createCanonicalRequest( Array $params, $payload ) | |
{ | |
$canonical_request = array(); | |
$canonical_request[] = 'POST'; | |
$canonical_request[] = '/'; | |
$canonical_request[] = ''; | |
$can_headers = array( | |
'host' => 'kinesis.' . $this->REGION . '.amazonaws.com' | |
); | |
foreach( $params as $k => $v ) | |
$can_headers[ strtolower( $k ) ] = trim( $v ); | |
uksort( $can_headers, 'strcmp' ); | |
foreach ( $can_headers as $k => $v ) | |
$canonical_request[] = $k . ':' . $v; | |
$canonical_request[] = ''; | |
$canonical_request[] = implode( ';', array_keys( $can_headers ) ); | |
$canonical_request[] = hash( 'sha256', $payload ); | |
$canonical_request = implode( "\n", $canonical_request ); | |
return $canonical_request; | |
} | |
} | |
class KinesisException extends Exception {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment