Skip to content

Instantly share code, notes, and snippets.

@x011
Created April 19, 2019 03:11
Show Gist options
  • Save x011/2e881130474b39b1fac1add4e9d89698 to your computer and use it in GitHub Desktop.
Save x011/2e881130474b39b1fac1add4e9d89698 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import base64
import datetime
import hashlib
import hmac
import os
import sys

import requests
import simplejson as json

# Key derivation functions
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key: bytes, msg: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of *msg* keyed with *key*.

    Args:
        key: HMAC key as bytes.
        msg: Text message; it is UTF-8 encoded before being authenticated.

    Returns:
        The 32-byte binary digest (not hex-encoded).
    """
    return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
def getSignatureKey(key: str, date_stamp: str, regionName: str, serviceName: str) -> bytes:
    """Derive the AWS Signature Version 4 signing key.

    The secret key is prefixed with ``AWS4`` and then chained through
    ``sign`` four times: with the date stamp, the region, the service and
    finally the literal ``aws4_request``. Each digest becomes the key of
    the next step.

    Args:
        key: Secret access key as text.
        date_stamp: Date string that is also used in the credential scope.
        regionName: Region identifier, e.g. ``us-east-1``.
        serviceName: Service identifier, e.g. ``rekognition``.

    Returns:
        The final digest as raw bytes, to be used as the HMAC key when
        signing the string-to-sign.
    """
    k_date = sign(('AWS4' + key).encode('utf-8'), date_stamp)
    k_region = sign(k_date, regionName)
    k_service = sign(k_region, serviceName)
    k_signing = sign(k_service, 'aws4_request')
    return k_signing
# Images compared by the demo request.
SOURCE_IMAGE_URL = "https://millercenter.org/sites/default/files/styles/gallery_slider_desktop_2x/public/ig-obama-portrait-800x480.jpg"
TARGET_IMAGE_URL = "https://news.wttw.com/sites/default/files/field/image/Obama%20cropped_3.jpg"
# Timeout (seconds) handed to every ``requests`` call so the script cannot hang forever.
HTTP_TIMEOUT = 30


def _fetch_image_base64(url, timeout=HTTP_TIMEOUT):
    """Download *url* and return its body as a base64-encoded ASCII string.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly instead of base64-encoding an HTML error page.
    response.raise_for_status()
    # ``content`` is the decoded body; ``raw`` would return the bytes exactly as
    # they came over the wire (possibly still gzip/deflate-compressed).
    # Decoding to str keeps the value JSON-serializable by any JSON encoder.
    return base64.b64encode(response.content).decode('ascii')


def main():
    """Send a SigV4-signed Rekognition CompareFaces request and print the reply."""
    # Read credentials from the environment
    access_key = os.environ.get('AWS_ACCESS_KEY_ID')
    secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    # Uncomment this line if you use temporary credentials via STS or similar
    #token = os.environ.get('AWS_SESSION_TOKEN')
    if not access_key or not secret_key:
        # Passing a message to sys.exit() writes it to stderr and exits non-zero.
        sys.exit('No access key is available.')

    # This code shows the v4 request signing process as shown in
    # http://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
    service = 'rekognition'
    region = 'us-east-1'
    host = '{}.{}.amazonaws.com'.format(service, region)
    endpoint = 'https://' + host
    # Currently, all Rekognition actions require POST requests
    method = 'POST'
    # This defines the service target and sub-service you want to hit
    # In this case you want to use 'CompareFaces'
    amz_target = 'RekognitionService.CompareFaces'
    # Amazon content type - Rekognition expects 1.1 x-amz-json
    content_type = 'application/x-amz-json-1.1'

    # Create a date for headers and the credential string (UTC)
    now = datetime.datetime.now(datetime.timezone.utc)
    amz_date = now.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = now.strftime('%Y%m%d')  # Date w/o time, used in credential scope

    # Build the request data and serialize it once; the very same bytes are
    # hashed for the signature and sent as the request body.
    request_dict = {
        "SimilarityThreshold": 2,
        "SourceImage": {
            "Bytes": _fetch_image_base64(SOURCE_IMAGE_URL),
        },
        "TargetImage": {
            "Bytes": _fetch_image_base64(TARGET_IMAGE_URL),
        },
    }
    payload = json.dumps(request_dict).encode('utf-8')
    # Generate a hash of our payload for verification by Rekognition
    payload_hash = hashlib.sha256(payload).hexdigest()

    # Canonical request information. The headers to sign are listed once, in
    # alphabetical order, so the canonical header block and the signed-header
    # list cannot drift apart.
    canonical_uri = '/'
    canonical_querystring = ''
    headers_to_sign = [
        ('content-type', content_type),
        ('host', host),
        ('x-amz-date', amz_date),
        ('x-amz-target', amz_target),
    ]
    canonical_headers = ''.join(
        '{}:{}\n'.format(name, value) for name, value in headers_to_sign
    )
    signed_headers = ';'.join(name for name, _ in headers_to_sign)
    canonical_request = '\n'.join([
        method,
        canonical_uri,
        canonical_querystring,
        canonical_headers,
        signed_headers,
        payload_hash,
    ])

    # String to sign and the resulting signature
    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = '/'.join([date_stamp, region, service, 'aws4_request'])
    string_to_sign = '\n'.join([
        algorithm,
        amz_date,
        credential_scope,
        hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
    ])
    signing_key = getSignatureKey(secret_key, date_stamp, region, service)
    signature = hmac.new(
        signing_key, string_to_sign.encode('utf-8'), hashlib.sha256
    ).hexdigest()

    authorization_header = '{} Credential={}/{}, SignedHeaders={}, Signature={}'.format(
        algorithm, access_key, credential_scope, signed_headers, signature
    )
    headers = {
        'Content-Type': content_type,
        'X-Amz-Date': amz_date,
        'X-Amz-Target': amz_target,
        # uncomment this if you uncommented the 'token' line earlier
        #'X-Amz-Security-Token': token,
        'Authorization': authorization_header,
    }

    r = requests.post(endpoint, data=payload, headers=headers, timeout=HTTP_TIMEOUT)
    print('Response code: {}\n'.format(r.status_code))
    # Let's format the JSON string returned from the API for better output;
    # fall back to the raw text if the body is not valid JSON.
    try:
        formatted_text = json.dumps(json.loads(r.text), indent=4, sort_keys=True)
    except ValueError:
        formatted_text = r.text
    print('Response body:\n{}'.format(formatted_text))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment