Skip to content

Instantly share code, notes, and snippets.

@SeanHayes
Last active April 10, 2016 22:23
Show Gist options
  • Save SeanHayes/6e7c6094f6a8265e8b56 to your computer and use it in GitHub Desktop.
Save SeanHayes/6e7c6094f6a8265e8b56 to your computer and use it in GitHub Desktop.
Example of a Tastypie Resource which returns a signed upload URL for AWS S3
import base64
from datetime import timedelta
import hashlib
import hmac
import json
import mimetypes
import re
from django.conf import settings
from django.utils import timezone
from tastypie import fields, http, resources
from tastypie.authentication import ApiKeyAuthentication, MultiAuthentication,\
SessionAuthentication
from tastypie.exceptions import BadRequest
# upload_storage should be an instance of S3BotoStorage from django-storages
from django.core.files.storage import default_storage
upload_storage = default_storage
class FakeModel(object):
pk = None
def __init__(self, **kwargs):
self.update(**kwargs)
def update(self, **kwargs):
self.__dict__.update(kwargs)
class UploadURLResource(resources.Resource):
EXTRA_ATTRIBUTE_BLACKLIST = frozenset([
'acl',
'bucket',
'cache-control',
'content-disposition',
'content-encoding',
'content-length-range',
'content-type',
'expires',
'key',
'redirect',
'success_action_redirect',
'success_action_status',
'x-amz-security-token',
])
extension = fields.CharField(
default='',
blank=True,
help_text="The extension to append to the generated key."
)
extra_form_attributes = fields.ListField(
null=True,
blank=True,
help_text="A list of strings specifying extra form fields that will get sent to S3."
)
success_action_status = fields.CharField(
default='200',
blank=True,
help_text="The HTTP status code for S3 to use when upload completes."
)
key = fields.CharField(readonly=True, help_text="The S3 key.")
content_type = fields.CharField(readonly=True, help_text="The Content-Type.")
signed_get_url = fields.CharField(
readonly=True,
help_text="The signed URL to retrieve a file."
)
signed_put_url = fields.CharField(
readonly=True,
help_text="The signed URL to upload a file to using `signed_url_method`."
)
form_action = fields.CharField(readonly=True, help_text="The URL to POST to. Used during multipart form POST.")
form_aws_access_key_id = fields.CharField(
readonly=True,
help_text="Used during multipart form POST."
)
form_policy = fields.DictField(readonly=True, help_text="The policy object.")
form_policy_string = fields.CharField(
readonly=True,
help_text="The policy object."
)
form_policy_encoded = fields.CharField(
readonly=True,
help_text="The encoded policy object. Used during multipart form POST."
)
form_policy_signature = fields.CharField(
readonly=True,
help_text="The policy signature. Used during multipart form POST."
)
class Meta:
authentication = MultiAuthentication(ApiKeyAuthentication(), SessionAuthentication())
resource_name = 'uploadurl'
always_return_data = True
object_class = FakeModel
include_resource_uri = False
list_allowed_methods = ['get', 'post']
detail_allowed_methods = []
def get_resource_uri(self, bundle_or_obj=None, url_name='api_dispatch_list'):
return ''
def deserialize(self, request, data, format='application/json'):
if not data:
data = '{}'
return super(UploadURLResource, self).deserialize(request, data, format=format)
def authorized_create_detail(self, objects, bundle):
return True
def get_list(self, request, **kwargs):
"""
We need to support GET for CORS pre-flight requests.
"""
return http.HttpNoContent()
def obj_create(self, bundle, **kwargs):
bundle.obj = self._meta.object_class()
for key, value in kwargs.items():
setattr(bundle.obj, key, value)
bundle = self.full_hydrate(bundle)
return self.save(bundle)
def save(self, bundle):
# in leiu of hydrating, which would fail
bundle.obj.update(**bundle.data)
extension = getattr(bundle.obj, 'extension', None)
if extension:
if not re.match(r'^[a-zA-Z\d]+$', extension):
raise BadRequest('`extension` can only be numbers and letters.')
blacklist = self.EXTRA_ATTRIBUTE_BLACKLIST
extra_form_attributes = getattr(bundle.obj, 'extra_form_attributes', None)
if extra_form_attributes:
for extra_attr in extra_form_attributes:
if not extra_attr or not isinstance(extra_attr, basestring) or extra_attr.lower() in blacklist:
raise BadRequest('"%s" is not a valid value in `extra_form_attributes`.' % extra_attr)
else:
bundle.obj.extra_form_attributes = []
success_action_status = getattr(
bundle.obj,
'success_action_status',
self.fields['success_action_status'].default
)
bundle.obj.success_action_status = success_action_status
return bundle
def dehydrate(self, bundle):
# http://stackoverflow.com/questions/10044151/how-to-generate-a-temporary-url-to-upload-file-to-amazon-s3-with-boto-library
user = bundle.request.user
data = bundle.data
now = timezone.now()
expires = now + timedelta(seconds=3600)
data['key'] = 'uploads/%s/%s' % (user.id, now.isoformat('T').replace(':', '.'))
data['content_type'] = 'application/octet-stream'
extension = getattr(bundle.obj, 'extension', None)
if extension:
data['extension'] = extension
data['key'] = '%(key)s.%(extension)s' % data
mimetype = mimetypes.guess_type(data['key'], strict=False)[0]
if mimetype:
data['content_type'] = mimetype
extra_form_attributes = getattr(bundle.obj, 'extra_form_attributes', [])
data['signed_get_url'] = upload_storage.bucket.new_key(data['key']).generate_url(
int(expires.strftime("%s")),
method='GET',
expires_in_absolute=True
)
data['signed_put_url'] = upload_storage.bucket.new_key(data['key']).generate_url(
int(expires.strftime("%s")),
method='PUT',
expires_in_absolute=True
)
bucket_name = upload_storage.bucket.name
data['form_action'] = 'https://%s.s3.amazonaws.com' % bucket_name
data['form_aws_access_key_id'] = settings.AWS_ACCESS_KEY_ID
data['acl'] = 'public-read'
data['success_action_status'] = bundle.obj.success_action_status
data['form_policy'] = {
"conditions": [
{
"bucket": bucket_name
},
[
"eq",
"$key",
data['key']
],
{
"acl": data['acl']
},
[
"content-length-range",
"0",
"4294967296"
],
[
"eq",
"$Content-Type",
data['content_type']
],
{
"success_action_status": data['success_action_status']
},
],
"expiration": expires.isoformat('T').replace('+00:00', 'Z')
}
for extra_attr in extra_form_attributes:
data['form_policy']['conditions'].append([
"starts-with",
"$%s" % extra_attr,
""
])
data['form_policy_string'] = json.dumps(data['form_policy'])
data['form_policy_encoded'] = base64.b64encode(data['form_policy_string'])
data['form_policy_signature'] = base64.b64encode(hmac.new(
key=settings.AWS_SECRET_ACCESS_KEY,
msg=data['form_policy_encoded'],
digestmod=hashlib.sha1
).digest())
return bundle
import json
import re
import urlparse
from django.contrib.auth import get_user_model
from django.core.urlresolvers import reverse
from django.test import TestCase
from django.utils import timezone
from ..api.signed_upload_resource import upload_storage
class UploadURLResourceTestCase(TestCase):
list_url = reverse('api_dispatch_list', kwargs={'api_name': 'v1', 'resource_name': 'uploadurl'})
detail_url = list_url + '%s/'
def setUp(self):
super(UploadURLResourceTestCase, self).setUp()
self.user1 = get_user_model().objects.create_user('foo', password='foo')
self.auth_extra_api_key = {
'HTTP_AUTHORIZATION': "ApiKey %s:%s" % (self.user1.username, self.user1.api_key.key,),
}
def assertSignedURL(self, query, signed=False, extra=None):
if isinstance(query, basestring):
query = urlparse.parse_qs(query)
if not signed and not extra:
self.assertEqual(len(query), 0)
else:
self.assertIn('Expires', query)
self.assertIn('AWSAccessKeyId', query)
self.assertIn('Signature', query)
expected_len = 3
if extra:
expected_len += len(extra)
for key, val in extra.items():
self.assertEqual(query[key], val)
self.assertGreaterEqual(len(query), expected_len)
def test__success__get_list(self):
response = self.client.get(self.list_url, **self.auth_extra_api_key)
self.assertEqual(response.status_code, 204)
self.assertEqual(response.content, '')
def test__success__post_list(self):
data = {}
response = self.client.post(
self.list_url,
json.dumps(data),
content_type='application/json',
**self.auth_extra_api_key
)
self.assertEqual(response.status_code, 201)
content = json.loads(response.content)
date = timezone.now().date().isoformat()
match_str = re.compile(r'https://%s.s3.amazonaws.com/uploads/%s/%sT\d\d\.\d\d\.\d\d\.\d+%%2B00.00' % (
upload_storage.bucket.name,
self.user1.id,
date
))
self.assertEqual(content['extension'], '')
self.assertEqual(content['content_type'], 'application/octet-stream')
self.assertRegexpMatches(content['signed_get_url'], match_str)
self.assertRegexpMatches(content['signed_put_url'], match_str)
self.assertSignedURL(urlparse.urlparse(content['signed_get_url']).query, signed=True)
self.assertSignedURL(urlparse.urlparse(content['signed_put_url']).query, signed=True)
self.assertEqual(content['acl'], 'public-read')
self.assertEqual(content['success_action_status'], '200')
def test__success__post_list__empty_request(self):
response = self.client.post(
self.list_url,
'',
content_type='application/json',
**self.auth_extra_api_key
)
self.assertEqual(response.status_code, 201)
content = json.loads(response.content)
date = timezone.now().date().isoformat()
match_str = re.compile(r'https://%s.s3.amazonaws.com/uploads/%s/%sT\d\d\.\d\d\.\d\d\.\d+%%2B00.00' % (
upload_storage.bucket.name,
self.user1.id,
date
))
self.assertEqual(content['extension'], '')
self.assertEqual(content['content_type'], 'application/octet-stream')
self.assertRegexpMatches(content['signed_get_url'], match_str)
self.assertRegexpMatches(content['signed_put_url'], match_str)
self.assertSignedURL(urlparse.urlparse(content['signed_get_url']).query, signed=True)
self.assertSignedURL(urlparse.urlparse(content['signed_put_url']).query, signed=True)
self.assertEqual(content['acl'], 'public-read')
self.assertEqual(content['success_action_status'], '200')
def test__success__post_list__with_data(self):
data = {
'extension': 'png',
'extra_form_attributes': ['filename'],
'success_action_status': '204',
}
response = self.client.post(
self.list_url,
json.dumps(data),
content_type='application/json',
**self.auth_extra_api_key
)
self.assertEqual(response.status_code, 201)
content = json.loads(response.content)
date = timezone.now().date().isoformat()
match_str = re.compile(r'https://%s.s3.amazonaws.com/uploads/%s/%sT\d\d\.\d\d\.\d\d\.\d+%%2B00.00\.%s' % (
upload_storage.bucket.name,
self.user1.id,
date,
data['extension']
))
self.assertEqual(content['extension'], data['extension'])
self.assertEqual(content['content_type'], 'image/png')
self.assertRegexpMatches(content['signed_get_url'], match_str)
self.assertRegexpMatches(content['signed_put_url'], match_str)
self.assertSignedURL(urlparse.urlparse(content['signed_get_url']).query, signed=True)
self.assertSignedURL(urlparse.urlparse(content['signed_put_url']).query, signed=True)
self.assertEqual(content['acl'], 'public-read')
self.assertEqual(content['success_action_status'], data['success_action_status'])
policy = content['form_policy']
self.assertEqual(policy['conditions'][-1][1], '$filename')
def test__failure__post_list__bad_extension(self):
data = {
'extension': 'exe.jpg',
}
response = self.client.post(
self.list_url,
json.dumps(data),
content_type='application/json',
**self.auth_extra_api_key
)
self.assertEqual(response.status_code, 400)
def test__failure__post_list__bad_extra_form_attributes_empty_string(self):
data = {
'extension': 'jpg',
'extra_form_attributes': ['filename', ''],
}
response = self.client.post(
self.list_url,
json.dumps(data),
content_type='application/json',
**self.auth_extra_api_key
)
self.assertEqual(response.status_code, 400)
def test__failure__post_list__bad_extra_form_attributes_number(self):
data = {
'extension': 'jpg',
'extra_form_attributes': ['filename', 1],
}
response = self.client.post(
self.list_url,
json.dumps(data),
content_type='application/json',
**self.auth_extra_api_key
)
self.assertEqual(response.status_code, 400)
def test__failure__post_list__bad_extra_form_attributes_blacklisted(self):
data = {
'extension': 'jpg',
'extra_form_attributes': ['filename', 'Cache-control'],
}
response = self.client.post(
self.list_url,
json.dumps(data),
content_type='application/json',
**self.auth_extra_api_key
)
self.assertEqual(response.status_code, 400)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment