Last active
June 5, 2024 17:14
-
-
Save dbyr/8c9c04c63ac78da7eef57a7a3fc4ccc0 to your computer and use it in GitHub Desktop.
A demonstration of a request to AWS's DynamoDB (in this case, a query) that does not use boto3. In benchmark tests this script outperforms boto3 in regards to time to complete a query.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, os, base64, datetime, hashlib, hmac | |
import requests | |
import json | |
from typing import * | |
# based on the example python code found here: | |
# https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html | |
# call this method to query all items for a given primary hash key in a table | |
# endpoint: the DynamoDB endpoint to query (for AWS servers, this is "https://dynamodb.{region}.amazonaws.com/") | |
# region: the AWS region in which the table resides | |
# table_name: the name of the table to query | |
# client_name: the value of the primary hash key from which to query items | |
# hash_key: the name of the primary hash key | |
# hash_key_type: the type of the primary hash key (the default "S" is "string") | |
def query( | |
endpoint, region, table_name, client_name, hash_key, hash_key_type="S", | |
): | |
# collect required authorisation information from the environment | |
access_key = os.environ.get("AWS_ACCESS_KEY_ID") | |
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") | |
session_token = os.environ.get("AWS_SESSION_TOKEN") | |
more = True | |
start_key = None | |
results = [] | |
session = requests.session() | |
# continue to send requests until the query is complete | |
while more: | |
parameters = create_body(table_name, client_name, hash_key, start_key) | |
response = session.post( | |
endpoint, | |
data=parameters, | |
headers=create_headers( | |
region, | |
parameters, | |
access_key=access_key, | |
secret_key=secret_key, | |
session_token=session_token, | |
), | |
) | |
# if the query is finished, this key will not be included in the response | |
data = response.json() | |
try: | |
start_key = data["LastEvaluatedKey"] | |
except KeyError: | |
more = False | |
try: | |
results += data["Items"] | |
except KeyError: | |
if response.status_code == 400: | |
raise Exception( | |
"Failed to resolve query. It's likely your AWS session token has expired." | |
) | |
else: | |
raise Exception( | |
"Failed to resolve query: received status code " | |
+ response.status_code | |
) | |
return [ | |
{key: deserialize(value, float) for key, value in item.items()} | |
for item in results | |
] | |
def create_body( | |
table_name, client_name, hash_key, start_key=None, hash_key_type="S", | |
): | |
params_dict = { | |
"TableName": table_name, | |
"KeyConditionExpression": "#n0 = :v0", | |
"ExpressionAttributeNames": {"#n0": hash_key}, | |
"ExpressionAttributeValues": {":v0": {hash_key_type: client_name}}, | |
} | |
if start_key: | |
params_dict["ExclusiveStartKey"] = start_key | |
request_parameters = json.dumps(params_dict) | |
return request_parameters | |
def create_headers( | |
region, request_parameters, access_key=None, secret_key=None, session_token=None | |
): | |
if access_key is None or secret_key is None: | |
print("No access key is available.") | |
sys.exit() | |
service = "dynamodb" | |
host = f"dynamodb.{region}.amazonaws.com" | |
content_type = "application/x-amz-json-1.0" | |
amz_target = "DynamoDB_20120810.Query" | |
method = "POST" | |
# Create a date for headers and the credential string | |
t = datetime.datetime.utcnow() | |
amz_date = t.strftime("%Y%m%dT%H%M%SZ") | |
date_stamp = t.strftime("%Y%m%d") | |
canonical_uri = "/" | |
canonical_querystring = "" | |
canonical_headers = f"content-type:{content_type}\nhost:{host}\nx-amz-date:{amz_date}\nx-amz-target:{amz_target}\n" | |
signed_headers = "content-type;host;x-amz-date;x-amz-target" | |
payload_hash = hashlib.sha256(request_parameters.encode("utf-8")).hexdigest() | |
canonical_request = f"{method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers}\n{payload_hash}" | |
algorithm = "AWS4-HMAC-SHA256" | |
credential_scope = f"{date_stamp}/{region}/{service}/aws4_request" | |
request_digest = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest() | |
string_to_sign = f"{algorithm}\n{amz_date}\n{credential_scope}\n{request_digest}" | |
signing_key = getSignatureKey(secret_key, date_stamp, region, service) | |
signature = hmac.new( | |
signing_key, (string_to_sign).encode("utf-8"), hashlib.sha256 | |
).hexdigest() | |
authorization_header = f"{algorithm} Credential={access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}" | |
# the security token header needs to be included for sessions | |
headers = { | |
"Content-Type": content_type, | |
"X-Amz-Date": amz_date, | |
"X-Amz-Target": amz_target, | |
"Authorization": authorization_header, | |
"X-Amz-Security-Token": session_token, | |
} | |
return headers | |
def sign(key, msg): | |
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() | |
def getSignatureKey(key, date_stamp, regionName, serviceName): | |
kDate = sign(("AWS4" + key).encode("utf-8"), date_stamp) | |
kRegion = sign(kDate, regionName) | |
kService = sign(kRegion, serviceName) | |
kSigning = sign(kService, "aws4_request") | |
return kSigning | |
SIMPLE_TYPES = frozenset({"BOOL", "S", "B"}) | |
SIMPLE_SET_TYPES = frozenset({"SS", "BS"}) | |
NULL_TYPE = "NULL" | |
def deserialize(value: Dict[str, Any], numeric_type: Callable[[str], Any]) -> Any: | |
if not value: | |
raise TypeError( | |
"Value must be a nonempty dictionary whose key " "is a valid dynamodb type." | |
) | |
tag, val = next(iter(value.items())) | |
if tag in SIMPLE_TYPES: | |
return val | |
if tag == NULL_TYPE: | |
return None | |
if tag == "N": | |
return numeric_type(val) | |
if tag in SIMPLE_SET_TYPES: | |
return set(val) | |
if tag == "NS": | |
return {numeric_type(v) for v in val} | |
if tag == "L": | |
return [deserialize(v, numeric_type) for v in val] | |
if tag == "M": | |
return {k: deserialize(v, numeric_type) for k, v in val.items()} | |
raise TypeError(f"Dynamodb type {tag} is not supported") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment