-
-
Save gregnewman/119843 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mimetypes import guess_type | |
from django.core.exceptions import ImproperlyConfigured | |
from django.core.files.storage import Storage | |
from django.utils.encoding import iri_to_uri | |
import re | |
# Amazon's reference S3 bindings are a hard requirement of this backend.
try:
    import S3
except ImportError:
    # The call form of ``raise`` is used because ``raise Exc, "msg"`` is a
    # SyntaxError on Python 3; on Python 2 the two forms are equivalent.
    raise ImproperlyConfigured(
        "Could not load amazon's S3 bindings."
        "\nSee http://developer.amazonwebservices.com/connect/entry.jspa?externalID=134"
    )

# Credentials and the target bucket are mandatory settings.
try:
    from settings import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET_NAME
except ImportError:
    raise ImproperlyConfigured(
        "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET_NAME required in settings.py."
    )

# The calling format is optional; fall back to path-style when it is not set.
try:
    from settings import AWS_CALLING_FORMAT
except ImportError:
    AWS_CALLING_FORMAT = S3.CallingFormat.PATH
class S3StorageException(Exception):
    """Signals that a request made to Amazon S3 came back with an error."""

    pass
class S3Storage(Storage):
    """
    Storage backend for Amazon Simple Storage Service.

    Every operation is performed against a single bucket through the ``S3``
    bindings imported by this module.
    """

    def __init__(self, access_key=AWS_ACCESS_KEY_ID, secret_key=AWS_SECRET_ACCESS_KEY,
                 bucket=AWS_STORAGE_BUCKET_NAME, acl='public-read',
                 calling_format=AWS_CALLING_FORMAT):
        """
        Create the S3 connection and URL generator for ``bucket``.

        ``acl`` is sent as the ``x-amz-acl`` header by ``save`` and ``copy``.
        """
        self.bucket = bucket
        self.acl = acl
        self.connection = S3.AWSAuthConnection(access_key, secret_key,
                                               calling_format=calling_format)
        self.generator = S3.QueryStringAuthGenerator(access_key, secret_key,
                                                     calling_format=calling_format,
                                                     is_secure=False)

    def is_valid(self):
        """
        Tests if the connection is valid by checking the basic
        rest GET endpoint (listing all buckets) for a 200 status.
        """
        response = self.connection.list_all_my_buckets().http_response
        return response.status == 200

    def open(self, name, mode='rb', mixin=None):
        """
        Fetch the object stored under ``name`` and return its data.

        ``mode`` is accepted for interface compatibility but is not used.
        When ``mixin`` is given it is added as a parent class of the
        returned object.
        """
        data = self.connection.get(self.bucket, name).object.data
        if mixin:
            # Add the mixin as a parent class of the object returned from storage.
            # NOTE(review): if ``.object.data`` is a plain string (as it appears
            # to be, given that ``size`` calls len() on it), assigning __class__
            # will raise TypeError -- confirm and wrap the data in a file object
            # if mixins are actually needed.
            data.__class__ = type(mixin.__name__, (mixin, data.__class__), {})
        return data

    def save(self, name, content):
        """
        Upload ``content`` to the bucket under ``name`` and return the name.

        ``name`` may be a path; when it is None, ``content.name`` is used.
        Raises S3StorageException when S3 does not answer with status 200.
        """
        if name is None:
            name = content.name
        # Only uploaded files carry ``content_type``; other file objects do not,
        # so fall back to guessing from the name instead of raising AttributeError.
        content_type = getattr(content, 'content_type', None)
        # Prefer the file's own name for the disposition header; if it has none,
        # use the last path component of the target name (as ``copy`` does).
        display_name = getattr(content, 'name', None) or name.rsplit('/', 1)[-1]
        headers = {
            'x-amz-acl': self.acl,
            'Content-Type': content_type or guess_type(name)[0] or "application/x-octet-stream",
            'Content-Disposition': 'filename="' + iri_to_uri(display_name) + '";',
        }
        response = self.connection.put(self.bucket, name, content.read(), headers)
        if response.http_response.status != 200:
            raise S3StorageException(response.message)
        return name

    def delete(self, name):
        """Delete the object stored under ``name``; the response is not checked."""
        self.connection.delete(self.bucket, name)

    def copy(self, old_name, name):
        """
        Copy the object ``old_name`` to ``name`` within the bucket.

        The source's Content-Type is carried over and the Content-Disposition
        is rewritten for the new file name. Returns ``name``; raises
        S3StorageException when S3 does not answer with status 200.
        """
        # Last path component of the destination (the whole name if no '/').
        filename = name.rsplit('/', 1)[-1]
        # copy current content-type
        response = self.connection._make_request('HEAD', self.bucket, old_name)
        content_type = response.getheader('Content-Type')
        headers = {
            'x-amz-copy-source': '%s/%s' % (self.bucket, old_name),
            'x-amz-metadata-directive': 'REPLACE',  # change content-disposition for new name
            'x-amz-acl': self.acl,  # otherwise would be reset to private
            'Content-Type': content_type,
            'Content-Disposition': 'filename="' + iri_to_uri(filename) + '";',
        }
        response = self.connection.copy(self.bucket, name, headers)
        if response.http_response.status != 200:
            raise S3StorageException(response.message)
        return name

    def move(self, old_name, name):
        """Move ``old_name`` to ``name`` by copying it and then deleting the source."""
        # copy, then delete
        self.copy(old_name, name)
        self.delete(old_name)

    def exists(self, name):
        """
        Return True when a HEAD request for ``name`` answers with status 200.

        Uses the same ``_make_request('HEAD', ...)`` call that ``copy`` relies on.
        """
        # NOTE(review): assumes the object returned by _make_request exposes
        # ``status`` like the ``http_response`` objects used elsewhere in this
        # class -- confirm against the S3 bindings in use.
        response = self.connection._make_request('HEAD', self.bucket, name)
        return response.status == 200

    def listbucket(self):
        """Return the keys of the entries reported by a listing of the bucket."""
        response = self.connection.list_bucket(self.bucket)
        return [entry.key for entry in response.entries]

    def listdir(self, path):
        """
        Lists the contents of the specified path, returning a 2-tuple of lists;
        the first item being directories, the second item being files.

        An empty ``path`` lists the root of the bucket.
        """
        # Compare against a literal prefix rather than a regular expression so
        # that characters such as '.', '+' or '(' in ``path`` match themselves.
        if path:
            prefix = path + '/'
        else:  # root directory
            prefix = ''
        directories, files = [], []
        seen_directories = set()  # O(1) duplicate check; the list keeps order
        for key in self.listbucket():
            if not key.startswith(prefix):
                continue
            pieces = key[len(prefix):].split('/', 1)
            if len(pieces) == 2:
                # Nested key: only its first component is a child of ``path``.
                directory = pieces[0]
                if directory not in seen_directories:
                    seen_directories.add(directory)
                    directories.append(directory)
            else:
                # No further '/' in the key: it is a file directly under ``path``.
                files.append(pieces[0])
        return directories, files

    def size(self, name):
        """
        Return the length of the data stored under ``name``.

        Note that this fetches the whole object in order to measure it.
        """
        data = self.connection.get(self.bucket, name).object.data
        return len(data)

    def url(self, name):
        """Return the URL the generator's ``make_bare_url`` builds for ``name``."""
        return self.generator.make_bare_url(self.bucket, name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment