Last active
May 27, 2018 02:53
-
-
Save andreif/7597c434fd93f28fb3fed02030e8e774 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import datetime | |
import hashlib | |
import hmac | |
import io | |
import json | |
import mimetypes | |
import os | |
import uuid | |
from urllib.error import HTTPError, URLError | |
from urllib.request import Request, urlopen | |
def guess_content_type(path):
    """Return the MIME type guessed from *path*.

    Falls back to ``'application/octet-stream'`` when ``mimetypes`` cannot
    determine a type.
    """
    mime_type, _encoding = mimetypes.guess_type(path)
    if mime_type:
        return mime_type
    return 'application/octet-stream'
class FormData(object):
    """Builder for a ``multipart/form-data`` request body.

    Parts are appended to an in-memory buffer as they are written; the
    closing boundary is only added when ``body`` is read, so more parts can
    be written at any time.

    Args:
        data: Optional mapping of field name to value. Values that are not
            text or bytes are converted with ``str()``.
        files: Optional mapping of field name to a file path. Each file is
            read in full and sent with its base name as ``filename`` and a
            content type guessed from the path.
    """

    def __init__(self, data=None, files=None):
        # A random hex boundary; it is also exposed through ``content_type``.
        self.boundary = uuid.uuid4().hex
        self._content = io.BytesIO()
        for name, value in (data or {}).items():
            self.write_part(value=value, name=name)
        for name, path in (files or {}).items():
            with open(path, 'rb') as fd:
                value = fd.read()
            self.write_part(value=value, name=name,
                            filename=os.path.basename(path),
                            content_type=guess_content_type(path))

    @staticmethod
    def _escape_param(value):
        """Percent-escape characters that would break a quoted parameter.

        A double quote would terminate the quoted value early and CR/LF
        would start a new header line, so all three are replaced.
        """
        return (str(value)
                .replace('\r', '%0D')
                .replace('\n', '%0A')
                .replace('"', '%22'))

    def write_part(self, value, content_type=None, **kwargs):
        """Append one part to the body.

        Args:
            value: Part payload. ``bytes``-like objects are written as is,
                ``str`` is encoded as UTF-8, anything else goes through
                ``str()`` first.
            content_type: Optional ``Content-Type`` header for the part.
            **kwargs: ``Content-Disposition`` parameters such as ``name``
                and ``filename``; values are escaped before being quoted.
        """
        params = ''.join(
            f'; {k}="{self._escape_param(v)}"' for k, v in kwargs.items())
        ct = f'\r\nContent-Type: {content_type}' if content_type else ''
        self._content.write(
            f'--{self.boundary}\r\n'
            f'Content-Disposition: form-data{params}{ct}\r\n\r\n'.encode())
        if isinstance(value, (bytearray, memoryview)):
            # Binary payloads must not be stringified via ``str()``.
            value = bytes(value)
        elif not isinstance(value, (str, bytes)):
            value = str(value)
        if isinstance(value, str):
            value = value.encode()
        # Two writes avoid copying a potentially large payload.
        self._content.write(value)
        self._content.write(b'\r\n')

    @property
    def content_type(self):
        """Value for the request's ``Content-Type`` header."""
        return f'multipart/form-data; boundary={self.boundary}'

    @property
    def body(self):
        """All parts written so far followed by the closing boundary."""
        return self._content.getvalue() + \
            f'--{self.boundary}--\r\n'.encode()
class IAM:
    """Plain holder for a set of credentials.

    Attributes are stored exactly as given: ``key``, ``secret`` and an
    optional ``token`` (``None`` when not supplied).
    """

    def __init__(self, key, secret, token=None):
        self.key, self.secret, self.token = key, secret, token
class S3Bucket(object):
    """An S3 bucket that files are uploaded to with a signed POST policy.

    Args:
        name: Bucket name; used in the policy and as the URL path.
        region: Region name used in the credential scope, the signing key
            and the endpoint host.
        iam: Object exposing ``key``, ``secret`` and ``token`` attributes.
    """

    def __init__(self, name, region, iam):
        self.name = name
        self.region = region
        self.iam = iam

    def upload(self, path, key,
               acl='public-read',  # 'private'
               expires_in=60 * 5,
               content_type=None,
               cache_control=f'max-age={60 * 60 * 24 * 30}',
               content_disposition='attachment',
               encryption='AES256',
               content_length_range=(1000, 20 * 1000 * 1000),  # default: any
               ):
        """Upload the file at *path* as a ``multipart/form-data`` POST.

        Args:
            path: Local file to upload; it is read to compute Content-MD5
                and again when the form body is built.
            key: Object key. A trailing ``/`` gets ``${filename}`` appended
                and leading slashes are stripped.
            acl: Value for the ``acl`` field and policy condition.
            expires_in: Policy lifetime in seconds from now.
            content_type: Content type; guessed from *path* when falsy.
            cache_control: ``Cache-Control`` field; skipped when falsy.
            content_disposition: ``Content-Disposition`` field; skipped
                when falsy.
            encryption: ``x-amz-server-side-encryption`` field; skipped
                when falsy.
            content_length_range: ``(min, max)`` pair added as a
                ``content-length-range`` policy condition; skipped when
                falsy.

        Returns:
            The response object with its body bytes stored on ``.content``.
            On ``HTTPError`` the error's response object is returned the
            same way. On any other ``URLError`` the error is printed and
            ``None`` is returned.
        """
        # The literal string '${filename}' is an S3 field variable for key.
        # https://aws.amazon.com/articles/1434#aws-table
        if key.endswith('/'):
            key += '${filename}'
        key = key.lstrip('/')

        # Aware UTC "now"; ``datetime.utcnow()`` is deprecated and the
        # strftime output below is identical.
        now = datetime.datetime.now(datetime.timezone.utc)
        expires = now + datetime.timedelta(seconds=expires_in)
        # NOTE(review): AWS examples show x-amz-date as YYYYMMDDTHHMMSSZ;
        # this format carries an extra '000' before 'Z' - confirm S3
        # accepts it before changing.
        iso_now = now.strftime('%Y%m%dT%H%M%S000Z')
        raw_date = now.strftime('%Y%m%d')

        cred = f'{self.iam.key}/{raw_date}/{self.region}/s3/aws4_request'
        algorithm = 'AWS4-HMAC-SHA256'
        content_type = content_type or guess_content_type(path)

        with open(path, 'rb') as f:
            md5 = base64.b64encode(hashlib.md5(f.read()).digest()).decode()

        policy = {
            'expiration': expires.strftime('%Y-%m-%dT%H:%M:%S.000Z'),
            'conditions': [
                {'bucket': self.name},
                {'acl': acl},
                ['starts-with', '$key', ''],
                {'success_action_status': '201'},
                {'x-amz-credential': cred},
                {'x-amz-algorithm': algorithm},
                {'x-amz-date': iso_now},
                {'content-type': content_type},
            ],
        }
        data = {
            'success_action_status': 201,
            'x-amz-credential': cred,
            'x-amz-date': iso_now,
            'x-amz-algorithm': algorithm,
            'key': key,
            'acl': acl,
            'content-type': content_type,
        }

        # Fields that must appear both as a policy condition and as a form
        # field; falsy values are left out entirely.
        optional_fields = (
            ('x-amz-security-token', self.iam.token),
            ('Cache-Control', cache_control),
            ('Content-Disposition', content_disposition),
            ('Content-MD5', md5),
            ('x-amz-server-side-encryption', encryption),
        )
        for field, value in optional_fields:
            if value:
                policy['conditions'].append({field: value})
                data[field] = value

        if content_length_range:
            policy['conditions'].append(
                ['content-length-range'] + list(content_length_range))

        # The base64 policy document is both a form field and the string
        # that gets signed.
        encoded_policy = base64.b64encode(json.dumps(policy).encode())
        data['policy'] = encoded_policy
        data['x-amz-signature'] = self._signature(raw_date, encoded_policy)

        form = FormData(data=data, files={'file': path})
        request = Request(
            url=self.url,
            data=form.body,
            headers={'Content-Type': form.content_type},
        )
        try:
            with urlopen(request) as res:
                res.content = res.read()
                return res
        except HTTPError as e:
            # HTTP error responses still carry a body worth returning.
            e.fp.content = e.fp.read()
            return e.fp
        except URLError as e:
            print(e)
            return None

    def _signature(self, raw_date, string_to_sign):
        """Derive the signing key for *raw_date* and sign *string_to_sign*.

        The key is built by chaining HMAC-SHA256 over the date, region,
        service name and the ``aws4_request`` terminator, starting from
        ``'AWS4' + secret``. Returns the hex digest.
        """
        signing_key = 'AWS4' + self.iam.secret
        for scope_part in (raw_date, self.region, 's3', 'aws4_request'):
            signing_key = self.hmac(key=signing_key, msg=scope_part).digest()
        return self.hmac(key=signing_key, msg=string_to_sign).hexdigest()

    def hmac(self, key, msg):
        """Return an HMAC-SHA256 object; ``str`` inputs are UTF-8 encoded."""
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(msg, str):
            msg = msg.encode('utf-8')
        return hmac.new(key=key, msg=msg, digestmod=hashlib.sha256)

    @property
    def url(self):
        """Path-style endpoint URL for this bucket.

        An empty region or ``us-east-1`` uses the global host; any other
        region uses the dot-separated ``s3.<region>`` host, since the
        dash-separated ``s3-<region>`` form exists only for older regions.
        """
        if not self.region or self.region == 'us-east-1':
            host = 's3.amazonaws.com'
        else:
            host = f's3.{self.region}.amazonaws.com'
        return f'https://{host}/{self.name}'
if __name__ == '__main__':
    # Demo: upload this very file to the bucket described by the
    # S3_BUCKET / S3_REGION / S3_KEY / S3_SECRET environment variables.
    env = os.environ
    bucket_name = env['S3_BUCKET']
    bucket_region = env['S3_REGION']
    credentials = IAM(key=env['S3_KEY'], secret=env['S3_SECRET'])
    bucket = S3Bucket(name=bucket_name, region=bucket_region, iam=credentials)

    # A key of '/' makes upload() use the '${filename}' field variable.
    response = bucket.upload(path=__file__, key='/')
    if response:
        print(response)
        print(response.status)
        print(response.content.decode())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment