# storage.py -- a Django storage backend for Amazon S3.
# Gist by @robhudson, forked from leah/storage.py (May 27, 2009).
from mimetypes import guess_type
import re

from django.core.exceptions import ImproperlyConfigured
from django.core.files.storage import Storage
from django.utils.encoding import iri_to_uri

try:
    import S3
except ImportError:
    raise ImproperlyConfigured(
        "Could not load amazon's S3 bindings.\n"
        "See http://developer.amazonwebservices.com/connect/entry.jspa?externalID=134")

try:
    from settings import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET_NAME
except ImportError:
    raise ImproperlyConfigured(
        "AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_STORAGE_BUCKET_NAME "
        "are required in settings.py.")

try:
    from settings import AWS_CALLING_FORMAT
except ImportError:
    AWS_CALLING_FORMAT = S3.CallingFormat.PATH
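
# Example settings.py values (placeholders only -- substitute your own
# credentials and bucket; AWS_CALLING_FORMAT is optional and falls back to
# path-style addressing as above):
#
#   AWS_ACCESS_KEY_ID = 'AKIA...'
#   AWS_SECRET_ACCESS_KEY = '...'
#   AWS_STORAGE_BUCKET_NAME = 'my-bucket'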


class S3StorageException(Exception):
    """Exception raised when an S3 error occurs."""


class S3Storage(Storage):
    """
    Storage backend for Amazon Simple Storage Service.
    """
    def __init__(self, access_key=AWS_ACCESS_KEY_ID, secret_key=AWS_SECRET_ACCESS_KEY,
                 bucket=AWS_STORAGE_BUCKET_NAME, acl='public-read',
                 calling_format=AWS_CALLING_FORMAT):
        self.bucket = bucket
        self.acl = acl
        self.connection = S3.AWSAuthConnection(access_key, secret_key,
                                               calling_format=calling_format)
        self.generator = S3.QueryStringAuthGenerator(access_key, secret_key,
                                                     calling_format=calling_format,
                                                     is_secure=False)

    def is_valid(self):
        """
        Tests whether the connection is valid by checking the basic
        REST GET endpoint (listing all buckets).
        """
        response = self.connection.list_all_my_buckets().http_response
        return response.status == 200

    def open(self, name, mode='rb', mixin=None):
        file = self.connection.get(self.bucket, name).object.data
        if mixin:
            # Add the mixin as a parent class of the File returned from storage.
            file.__class__ = type(mixin.__name__, (mixin, file.__class__), {})
        return file

    def save(self, name, content):
        # The S3 name can be a path.
        if name is None:
            name = content.name
        headers = {
            'x-amz-acl': self.acl,
            # The standard fallback MIME type is application/octet-stream.
            'Content-Type': (content.content_type or guess_type(name)[0]
                             or 'application/octet-stream'),
            'Content-Disposition': 'filename="' + iri_to_uri(content.name) + '";',
        }
        response = self.connection.put(self.bucket, name, content.read(), headers)
        if response.http_response.status != 200:
            raise S3StorageException(response.message)
        return name

    def delete(self, name):
        self.connection.delete(self.bucket, name)

    def copy(self, old_name, name):
        try:
            filename = name.rsplit('/', 1)[1]
        except IndexError:
            filename = name
        # Carry the current Content-Type over to the copy.
        response = self.connection._make_request('HEAD', self.bucket, old_name)
        content_type = response.getheader('Content-Type')
        headers = {
            'x-amz-copy-source': '%s/%s' % (self.bucket, old_name),
            'x-amz-metadata-directive': 'REPLACE',  # change Content-Disposition for the new name
            'x-amz-acl': self.acl,  # otherwise the ACL would be reset to private
            'Content-Type': content_type,
            'Content-Disposition': 'filename="' + iri_to_uri(filename) + '";',
        }
        response = self.connection.copy(self.bucket, name, headers)
        if response.http_response.status != 200:
            raise S3StorageException(response.message)
        return name

    def move(self, old_name, name):
        # A move is a copy followed by a delete of the original.
        self.copy(old_name, name)
        self.delete(old_name)

    def exists(self, name):
        # HEAD the key and treat a 200 response as existence, following the
        # implementation the original left commented out (but using
        # _make_request, as copy() above does).
        response = self.connection._make_request('HEAD', self.bucket, name)
        return response.status == 200

    def listbucket(self):
        response = self.connection.list_bucket(self.bucket)
        return [entry.key for entry in response.entries]

    def listdir(self, path):
        """
        Lists the contents of the specified path, returning a 2-tuple of
        lists; the first item being directories, the second being files.
        """
        directories, files = [], []
        entries = self.listbucket()
        if path:
            path_re = re.compile('^(?P<path>%s/)(?P<subpath>.*)' % path)
        else:  # root directory
            path_re = re.compile('^(?P<path>)(?P<subpath>.*)')
        for entry in entries:
            match = path_re.match(entry)
            if match:
                subpieces = match.group('subpath').split('/', 1)
                try:  # check whether this is a nested path
                    directory, more_path = subpieces
                    if directory not in directories:
                        directories.append(directory)
                except ValueError:  # this is a file - no deeper path
                    files.append(subpieces[0])
        return directories, files
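
    # For example, given bucket keys ['photos/a.jpg', 'photos/raw/b.jpg',
    # 'c.txt'], listdir('photos') returns (['raw'], ['a.jpg']) and
    # listdir('') returns (['photos'], ['c.txt']).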

    def size(self, name):
        # Note: this downloads the entire object just to measure it.
        data = self.connection.get(self.bucket, name).object.data
        return len(data)

    def url(self, name):
        return self.generator.make_bare_url(self.bucket, name)
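

# A minimal usage sketch (an illustrative addition, not part of the original
# gist): it assumes the settings above are configured, the legacy S3.py
# bindings are importable, and the bucket already exists. Django's
# ContentFile has no content_type attribute, so one is attached by hand to
# satisfy save().
if __name__ == '__main__':
    from django.core.files.base import ContentFile

    storage = S3Storage()
    if storage.is_valid():
        content = ContentFile('hello, world')
        content.name = 'hello.txt'
        content.content_type = 'text/plain'
        key = storage.save('uploads/hello.txt', content)
        print storage.url(key)
        storage.delete(key)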