Last active
February 17, 2023 22:38
-
-
Save Ragnoroct/940dfc3146bb3ee4887cb29841bb2d68 to your computer and use it in GitHub Desktop.
aws signed requests v4 using stdlib
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
MIT License | |
Copyright (c) 2023 Will Bender | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
""" | |
from urllib.parse import urlparse, quote | |
from re import search as re_search | |
from hashlib import new as hashlib_new | |
from hmac import new as hmac_new | |
from datetime import datetime as datetimeclass | |
QUERYSTR_SAFE = ( | |
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789" "_.-~" | |
) | |
hash_func = lambda msg: hashlib_new(name="sha256", data=msg) | |
hmac_func = lambda key, msg: hmac_new(digestmod="sha256", key=key, msg=msg) | |
urllib_quote = lambda val: quote(val, "") # quote / to %2F | |
def aws_sign_headers_v4( | |
method, | |
url, | |
aws_access_key_id, | |
aws_secret_access_key, | |
aws_security_token="", | |
aws_service="", | |
aws_region="", | |
body=None, | |
optional_headers=None, | |
): | |
""" | |
Sign request to authenticate iam requests against api gateway | |
The result is a dict containing the signature to authenticate | |
AWS Signature Version 4 Documentation: | |
https://docs.aws.amazon.com/general/latest/gr/create-signed-request.html | |
References: | |
https://github.com/iksteen/aws-request-signer | |
https://github.com/DavidMuller/aws-requests-auth | |
https://stackoverflow.com/questions/39352648/access-aws-api-gateway-with-iam-roles-from-python/39357370#39357370 | |
""" | |
# prevent self footgun when you forget to post body when body is expected | |
if body is None and method.upper() in {"POST", "PUT", "PATCH"}: | |
raise Exception( | |
"body must explicitly be a string or an empty string for POST, PUT, PATCH" | |
) | |
# optional to sign headers other than x-amz and host | |
if optional_headers is None: | |
optional_headers = {} | |
# format: <gateway-id>.execute-api.<region>.amazonaws.com | |
url_match = re_search(r"[^.]+\.([^.]+)\.([^.]+)\.amazonaws.com", url) | |
if url_match is not None: | |
aws_service = aws_service or url_match[1] | |
aws_region = aws_region or url_match[2] | |
if not aws_service: | |
raise Exception("aws_service is missing value and unable reference from url") | |
if not aws_region: | |
raise Exception("aws_region is missing value and unable reference from url") | |
url_parsed = urlparse(url) | |
if aws_service != "execute-api": | |
raise Exception("currently only execute-api service is supported") | |
now = datetimeclass.utcnow() | |
aws_host = url_parsed.hostname | |
amz_algorithm = "AWS4-HMAC-SHA256" # signature version 4 | |
request_date = now.strftime("%Y%m%d") | |
request_date_time_z = now.strftime("%Y%m%dT%H%M%SZ") | |
credential_scope = f"{request_date}/{aws_region}/{aws_service}/aws4_request" | |
payload_hash = get_payload_hash(body) | |
headers_to_sign = optional_headers.copy() | |
headers_to_sign["host"] = aws_host | |
# step 1: create a canonical request | |
http_method = method.upper() | |
canonical_uri = get_canonical_uri(url_parsed.path) | |
canonical_query_str = get_canonical_query_str(url_parsed.query, urlpresign=False) | |
canonical_headers = get_canonical_headers(headers_to_sign) | |
signed_headers = get_signed_headers(headers_to_sign.keys()) | |
canonical_request = "\n".join( | |
[ | |
http_method, | |
canonical_uri, | |
canonical_query_str, | |
canonical_headers, | |
signed_headers, | |
payload_hash, | |
] | |
) | |
# step 2: create a hash of the canonical request | |
canonical_request_hash = ( | |
hash_func(canonical_request.encode("utf-8")).hexdigest().lower() | |
) | |
# step 3: create a string to sign | |
signature_components_str = "\n".join( | |
[ | |
amz_algorithm, | |
request_date_time_z, | |
credential_scope, | |
canonical_request_hash, | |
] | |
) | |
# step 4: calculate the signature | |
key_init = ("AWS4" + aws_secret_access_key).encode("utf-8") | |
k_date = hmac_func(key_init, now.strftime("%Y%m%d").encode("utf-8")).digest() | |
k_region = hmac_func(k_date, aws_region.encode("utf-8")).digest() | |
k_service = hmac_func(k_region, aws_service.encode("utf-8")).digest() | |
k_signing = hmac_func(k_service, b"aws4_request").digest() # signature version 4 | |
signature_hex_digest = ( | |
hmac_func(k_signing, signature_components_str.encode("utf-8")) | |
.hexdigest() | |
.lower() | |
) | |
authorization_header = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format( | |
amz_algorithm, | |
aws_access_key_id, | |
credential_scope, | |
signed_headers, | |
signature_hex_digest, | |
) | |
extra_headers = { | |
"x-amz-content-sha256": payload_hash, | |
"x-amz-date": request_date_time_z, | |
} | |
if aws_security_token: | |
extra_headers["x-amz-security-token"] = aws_security_token | |
return {"authorization": authorization_header, **extra_headers} | |
def aws_sign_url_v4( | |
method, | |
url, | |
aws_access_key_id, | |
aws_secret_access_key, | |
expires_seconds=60, | |
aws_security_token="", | |
aws_service="", | |
aws_region="", | |
body="", | |
optional_headers=None, | |
): | |
""" | |
Sign url with credentials to authenticate iam requests against api gateway | |
The result is a presigned url | |
AWS Signature Version 4 Documentation: | |
https://docs.aws.amazon.com/general/latest/gr/create-signed-request.html | |
References: | |
https://github.com/iksteen/aws-request-signer | |
https://github.com/DavidMuller/aws-requests-auth | |
https://stackoverflow.com/questions/39352648/access-aws-api-gateway-with-iam-roles-from-python/39357370#39357370 | |
""" | |
# prevent self footgun when you forget to post body when body is expected | |
if body is None and method.upper() in {"POST", "PUT", "PATCH"}: | |
raise Exception( | |
"body must explicitly be a string or an empty string for POST, PUT, PATCH" | |
) | |
# optional to sign headers other than x-amz and host | |
if optional_headers is None: | |
optional_headers = {} | |
# format: <gateway-id>.execute-api.<region>.amazonaws.com | |
url_match = re_search(r"[^.]+\.([^.]+)\.([^.]+)\.amazonaws.com", url) | |
if url_match is not None: | |
aws_service = aws_service or url_match[1] | |
aws_region = aws_region or url_match[2] | |
if not aws_service: | |
raise Exception("aws_service is missing value and unable reference from url") | |
if not aws_region: | |
raise Exception("aws_region is missing value and unable reference from url") | |
url_parsed = urlparse(url) | |
if aws_service != "execute-api": | |
raise Exception("currently only execute-api service is supported") | |
now = datetimeclass.utcnow() | |
aws_host = url_parsed.hostname | |
amz_algorithm = "AWS4-HMAC-SHA256" # signature version 4 | |
request_date = now.strftime("%Y%m%d") | |
request_date_time_z = now.strftime("%Y%m%dT%H%M%SZ") | |
credential_scope = f"{request_date}/{aws_region}/{aws_service}/aws4_request" | |
payload_hash = get_payload_hash(body) | |
headers_to_sign = optional_headers.copy() | |
headers_to_sign["host"] = aws_host | |
# step 1: create a canonical request | |
http_method = method.upper() | |
canonical_uri = get_canonical_uri(url_parsed.path) | |
canonical_query_str = get_canonical_query_str( | |
url_parsed.query, | |
urlpresign=True, | |
presign_expires=expires_seconds, | |
signed_headers=headers_to_sign.keys(), | |
amz_algorithm=amz_algorithm, | |
payload_hash=payload_hash, | |
aws_access_key_id=aws_access_key_id, | |
credential_scope=credential_scope, | |
request_date_time_z=request_date_time_z, | |
aws_security_token=aws_security_token, | |
) | |
canonical_headers = get_canonical_headers(headers_to_sign) | |
signed_headers = get_signed_headers(headers_to_sign.keys()) | |
canonical_request = "\n".join( | |
[ | |
http_method, | |
canonical_uri, | |
canonical_query_str, | |
canonical_headers, | |
signed_headers, | |
payload_hash, | |
] | |
) | |
# step 2: create a hash of the canonical request | |
canonical_request_hash = ( | |
hash_func(canonical_request.encode("utf-8")).hexdigest().lower() | |
) | |
# step 3: create a string to sign | |
signature_components_str = "\n".join( | |
[ | |
amz_algorithm, | |
request_date_time_z, | |
credential_scope, | |
canonical_request_hash, | |
] | |
) | |
# step 4: calculate the signature | |
key_init = ("AWS4" + aws_secret_access_key).encode("utf-8") | |
k_date = hmac_func(key_init, now.strftime("%Y%m%d").encode("utf-8")).digest() | |
k_region = hmac_func(k_date, aws_region.encode("utf-8")).digest() | |
k_service = hmac_func(k_region, aws_service.encode("utf-8")).digest() | |
k_signing = hmac_func(k_service, b"aws4_request").digest() # signature version 4 | |
signature_hex_digest = ( | |
hmac_func(k_signing, signature_components_str.encode("utf-8")) | |
.hexdigest() | |
.lower() | |
) | |
# step 5: add the signature to the request | |
new_query = canonical_query_str | |
if not new_query.endswith("&"): | |
new_query += "&" | |
new_query += "&".join( | |
[ | |
"X-Amz-Signature={}".format(urllib_quote(signature_hex_digest)), | |
] | |
) | |
# noinspection PyProtectedMember | |
return url_parsed._replace(query=new_query).geturl() | |
def get_payload_hash(payload): | |
""" | |
A string created using the payload in the body of the HTTP request as input to a | |
hash function. This string uses lowercase hexadecimal characters. If the payload | |
is empty, use an empty string as the input to the hash function. | |
""" | |
if payload is None: | |
payload = "" | |
return hash_func(payload.encode("utf-8")).hexdigest().lower() | |
def get_signed_headers(signed_headers): | |
""" | |
The list of optional_headers that you included in CanonicalHeaders, separated by | |
semicolons (;). This indicates which optional_headers are part of the signing | |
process. Header names must use lowercase characters and must appear in alphabetical | |
order. | |
""" | |
return ";".join(sorted(signed_headers)) | |
def get_canonical_headers(headers_to_sign): | |
""" | |
The request optional_headers, that will be signed, and their values, separated by | |
newline characters. Header names must use lowercase characters, must appear in | |
alphabetical order, and must be followed by a colon (:). For the values, trim | |
any leading or trailing spaces, convert sequential spaces to a single space, | |
and separate the values for a multi-value header using commas. You must include | |
the host header (HTTP/1.1) or the :authority header (HTTP/2), and any x-amz-* | |
optional_headers in the signature. You can optionally include other standard | |
optional_headers in the signature, such as content-type. | |
undocumented: canonical_headers end with newline | |
""" | |
headers_list = [] | |
for header_name in headers_to_sign: | |
headers_list.append( | |
"{}:{}".format(header_name.lower(), headers_to_sign[header_name]) | |
) | |
headers_list.sort() | |
return "\n".join(headers_list) + "\n" | |
def get_canonical_uri(url_path): | |
""" | |
The URI-encoded version of the absolute path component URL (everything between | |
the host and the question mark character (?) that starts the query string | |
parameters). If the absolute path is empty, use a forward slash character (/). | |
""" | |
canonical_uri = quote(url_path) | |
if canonical_uri == "": | |
canonical_uri = "/" | |
return canonical_uri | |
def get_canonical_query_str( | |
query_str, | |
urlpresign=False, | |
presign_expires=60, | |
signed_headers=None, | |
amz_algorithm="", | |
payload_hash="", | |
aws_access_key_id="", | |
credential_scope="", | |
request_date_time_z="", | |
aws_security_token="", | |
): | |
""" | |
The URL-encoded query string parameters, separated by ampersands (&). | |
Percent-encode reserved characters, including the space character. Encode names | |
and values separately. If there are empty parameters, append the equals sign to | |
the parameter name before encoding. After encoding, sort the parameters | |
alphabetically by key name. If there is no query string, use an empty string | |
(""). | |
undocumented*: all the x-amz-* querystring params need to be included in canonical | |
query string. | |
notes: X-Amz-Security-Token should be included in the | |
canonical query string or appended after the signature is calculated | |
depending on the service. X-Amz-Signature is added after the signature | |
is calculated (makes sense). | |
""" | |
if urlpresign: | |
# add all amazon params to query string | |
query_str = query_str | |
if not query_str.endswith("&"): | |
query_str += "&" | |
query_str += "&".join( | |
[ | |
"X-Amz-Algorithm={}".format(urllib_quote(amz_algorithm)), | |
"X-Amz-Content-Sha256={}".format(urllib_quote(payload_hash)), | |
"X-Amz-Credential={}".format( | |
urllib_quote(aws_access_key_id + "/" + credential_scope) | |
), | |
"X-Amz-Date={}".format(urllib_quote(request_date_time_z)), | |
"X-Amz-Expires={}".format(urllib_quote(str(presign_expires))), | |
"X-Amz-SignedHeaders={}".format( | |
urllib_quote(";".join(sorted(signed_headers))) | |
), | |
] | |
) | |
if aws_security_token != "": | |
query_str += "&X-Amz-Security-Token={}".format( | |
urllib_quote(aws_security_token) | |
) | |
canonical_querystring = "" | |
querystring_sorted = "&".join(sorted(query_str.split("&"))) | |
for query_param in querystring_sorted.split("&"): | |
key_val_split = query_param.split("=", 1) | |
key = key_val_split[0] | |
if len(key_val_split) > 1: | |
val = key_val_split[1] | |
else: | |
val = "" | |
if not is_val_unquoted(val): | |
raise Exception( | |
f"query string is not quoted properly: query='{query_str}' val='{val}'" | |
) | |
if key: | |
if canonical_querystring: | |
canonical_querystring += "&" | |
canonical_querystring += "=".join([key, val]) | |
return canonical_querystring | |
def is_val_unquoted(val): | |
return not val.rstrip(QUERYSTR_SAFE + "/%") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment