This sample is similar to https://github.com/aws-samples/sigv4-signing-examples, but the variant below invokes sts.GetCallerIdentity with the AWS_SECRET_ACCESS_KEY embedded in a TPM.
In other words, this sample seals the AWS_SECRET_ACCESS_KEY inside a TPM and then uses the TPM to create an AWS v4 signature.
Once sealed, the secret never leaves the TPM; the TPM can only be asked to issue an HMAC computed with it.
for more info, see
- https://github.com/salrashid123/tpm2/tree/master/hmac_import
- https://github.com/salrashid123/aws_hmac
- https://github.com/salrashid123/aws-tpm-process-credential
- https://github.com/aws-samples/sigv4-signing-examples
- For TPM-backed signing with IAM Roles Anywhere, see https://github.com/salrashid123/aws_rolesanywhere_signer
mkdir /tmp/myvtpm
sudo swtpm_setup --tpmstate /tmp/myvtpm --tpm2 --create-ek-cert
sudo swtpm socket --tpmstate dir=/tmp/myvtpm --tpm2 --server type=tcp,port=2321 --ctrl type=tcp,port=2322 --flags not-need-init,startup-clear
### in new window
export TPM2TOOLS_TCTI="swtpm:port=2321"
tpm2_pcrread sha256:23
Then export your AWS credentials as environment variables and run the script:
export AWS_ACCESS_KEY_ID=redacted
export AWS_SECRET_ACCESS_KEY=redacted
python3 main_tpm.py
This script embeds the AWS secret into the TPM and then issues signed headers, which are used to call the STS GetCallerIdentity endpoint. Each run of the script embeds the key again; you can instead persist the embedded key and reuse it for later invocations, but that part is left out of this sample.
`main_tpm.py`:
import datetime
import hashlib
import hmac
import requests
import os
import sys
from tpm2_pytss import *
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
# AWS access keys, read from the environment; a missing variable raises KeyError.
access_key = os.environ['AWS_ACCESS_KEY_ID']
secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
# Open an ESAPI session to the software TPM over TCP port 2321 and issue a
# startup with the CLEAR startup type.
ectx = ESAPI(tcti="swtpm:port=2321")
ectx.startup(TPM2_SU.CLEAR)
# The key material sealed into the TPM is the secret prefixed with "AWS4",
# i.e. the key of the first HMAC in the SigV4 key-derivation chain.
k = 'AWS4'+secret_key
# Template for the primary (parent) key: RSA-2048, restricted decryption key,
# fixed to this TPM and parent, with TPM-generated sensitive data.
inPublic = TPM2B_PUBLIC(
TPMT_PUBLIC.parse(
alg="rsa2048",
objectAttributes=TPMA_OBJECT.USERWITHAUTH
| TPMA_OBJECT.RESTRICTED
| TPMA_OBJECT.DECRYPT
| TPMA_OBJECT.FIXEDTPM
| TPMA_OBJECT.FIXEDPARENT
| TPMA_OBJECT.SENSITIVEDATAORIGIN,
)
)
# Template for the child HMAC key (name algorithm SHA-256) used for signing.
# SENSITIVEDATAORIGIN is not set here; the key material is supplied by the
# caller through inSensitiveHMAC below.
inPublicHMAC = TPM2B_PUBLIC(
TPMT_PUBLIC.parse(
alg="hmac",
nameAlg="sha256",
objectAttributes=TPMA_OBJECT.USERWITHAUTH
| TPMA_OBJECT.FIXEDPARENT
| TPMA_OBJECT.FIXEDTPM
| TPMA_OBJECT.SIGN_ENCRYPT,
)
)
# Create the primary key with an empty sensitive-create structure.
# NOTE(review): no hierarchy is passed, so the library default applies — confirm
# which hierarchy that is for the tpm2_pytss version in use.
inSensitive = TPM2B_SENSITIVE_CREATE()
primary1, _, _, _, _ = ectx.create_primary(inSensitive, inPublic)
# Create the HMAC key under the primary, passing the UTF-8 encoded
# "AWS4"+secret string as the key's sensitive data.
inSensitiveHMAC = TPM2B_SENSITIVE_CREATE(TPMS_SENSITIVE_CREATE(data=TPM2B_SENSITIVE_DATA(k.encode("utf-8"))))
priv, pub, _, _, _ = ectx.create(primary1, inSensitiveHMAC, inPublicHMAC)
# Optional: the public/private blobs can be written to disk (e.g. pub.marshal())
# and read back later instead of re-creating the key on every run:
# pub2, _ = TPM2B_PUBLIC.unmarshal(pub.marshal())
# priv2, _ = TPM2B_PRIVATE.unmarshal(priv.marshal())
# Load the HMAC key under the primary; childHandle is what the signing code
# uses later. The primary is flushed once the child is loaded.
childHandle = ectx.load(primary1, priv, pub)
ectx.flush_context(primary1)
# Example of a direct HMAC with the loaded key (left disabled):
# hmac = ectx.hmac(childHandle, b"1234", TPM2_ALG.SHA256)
# print(hmac.marshal().hex())
# ectx.flush_context(childHandle)
import http.client as http_client
import logging

# Verbose wire-level debugging for the outgoing HTTP request.
# NOTE: debuglevel=1 makes http.client print the request line and headers,
# which includes the Authorization header; turn this off outside of debugging.
http_client.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# Un-vendored urllib3 logs under "urllib3.*"; "requests.packages.urllib3" is
# the name used by old requests releases that bundled urllib3. Configure both
# so the debug output appears regardless of the installed requests version.
# The loop ends on the legacy name so `requests_log` stays bound to the same
# logger as before.
for _logger_name in ("urllib3", "requests.packages.urllib3"):
    requests_log = logging.getLogger(_logger_name)
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True
# Request parameters for the STS call
method = 'POST'
service = 'sts'
host = "sts.amazonaws.com"
region = 'us-east-1'
endpoint = '/'

# Timestamp used for signing. Use an explicitly timezone-aware UTC time:
# datetime.utcnow() returns a naive datetime and is deprecated since
# Python 3.12. The formatted strings are identical either way.
t = datetime.datetime.now(datetime.timezone.utc)
amzdate = t.strftime('%Y%m%dT%H%M%SZ')
datestamp = t.strftime('%Y%m%d')

# Canonical request pieces. Header names are lowercase and listed in the same
# order as in signed_headers; each header line ends with a newline.
canonical_uri = endpoint
canonical_querystring = ''
canonical_headers = (
    'content-type:application/x-www-form-urlencoded' + '\n'
    + 'host:' + host + '\n'
    + 'x-amz-date:' + amzdate + '\n'
)
signed_headers = 'content-type;host;x-amz-date'

# https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
payload = 'Action=GetCallerIdentity&Version=2011-06-15'
# Alternative payloads for other STS actions:
#payload = 'Action=GetSessionToken&DurationSeconds=3600&Version=2011-06-15'
#payload = 'Action=AssumeRole&RoleSessionName=testAR&RoleArn=arn:aws:iam::291738886548:role/gcpsts&Version=2011-06-15'
payload_hash = hashlib.sha256(payload.encode('utf-8')).hexdigest()

# canonical_headers already ends with '\n', so joining with '\n' leaves the
# blank line that separates the headers from the signed-header list.
canonical_request = '\n'.join([
    method,
    canonical_uri,
    canonical_querystring,
    canonical_headers,
    signed_headers,
    payload_hash,
])

# String to sign: algorithm, timestamp, credential scope and the hex SHA-256
# of the canonical request, one per line.
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = '/'.join([datestamp, region, service, 'aws4_request'])
string_to_sign = '\n'.join([
    algorithm,
    amzdate,
    credential_scope,
    hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
])
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of ``msg`` keyed with ``key``.

    Args:
        key: HMAC key as bytes.
        msg: Message to authenticate. A ``str`` is UTF-8 encoded first;
            bytes-like input is used as-is.

    Returns:
        The 32-byte digest as ``bytes``.
    """
    if isinstance(msg, str):
        msg = msg.encode("utf-8")
    return hmac.new(key, msg, hashlib.sha256).digest()
def getSignatureKey(key, dateStamp, regionName, serviceName):
    """Derive the SigV4 signing key, computing the first HMAC inside the TPM.

    The date-stamp HMAC is computed by the TPM-resident key referenced by the
    module-level ``childHandle`` through the module-level ``ectx`` context;
    the remaining derivation steps are ordinary software HMACs via ``sign``.

    This function is one-shot: it flushes ``childHandle`` and closes ``ectx``
    before returning (also when the TPM call fails), so it cannot be called a
    second time with the same handle/context.

    Args:
        key: Unused. Kept so the signature matches the software-only variant,
            where it would be the plaintext secret.
        dateStamp: Date string that is HMACed by the TPM key.
        regionName: Region string mixed into the key.
        serviceName: Service string mixed into the key.

    Returns:
        The derived signing key as ``bytes``.
    """
    # Instead of the first HMAC operation using the AWS secret in software,
    #   kDate = sign(("AWS4" + key).encode("utf-8"), dateStamp)
    # use the TPM-based key.
    try:
        try:
            kDate = bytes(ectx.hmac(childHandle, dateStamp, TPM2_ALG.SHA256))
        finally:
            # Release the loaded key even if the HMAC call raised.
            ectx.flush_context(childHandle)
    finally:
        # Always close the TPM context, even if the flush itself failed.
        ectx.close()
    kRegion = sign(kDate, regionName)
    kService = sign(kRegion, serviceName)
    kSigning = sign(kService, "aws4_request")
    return kSigning
# Derive the signing key, then compute the hex signature over the string to sign
signing_key = getSignatureKey(secret_key, datestamp, region, service)
signature = hmac.new(
    signing_key,
    string_to_sign.encode('utf-8'),
    hashlib.sha256,
).hexdigest()

# Assemble the Authorization header value from its parts
authorization_header = '{} Credential={}/{}, SignedHeaders={}, Signature={}'.format(
    algorithm,
    access_key,
    credential_scope,
    signed_headers,
    signature,
)
print(authorization_header)

# Send the signed request; the headers must match the ones that were signed
headers = {
    'content-type': 'application/x-www-form-urlencoded',
    'host': host,
    'x-amz-date': amzdate,
    'Authorization': authorization_header,
}
request_url = ''.join(('https://', host, canonical_uri))
response = requests.post(
    request_url,
    headers=headers,
    data=payload,
    allow_redirects=False,
    timeout=5,
)
# Raise on a 4xx/5xx status before printing the response body
response.raise_for_status()
print(response.text)