Skip to content

Instantly share code, notes, and snippets.

@jsatt
Created March 6, 2016 03:23
Show Gist options
  • Save jsatt/088a02b1a4b09b854069 to your computer and use it in GitHub Desktop.
Save jsatt/088a02b1a4b09b854069 to your computer and use it in GitHub Desktop.
AWS Signing v4
### example calls
def post_key(path, **kwargs):
    '''
    Send a signed POST to ENDPOINT + path and return the response.

    Extra keyword arguments are forwarded unchanged to requests.Request.

    Example:
        post_key('testing/', data=json.dumps({"blah": 123}))
    '''
    url = '{}{}'.format(ENDPOINT, path)
    request = requests.Request(method='POST', url=url, **kwargs)
    # The request must be signed before it is prepared and sent.
    sign_request(request)
    response = make_request(request)
    return response
def get_key(path, **kwargs):
    '''
    Send a signed GET to ENDPOINT + path and return the response.

    Extra keyword arguments are forwarded unchanged to requests.Request.

    Example:
        get_key('testing/')
    '''
    url = '{}{}'.format(ENDPOINT, path)
    request = requests.Request(method='GET', url=url, **kwargs)
    # The request must be signed before it is prepared and sent.
    sign_request(request)
    response = make_request(request)
    return response
import logging
import base64
import datetime
import hashlib
import hmac
import json
import os
import sys
import urllib2
import requests
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Credentials and target come from the environment; a variable that is not
# set yields None.
ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# Example value:
# ENDPOINT = 'https://search-city-struggle-bus-t75er6uguw6axw7hjhrd2vfrhm.us-west-2.es.amazonaws.com/'
ENDPOINT = os.getenv('ES_ENDPOINT')
# Example value:
# REGION = 'us-west-2'
REGION = os.getenv('AWS_REGION')

# Leave these two unchanged when signing for Elasticsearch.
SERVICE = 'es'
ALGORITHM = 'AWS4-HMAC-SHA256'
# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
    '''
    Return the raw HMAC-SHA256 digest of msg computed under key.

    key is passed to hmac as-is; msg is text and is UTF-8 encoded before
    being fed to the MAC.
    '''
    message_bytes = msg.encode('utf-8')
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(message_bytes)
    return mac.digest()
def getSignatureKey(key, dateStamp, regionName, serviceName):
    '''
    Derive the Signature Version 4 signing key.

    Starting from 'AWS4' + key (UTF-8 encoded), each step signs the next
    scope component with the previous result: date stamp, region, service
    and finally the literal 'aws4_request'. The last digest is returned.
    '''
    derived = ('AWS4' + key).encode('utf-8')
    for component in (dateStamp, regionName, serviceName, 'aws4_request'):
        derived = sign(derived, component)
    return derived
# Derived from examples at
# https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
def sign_request(request):
    '''
    Add AWS Signature Version 4 headers to request, in place.

    The signature covers the host and x-amz-date headers and uses the
    module-level ACCESS_KEY, SECRET_KEY, REGION, SERVICE and ALGORITHM
    values. The request's headers are updated with Content-Type,
    X-Amz-Date and Authorization; existing values for those three keys are
    overwritten. Nothing is returned.

    A TypeError is raised by the string concatenation below when
    ACCESS_KEY, SECRET_KEY or REGION is None (environment variable not set).
    '''
    now = datetime.datetime.utcnow()
    amzdate = now.strftime('%Y%m%dT%H%M%SZ')
    datestamp = now.strftime('%Y%m%d')
    signed_headers = 'host;x-amz-date'

    # task 1: build the canonical request
    canonical_request = get_canonical_request(request, amzdate, signed_headers)
    # hashlib only accepts bytes; encode text explicitly so hashing works on
    # Python 3 and for non-ASCII unicode on Python 2.
    if not isinstance(canonical_request, bytes):
        canonical_request = canonical_request.encode('utf-8')
    canonical_request_hash = hashlib.sha256(canonical_request).hexdigest()

    # task 2: build the string to sign
    credential_scope = datestamp + '/' + REGION + '/' + SERVICE + '/' + 'aws4_request'
    string_to_sign = '\n'.join(
        (ALGORITHM, amzdate, credential_scope, canonical_request_hash)
    )

    # task 3: compute the signature
    signing_key = getSignatureKey(SECRET_KEY, datestamp, REGION, SERVICE)
    signature = hmac.new(
        signing_key, string_to_sign.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    # task 4: attach the signing information to the request
    authorization_header = (
        ALGORITHM + ' '
        + 'Credential=' + ACCESS_KEY + '/' + credential_scope + ', '
        + 'SignedHeaders=' + signed_headers + ', '
        + 'Signature=' + signature
    )
    headers = {
        'Content-Type': 'application/x-amz-json-1.0',
        'X-Amz-Date': amzdate,
        'Authorization': authorization_header,
    }
    request.headers.update(headers)
def get_canonical_request(request, amzdate, signed_headers):
    '''
    Build the Signature Version 4 canonical request string for request.

    Parameters:
        request: object exposing .url, .method and .data (for example a
            requests.Request). .data is only read for non-GET methods and
            must be text, bytes or empty.
        amzdate: timestamp string placed in the x-amz-date canonical header.
        signed_headers: the semicolon-separated signed header list, copied
            verbatim into the result.

    Returns:
        The canonical request: method, URI, query string, canonical headers,
        signed headers and the hex SHA-256 of the payload, joined by
        newlines.

    Notes:
        * An empty URL path is canonicalised to '/'.
        * Query parameters are split on the first '=' only, percent-decoded
          and re-encoded (so already-encoded input is not double encoded),
          and sorted by encoded name and then value.
        * Only the host name (without port) is used for the host header.
    '''
    # Function-scope import so the block works on both Python 3 and Python 2.
    try:
        from urllib.parse import quote, unquote, urlparse
    except ImportError:
        from urllib import quote, unquote
        from urlparse import urlparse

    def uri_encode(component):
        # Escape everything except the unreserved characters A-Z a-z 0-9 - _ . ~
        return quote(unquote(component), safe='-_.~')

    url_parts = urlparse(request.url)

    # An absent path must be signed as '/'.
    canonical_uri = url_parts.path or '/'

    pairs = []
    for item in url_parts.query.split('&'):
        if not item:
            # No query string at all, or a stray '&'.
            continue
        name, _, value = item.partition('=')
        pairs.append((uri_encode(name), uri_encode(value)))
    canonical_querystring = '&'.join(
        name + '=' + value for name, value in sorted(pairs)
    )

    canonical_headers = (
        'host:' + url_parts.hostname + '\n' + 'x-amz-date:' + amzdate + '\n'
    )

    payload = b'' if request.method == 'GET' else request.data
    if not payload:
        # Covers None, '' and the empty container requests.Request uses when
        # no body was supplied.
        payload = b''
    elif isinstance(payload, type(u'')):
        # hashlib needs bytes; text bodies are hashed as UTF-8.
        payload = payload.encode('utf-8')
    payload_hash = hashlib.sha256(payload).hexdigest()

    return '\n'.join((
        request.method,
        canonical_uri,
        canonical_querystring,
        canonical_headers,
        signed_headers,
        payload_hash,
    ))
def make_request(req):
    '''
    Prepare req, send it over a short-lived session and return the response.

    The session is used as a context manager so it is closed as soon as the
    response has been received, rather than being left open after every
    call. The response is logged at debug level before being returned.
    '''
    with requests.Session() as session:
        prepared = req.prepare()
        resp = session.send(prepared)
    logger.debug(resp)
    return resp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment