Last active
April 10, 2016 22:23
-
-
Save SeanHayes/6e7c6094f6a8265e8b56 to your computer and use it in GitHub Desktop.
Example of a Tastypie Resource which returns a signed upload URL for AWS S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
from datetime import timedelta | |
import hashlib | |
import hmac | |
import json | |
import mimetypes | |
import re | |
from django.conf import settings | |
from django.utils import timezone | |
from tastypie import fields, http, resources | |
from tastypie.authentication import ApiKeyAuthentication, MultiAuthentication,\ | |
SessionAuthentication | |
from tastypie.exceptions import BadRequest | |
# upload_storage should be an instance of S3BotoStorage from django-storages | |
from django.core.files.storage import default_storage | |
upload_storage = default_storage | |
class FakeModel(object): | |
pk = None | |
def __init__(self, **kwargs): | |
self.update(**kwargs) | |
def update(self, **kwargs): | |
self.__dict__.update(kwargs) | |
class UploadURLResource(resources.Resource): | |
EXTRA_ATTRIBUTE_BLACKLIST = frozenset([ | |
'acl', | |
'bucket', | |
'cache-control', | |
'content-disposition', | |
'content-encoding', | |
'content-length-range', | |
'content-type', | |
'expires', | |
'key', | |
'redirect', | |
'success_action_redirect', | |
'success_action_status', | |
'x-amz-security-token', | |
]) | |
extension = fields.CharField( | |
default='', | |
blank=True, | |
help_text="The extension to append to the generated key." | |
) | |
extra_form_attributes = fields.ListField( | |
null=True, | |
blank=True, | |
help_text="A list of strings specifying extra form fields that will get sent to S3." | |
) | |
success_action_status = fields.CharField( | |
default='200', | |
blank=True, | |
help_text="The HTTP status code for S3 to use when upload completes." | |
) | |
key = fields.CharField(readonly=True, help_text="The S3 key.") | |
content_type = fields.CharField(readonly=True, help_text="The Content-Type.") | |
signed_get_url = fields.CharField( | |
readonly=True, | |
help_text="The signed URL to retrieve a file." | |
) | |
signed_put_url = fields.CharField( | |
readonly=True, | |
help_text="The signed URL to upload a file to using `signed_url_method`." | |
) | |
form_action = fields.CharField(readonly=True, help_text="The URL to POST to. Used during multipart form POST.") | |
form_aws_access_key_id = fields.CharField( | |
readonly=True, | |
help_text="Used during multipart form POST." | |
) | |
form_policy = fields.DictField(readonly=True, help_text="The policy object.") | |
form_policy_string = fields.CharField( | |
readonly=True, | |
help_text="The policy object." | |
) | |
form_policy_encoded = fields.CharField( | |
readonly=True, | |
help_text="The encoded policy object. Used during multipart form POST." | |
) | |
form_policy_signature = fields.CharField( | |
readonly=True, | |
help_text="The policy signature. Used during multipart form POST." | |
) | |
class Meta: | |
authentication = MultiAuthentication(ApiKeyAuthentication(), SessionAuthentication()) | |
resource_name = 'uploadurl' | |
always_return_data = True | |
object_class = FakeModel | |
include_resource_uri = False | |
list_allowed_methods = ['get', 'post'] | |
detail_allowed_methods = [] | |
def get_resource_uri(self, bundle_or_obj=None, url_name='api_dispatch_list'): | |
return '' | |
def deserialize(self, request, data, format='application/json'): | |
if not data: | |
data = '{}' | |
return super(UploadURLResource, self).deserialize(request, data, format=format) | |
def authorized_create_detail(self, objects, bundle): | |
return True | |
def get_list(self, request, **kwargs): | |
""" | |
We need to support GET for CORS pre-flight requests. | |
""" | |
return http.HttpNoContent() | |
def obj_create(self, bundle, **kwargs): | |
bundle.obj = self._meta.object_class() | |
for key, value in kwargs.items(): | |
setattr(bundle.obj, key, value) | |
bundle = self.full_hydrate(bundle) | |
return self.save(bundle) | |
def save(self, bundle): | |
# in leiu of hydrating, which would fail | |
bundle.obj.update(**bundle.data) | |
extension = getattr(bundle.obj, 'extension', None) | |
if extension: | |
if not re.match(r'^[a-zA-Z\d]+$', extension): | |
raise BadRequest('`extension` can only be numbers and letters.') | |
blacklist = self.EXTRA_ATTRIBUTE_BLACKLIST | |
extra_form_attributes = getattr(bundle.obj, 'extra_form_attributes', None) | |
if extra_form_attributes: | |
for extra_attr in extra_form_attributes: | |
if not extra_attr or not isinstance(extra_attr, basestring) or extra_attr.lower() in blacklist: | |
raise BadRequest('"%s" is not a valid value in `extra_form_attributes`.' % extra_attr) | |
else: | |
bundle.obj.extra_form_attributes = [] | |
success_action_status = getattr( | |
bundle.obj, | |
'success_action_status', | |
self.fields['success_action_status'].default | |
) | |
bundle.obj.success_action_status = success_action_status | |
return bundle | |
def dehydrate(self, bundle): | |
# http://stackoverflow.com/questions/10044151/how-to-generate-a-temporary-url-to-upload-file-to-amazon-s3-with-boto-library | |
user = bundle.request.user | |
data = bundle.data | |
now = timezone.now() | |
expires = now + timedelta(seconds=3600) | |
data['key'] = 'uploads/%s/%s' % (user.id, now.isoformat('T').replace(':', '.')) | |
data['content_type'] = 'application/octet-stream' | |
extension = getattr(bundle.obj, 'extension', None) | |
if extension: | |
data['extension'] = extension | |
data['key'] = '%(key)s.%(extension)s' % data | |
mimetype = mimetypes.guess_type(data['key'], strict=False)[0] | |
if mimetype: | |
data['content_type'] = mimetype | |
extra_form_attributes = getattr(bundle.obj, 'extra_form_attributes', []) | |
data['signed_get_url'] = upload_storage.bucket.new_key(data['key']).generate_url( | |
int(expires.strftime("%s")), | |
method='GET', | |
expires_in_absolute=True | |
) | |
data['signed_put_url'] = upload_storage.bucket.new_key(data['key']).generate_url( | |
int(expires.strftime("%s")), | |
method='PUT', | |
expires_in_absolute=True | |
) | |
bucket_name = upload_storage.bucket.name | |
data['form_action'] = 'https://%s.s3.amazonaws.com' % bucket_name | |
data['form_aws_access_key_id'] = settings.AWS_ACCESS_KEY_ID | |
data['acl'] = 'public-read' | |
data['success_action_status'] = bundle.obj.success_action_status | |
data['form_policy'] = { | |
"conditions": [ | |
{ | |
"bucket": bucket_name | |
}, | |
[ | |
"eq", | |
"$key", | |
data['key'] | |
], | |
{ | |
"acl": data['acl'] | |
}, | |
[ | |
"content-length-range", | |
"0", | |
"4294967296" | |
], | |
[ | |
"eq", | |
"$Content-Type", | |
data['content_type'] | |
], | |
{ | |
"success_action_status": data['success_action_status'] | |
}, | |
], | |
"expiration": expires.isoformat('T').replace('+00:00', 'Z') | |
} | |
for extra_attr in extra_form_attributes: | |
data['form_policy']['conditions'].append([ | |
"starts-with", | |
"$%s" % extra_attr, | |
"" | |
]) | |
data['form_policy_string'] = json.dumps(data['form_policy']) | |
data['form_policy_encoded'] = base64.b64encode(data['form_policy_string']) | |
data['form_policy_signature'] = base64.b64encode(hmac.new( | |
key=settings.AWS_SECRET_ACCESS_KEY, | |
msg=data['form_policy_encoded'], | |
digestmod=hashlib.sha1 | |
).digest()) | |
return bundle |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import re | |
import urlparse | |
from django.contrib.auth import get_user_model | |
from django.core.urlresolvers import reverse | |
from django.test import TestCase | |
from django.utils import timezone | |
from ..api.signed_upload_resource import upload_storage | |
class UploadURLResourceTestCase(TestCase): | |
list_url = reverse('api_dispatch_list', kwargs={'api_name': 'v1', 'resource_name': 'uploadurl'}) | |
detail_url = list_url + '%s/' | |
def setUp(self): | |
super(UploadURLResourceTestCase, self).setUp() | |
self.user1 = get_user_model().objects.create_user('foo', password='foo') | |
self.auth_extra_api_key = { | |
'HTTP_AUTHORIZATION': "ApiKey %s:%s" % (self.user1.username, self.user1.api_key.key,), | |
} | |
def assertSignedURL(self, query, signed=False, extra=None): | |
if isinstance(query, basestring): | |
query = urlparse.parse_qs(query) | |
if not signed and not extra: | |
self.assertEqual(len(query), 0) | |
else: | |
self.assertIn('Expires', query) | |
self.assertIn('AWSAccessKeyId', query) | |
self.assertIn('Signature', query) | |
expected_len = 3 | |
if extra: | |
expected_len += len(extra) | |
for key, val in extra.items(): | |
self.assertEqual(query[key], val) | |
self.assertGreaterEqual(len(query), expected_len) | |
def test__success__get_list(self): | |
response = self.client.get(self.list_url, **self.auth_extra_api_key) | |
self.assertEqual(response.status_code, 204) | |
self.assertEqual(response.content, '') | |
def test__success__post_list(self): | |
data = {} | |
response = self.client.post( | |
self.list_url, | |
json.dumps(data), | |
content_type='application/json', | |
**self.auth_extra_api_key | |
) | |
self.assertEqual(response.status_code, 201) | |
content = json.loads(response.content) | |
date = timezone.now().date().isoformat() | |
match_str = re.compile(r'https://%s.s3.amazonaws.com/uploads/%s/%sT\d\d\.\d\d\.\d\d\.\d+%%2B00.00' % ( | |
upload_storage.bucket.name, | |
self.user1.id, | |
date | |
)) | |
self.assertEqual(content['extension'], '') | |
self.assertEqual(content['content_type'], 'application/octet-stream') | |
self.assertRegexpMatches(content['signed_get_url'], match_str) | |
self.assertRegexpMatches(content['signed_put_url'], match_str) | |
self.assertSignedURL(urlparse.urlparse(content['signed_get_url']).query, signed=True) | |
self.assertSignedURL(urlparse.urlparse(content['signed_put_url']).query, signed=True) | |
self.assertEqual(content['acl'], 'public-read') | |
self.assertEqual(content['success_action_status'], '200') | |
def test__success__post_list__empty_request(self): | |
response = self.client.post( | |
self.list_url, | |
'', | |
content_type='application/json', | |
**self.auth_extra_api_key | |
) | |
self.assertEqual(response.status_code, 201) | |
content = json.loads(response.content) | |
date = timezone.now().date().isoformat() | |
match_str = re.compile(r'https://%s.s3.amazonaws.com/uploads/%s/%sT\d\d\.\d\d\.\d\d\.\d+%%2B00.00' % ( | |
upload_storage.bucket.name, | |
self.user1.id, | |
date | |
)) | |
self.assertEqual(content['extension'], '') | |
self.assertEqual(content['content_type'], 'application/octet-stream') | |
self.assertRegexpMatches(content['signed_get_url'], match_str) | |
self.assertRegexpMatches(content['signed_put_url'], match_str) | |
self.assertSignedURL(urlparse.urlparse(content['signed_get_url']).query, signed=True) | |
self.assertSignedURL(urlparse.urlparse(content['signed_put_url']).query, signed=True) | |
self.assertEqual(content['acl'], 'public-read') | |
self.assertEqual(content['success_action_status'], '200') | |
def test__success__post_list__with_data(self): | |
data = { | |
'extension': 'png', | |
'extra_form_attributes': ['filename'], | |
'success_action_status': '204', | |
} | |
response = self.client.post( | |
self.list_url, | |
json.dumps(data), | |
content_type='application/json', | |
**self.auth_extra_api_key | |
) | |
self.assertEqual(response.status_code, 201) | |
content = json.loads(response.content) | |
date = timezone.now().date().isoformat() | |
match_str = re.compile(r'https://%s.s3.amazonaws.com/uploads/%s/%sT\d\d\.\d\d\.\d\d\.\d+%%2B00.00\.%s' % ( | |
upload_storage.bucket.name, | |
self.user1.id, | |
date, | |
data['extension'] | |
)) | |
self.assertEqual(content['extension'], data['extension']) | |
self.assertEqual(content['content_type'], 'image/png') | |
self.assertRegexpMatches(content['signed_get_url'], match_str) | |
self.assertRegexpMatches(content['signed_put_url'], match_str) | |
self.assertSignedURL(urlparse.urlparse(content['signed_get_url']).query, signed=True) | |
self.assertSignedURL(urlparse.urlparse(content['signed_put_url']).query, signed=True) | |
self.assertEqual(content['acl'], 'public-read') | |
self.assertEqual(content['success_action_status'], data['success_action_status']) | |
policy = content['form_policy'] | |
self.assertEqual(policy['conditions'][-1][1], '$filename') | |
def test__failure__post_list__bad_extension(self): | |
data = { | |
'extension': 'exe.jpg', | |
} | |
response = self.client.post( | |
self.list_url, | |
json.dumps(data), | |
content_type='application/json', | |
**self.auth_extra_api_key | |
) | |
self.assertEqual(response.status_code, 400) | |
def test__failure__post_list__bad_extra_form_attributes_empty_string(self): | |
data = { | |
'extension': 'jpg', | |
'extra_form_attributes': ['filename', ''], | |
} | |
response = self.client.post( | |
self.list_url, | |
json.dumps(data), | |
content_type='application/json', | |
**self.auth_extra_api_key | |
) | |
self.assertEqual(response.status_code, 400) | |
def test__failure__post_list__bad_extra_form_attributes_number(self): | |
data = { | |
'extension': 'jpg', | |
'extra_form_attributes': ['filename', 1], | |
} | |
response = self.client.post( | |
self.list_url, | |
json.dumps(data), | |
content_type='application/json', | |
**self.auth_extra_api_key | |
) | |
self.assertEqual(response.status_code, 400) | |
def test__failure__post_list__bad_extra_form_attributes_blacklisted(self): | |
data = { | |
'extension': 'jpg', | |
'extra_form_attributes': ['filename', 'Cache-control'], | |
} | |
response = self.client.post( | |
self.list_url, | |
json.dumps(data), | |
content_type='application/json', | |
**self.auth_extra_api_key | |
) | |
self.assertEqual(response.status_code, 400) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment