Skip to content

Instantly share code, notes, and snippets.

@chespinoza
Last active April 22, 2021 17:11
Show Gist options
  • Save chespinoza/c4856d6a59b512ef0726fbcccbd3dcb2 to your computer and use it in GitHub Desktop.
Save chespinoza/c4856d6a59b512ef0726fbcccbd3dcb2 to your computer and use it in GitHub Desktop.
import time
from authlib.integrations.requests_client import OAuth2Session
import signer
# OAuth2 token endpoints.  The NA/EU/FE prefixes presumably stand for the
# North America, Europe and Far East regions -- confirm before relying on it.
NA_AUTH_URI = "https://api.amazon.com/auth/o2/token"
EU_AUTH_URI = "https://api.amazon.co.uk/auth/o2/token"
FE_AUTH_URI = "https://api.amazon.co.jp/auth/o2/token"

ONE_HOUR = 60 * 60  # seconds

# Settings used by this script; the credential values are placeholders.
TOKEN_URL = NA_AUTH_URI
SESSION_SCOPE = None
REFRESH_TOKEN = "dfdfd"
CLIENT_ID = ""
CLIENT_SECRET = ""
AWS_ACCESS_KEY_ID = ""

# Start from a token whose expiry lies one hour in the past; the intent is
# that authlib sees it as stale and uses the refresh token to get a new one.
expired_token = dict(
    token_type="bearer",
    access_token="",
    refresh_token=REFRESH_TOKEN,
    expires_at=int(time.time()) - ONE_HOUR,
)

# Refreshing is only possible when the session knows its token_endpoint.
client = OAuth2Session(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    scope=SESSION_SCOPE,
    token_endpoint=TOKEN_URL,
    token=expired_token,
)
# Obtain a usable access token before signing.  ``client.token`` is the whole
# token mapping (still the deliberately expired one at this point, because
# authlib only refreshes lazily while sending a request), whereas the signer
# needs the bare access-token string for the ``x-amz-access-token`` header.
fresh_token = client.refresh_token(TOKEN_URL, refresh_token=REFRESH_TOKEN)

# Prepare request
params = signer.RequestParams(
    method="GET",
    service="",
    host="",
    region="",
    endpoint="",
    parameters="",
    access_token=fresh_token["access_token"],
)

# Sign request
signed_url, headers = signer.sign(
    aws_access_key=AWS_ACCESS_KEY_ID,
    aws_secret_key="",
    request_params=params,
)

# NOTE(review): by default the session attaches its own
# ``Authorization: Bearer ...`` header, which would replace the SigV4
# ``Authorization`` header produced by ``signer.sign``.  ``withhold_token``
# sends the request with exactly the headers given here -- confirm against
# the authlib version in use.
result = client.get(signed_url, headers=headers, withhold_token=True)
# Ref: http://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html
import hashlib
import hmac
from dataclasses import dataclass
from datetime import datetime
# Algorithm identifier written into the string to sign and into the
# ``Authorization`` header built by ``sign``.
ALGORITHM = "AWS4-HMAC-SHA256"
class Headers(dict):
    """Header mapping that can render itself in canonical form for signing.

    The mapping itself is an ordinary ``dict`` and is stored exactly as given.
    Only the two rendered views below are normalised: header names are
    lower-cased, entries are ordered by name, and runs of whitespace in a
    value are collapsed to a single space with the ends trimmed.  Both views
    are derived from the same normalised sequence so they always agree.
    """

    def _canonical_items(self):
        """Return normalised ``(name, value)`` pairs ordered by name."""
        pairs = (
            (str(name).lower(), " ".join(str(value).split()))
            for name, value in self.items()
        )
        return sorted(pairs, key=lambda pair: pair[0])

    @property
    def keys_as_str(self):
        """Header names joined with ``;`` (empty string when there are none)."""
        return ";".join(name for name, _ in self._canonical_items())

    def __str__(self):
        """Render ``name:value`` lines joined by newlines, no trailing newline.

        Returns an empty string when the mapping is empty.
        """
        return "\n".join(
            f"{name}:{value}" for name, value in self._canonical_items()
        )
@dataclass(frozen=True)
class RequestParams:
    """Immutable bundle of the values ``sign`` needs to sign one request.

    Attributes:
        method: HTTP verb; first line of the canonical request.
        service: Service name used in the credential scope and signing key.
        host: Value of the ``host`` header.
        region: Region name used in the credential scope and signing key.
        endpoint: URL that the query string is appended to in the result.
        parameters: Query string, used verbatim in the canonical request and
            in the returned URL.
        access_token: Value of the ``x-amz-access-token`` header.
        payload: Presumably the request body; defaults to the empty string.
    """

    method: str
    service: str
    host: str
    region: str
    endpoint: str
    parameters: str
    access_token: str
    payload: str = ""
# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def _sign(key: bytes, msg: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of ``msg`` (UTF-8 encoded) under ``key``."""
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode("utf-8"))
    return mac.digest()
def _get_signature_key(
    key: str, datestamp: str, region_name: str, service_name: str
) -> bytes:
    """Derive the signing key by chaining HMACs over the scope components.

    Starts from ``"AWS4" + key`` (UTF-8 encoded) and feeds each derived key
    into the next ``_sign`` call, in the order: date stamp, region, service,
    and the literal ``"aws4_request"``.
    """
    derived = ("AWS4" + key).encode("utf-8")
    for component in (datestamp, region_name, service_name, "aws4_request"):
        derived = _sign(derived, component)
    return derived
def sign(
    aws_access_key: str,
    aws_secret_key: str,
    request_params: RequestParams,
    payload: str = "",
) -> "tuple[str, Headers]":
    """Build the signed URL and headers for one request (AWS Signature V4).

    Args:
        aws_access_key: Access key id placed in the ``Credential`` field.
        aws_secret_key: Secret key the signing key is derived from.
        request_params: Method, host, region, service, endpoint, query string
            and access token of the request.  ``parameters`` is used verbatim
            as the canonical query string, so the caller must pass it already
            sorted and URI-encoded.
        payload: Request body to hash.  When empty, ``request_params.payload``
            is used instead.

    Returns:
        A ``(url, headers)`` pair: ``endpoint + "?" + parameters`` and the
        ``Headers`` to send, including the ``Authorization`` header.
    """
    # Function-scope imports: the module header only imports the ``datetime``
    # class and nothing from ``urllib``.
    from datetime import timezone
    from urllib.parse import quote, urlsplit

    # A single timestamp feeds the ``x-amz-date`` header, the string to sign
    # and the credential scope; they must agree or the signature is rejected.
    now = datetime.now(timezone.utc)
    amzdate = now.strftime("%Y%m%dT%H%M%SZ")
    datestamp = now.strftime("%Y%m%d")  # Date w/o time, used in credential scope

    # An explicit ``payload`` argument wins; otherwise use the one carried by
    # the request parameters.
    body = payload or request_params.payload

    headers = Headers(host=request_params.host)
    headers["user-agent"] = "Nozzle test app/0.1 (Language=Python/3; Platform=Linux)"
    headers["x-amz-access-token"] = request_params.access_token
    headers["x-amz-date"] = amzdate

    signed_headers = headers.keys_as_str
    canonical_headers = str(headers)

    # The canonical URI is the path of the request URL.  Fall back to "/" when
    # the endpoint has no path or is not an absolute URL.
    path = urlsplit(request_params.endpoint).path
    if not path.startswith("/"):
        path = "/"
    canonical_uri = quote(path)

    payload_hash = hashlib.sha256(body.encode("utf-8")).hexdigest()
    canonical_request = "\n".join(
        [
            request_params.method,
            canonical_uri,
            request_params.parameters,
            canonical_headers,
            "",  # every header line ends in a newline, leaving a blank line here
            signed_headers,
            payload_hash,
        ]
    )

    credential_scope = (
        f"{datestamp}/{request_params.region}/{request_params.service}/aws4_request"
    )
    string_to_sign = (
        f"{ALGORITHM}\n{amzdate}\n{credential_scope}\n"
        + hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    )

    # Create the signing key
    signing_key = _get_signature_key(
        aws_secret_key, datestamp, request_params.region, request_params.service
    )

    # Sign the string_to_sign using the signing_key
    signature = hmac.new(
        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()

    # Added only after signing, so it is not part of the signed header set.
    headers["Authorization"] = (
        f"{ALGORITHM} Credential={aws_access_key}/{credential_scope}, "
        f"SignedHeaders={signed_headers}, Signature={signature}"
    )
    return f"{request_params.endpoint}?{request_params.parameters}", headers
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment