Last active
April 22, 2021 17:11
-
-
Save chespinoza/c4856d6a59b512ef0726fbcccbd3dcb2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from authlib.integrations.requests_client import OAuth2Session | |
import signer | |
NA_AUTH_URI = "https://api.amazon.com/auth/o2/token" | |
EU_AUTH_URI = "https://api.amazon.co.uk/auth/o2/token" | |
FE_AUTH_URI = "https://api.amazon.co.jp/auth/o2/token" | |
ONE_HOUR = 3600 | |
TOKEN_URL = NA_AUTH_URI | |
SESSION_SCOPE = None | |
REFRESH_TOKEN = "dfdfd" | |
CLIENT_ID = "" | |
CLIENT_SECRET = "" | |
AWS_ACCESS_KEY_ID = "" | |
# As this token is expired authlib will automagically try and refresh it | |
expired_token = { | |
"token_type": "bearer", | |
"access_token": "", | |
"refresh_token": REFRESH_TOKEN, | |
"expires_at": int(time.time()) - ONE_HOUR, | |
} | |
# The client can only handle refreshing the token if the token_endpoint is set | |
client = OAuth2Session( | |
client_id=CLIENT_ID, | |
client_secret=CLIENT_SECRET, | |
token=expired_token, | |
token_endpoint=TOKEN_URL, | |
scope=SESSION_SCOPE | |
) | |
# Prepare request | |
# Sign request | |
params = signer.RequestParams( | |
method="GET", | |
service="", | |
host="", | |
region="", | |
endpoint="", | |
parameters="", | |
access_token=client.token | |
) | |
signed_url, headers = signer.sign(aws_access_key="", aws_secret_key="", request_params=params) | |
result = client.get(signed_url, headers=headers) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Ref: http://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html | |
import hashlib | |
import hmac | |
from dataclasses import dataclass | |
from datetime import datetime | |
ALGORITHM = "AWS4-HMAC-SHA256" | |
class Headers(dict): | |
@property | |
def keys_as_str(self): | |
keys = [] | |
for key in self: | |
keys.append(f"{key};") | |
if keys: | |
return "".join(keys)[:-1] | |
return "" | |
def __str__(self): | |
items = [] | |
for key, val in self.items(): | |
items.append(f"{key}:{val}\n") | |
if items: | |
return "".join(items)[:-1] | |
return "" | |
@dataclass(frozen=True) | |
class RequestParams: | |
method: str | |
service: str | |
host: str | |
region: str | |
endpoint: str | |
parameters: str | |
access_token: str | |
payload: str = "" | |
# Key derivation functions. See: | |
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python | |
def _sign(key: bytes, msg: str) -> bytes: | |
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() | |
def _get_signature_key( | |
key: str, datestamp: str, region_name: str, service_name: str | |
) -> bytes: | |
k_date = _sign(("AWS4" + key).encode("utf-8"), datestamp) | |
k_region = _sign(k_date, region_name) | |
k_service = _sign(k_region, service_name) | |
k_signing = _sign(k_service, "aws4_request") | |
return k_signing | |
def sign( | |
aws_access_key: str, | |
aws_secret_key: str, | |
request_params: RequestParams, | |
payload: str = "", | |
) -> (str, Headers): | |
# Create a date for headers and the credential string | |
now = datetime.utcnow() | |
amzdate = now.strftime("%Y%m%dT%H%M%SZ") | |
datestamp = now.strftime("%Y%m%d") # Date w/o time, used in credential scope | |
headers = Headers(host=request_params.host) | |
headers["user-agent"] = "Nozzle test app/0.1 (Language=Python/3; Platform=Linux)" | |
headers["x-amz-access-token"] = request_params.access_token | |
headers["x-amz-date"] = datetime.utcnow().strftime("%Y%m%dT%H%M%S.%fZ") | |
signed_headers = headers.keys_as_str | |
canonical_headers = str(headers) | |
payload_hash = hashlib.sha256(payload.encode("utf-8")).hexdigest() | |
canonical_request = ( | |
f"{request_params.method}\n" | |
"/\n" | |
f"{request_params.parameters}\n" | |
f"{canonical_headers}\n" | |
f"{signed_headers}\n" | |
f"{payload_hash}" | |
) | |
credential_scope = ( | |
f"{datestamp}/{request_params.region}/{request_params.service}/aws4_request" | |
) | |
string_to_sign = ( | |
f"{ALGORITHM}\n{amzdate}\n{credential_scope}\n" | |
+ hashlib.sha256(canonical_request.encode("utf-8")).hexdigest() | |
) | |
# Create the signing key | |
signing_key = _get_signature_key( | |
aws_secret_key, datestamp, request_params.region, request_params.service | |
) | |
# Sign the string_to_sign using the signing_key | |
signature = hmac.new( | |
signing_key, string_to_sign.encode("utf-8"), hashlib.sha256 | |
).hexdigest() | |
authorization_header = f"{ALGORITHM} Credential={aws_access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}" | |
headers["Authorization"] = authorization_header | |
return f"{request_params.endpoint}?{request_params.parameters}", headers |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment