Last active
August 29, 2015 14:20
-
-
Save kalefranz/3facb62d6b02fbce6c4f to your computer and use it in GitHub Desktop.
boto s3 wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import uuid | |
from boto.exception import S3ResponseError | |
from boto.s3.connection import S3Connection | |
from boto.s3.key import Key | |
def create_key():
    """Build a fresh storage key of the form ``<timestamp>-<uuid4 hex>``.

    The timestamp is the string form of ``time.time()`` and the suffix is
    the hex digest of a random UUID, so two calls yield different keys.

    Returns:
        str: the generated key
    """
    timestamp = str(time.time())
    unique_suffix = uuid.uuid4().hex
    return '-'.join((timestamp, unique_suffix))
class S3WriteError(IOError):
    """Signals that data could not be written to S3 as requested."""
class S3Storage(object):
    """Basic CRUD utility for Amazon S3 storage

    Args:
        aws_access_key (str): properly credentialed access id for s3 access
        aws_secret_key (str): secret key for id
        s3_bucket (str): s3 bucket name to use for storage
    """

    def __init__(self, aws_access_key, aws_secret_key, s3_bucket):
        self.conn = S3Connection(aws_access_key, aws_secret_key)
        self.bucket = self.conn.get_bucket(s3_bucket)

    def create(self, data, key=None):
        """Basic create command. Will not overwrite a key that already exists.

        Args:
            data (str): a byte string containing the data to be written.
                Unicode text is also accepted and is stored UTF-8 encoded.
            key (str, optional): unique file key for bucket. A new key is
                generated with ``create_key()`` when omitted or empty.

        Returns:
            str: the key to which the data was written

        Raises:
            S3WriteError: when data has not been written, either because the
                key already exists or because the number of bytes written
                does not match the payload
        """
        if not key:
            key = create_key()

        # Encode once up front so the length check below compares bytes with
        # bytes. Comparing against len(data) for unicode text would report a
        # failure for any non-ASCII content even though the write succeeded.
        payload = self._as_bytes(data)

        written = self._set(key, payload, False)
        if written is None:
            # With replace=False boto returns None without uploading when the
            # key is already present in the bucket.
            raise S3WriteError("Key already exists; nothing was written: %s" % key)
        if written != len(payload):
            raise S3WriteError("Length of content written did not match length of content")
        return key

    def read(self, key):
        """Basic read.

        Args:
            key (str): key in bucket to be read

        Returns:
            str: contents of file as byte string

        Raises:
            KeyError: when S3 answers the read with an S3ResponseError
        """
        try:
            return self._get(key)
        except S3ResponseError:
            # NOTE(review): every S3ResponseError is reported as a missing
            # key, including permission or server errors.
            raise KeyError("Key not found.")

    def update(self, key, data):
        """Basic update/upsert command.

        Creates a new file if key does not already exist. Overwrites if key exists.

        Args:
            key (str): key in bucket to be written
            data (str): a byte string containing the data to be written
        """
        self._set(key, data, True)

    def delete(self, key):
        """Basic delete.

        Args:
            key (str): key in bucket to be removed
        """
        self._remove(key)

    @staticmethod
    def _as_bytes(data):
        """Return data unchanged if it is a byte string, else UTF-8 encode it."""
        if isinstance(data, bytes):
            return data
        return data.encode('utf-8')

    def _set(self, key, data, replace=False):
        """Write data under key; returns whatever boto reports for the write."""
        k = Key(self.bucket, key)
        return k.set_contents_from_string(data, replace=replace)

    def _get(self, key):
        """Fetch the contents stored under key."""
        k = Key(self.bucket, key)
        return k.get_contents_as_string()

    def _remove(self, key):
        """Delete key from the bucket and return boto's delete result."""
        return self.bucket.delete_keys([Key(self.bucket, key)])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import os | |
from testtools import TestCase, skipUnless | |
from transcomm.config import appcontext | |
from transcomm.common.s3 import S3Storage, S3WriteError, create_key | |
class S3StorageTests(TestCase):
    """Functional tests exercising S3Storage against a real S3 bucket.

    Each test method is decorated to be skipped unless
    ``appcontext.allow_external_tests`` is truthy.
    """

    def setUp(self):
        super(S3StorageTests, self).setUp()
        self.s3_bucket = S3Storage(appcontext.aws_access_key,
                                   appcontext.aws_secret_key,
                                   'ttam.qa.functional')

    @skipUnless(appcontext.allow_external_tests, "External connection required.")
    def test_full_crud(self):
        """Create, read, overwrite and delete a single object."""
        data = os.urandom(random.randint(50, 500))
        key = create_key()

        # create
        r_key = self.s3_bucket.create(data, key)
        # Registered as soon as the object exists so a failing assertion
        # below does not leave it behind in the bucket.
        self.addCleanup(self.s3_bucket.delete, key)
        self.assertEqual(key, r_key)

        # read
        retrieved_data = self.s3_bucket.read(key)
        self.assertEqual(data, retrieved_data)

        # update to overwrite existing file
        new_data = os.urandom(random.randint(50, 500))
        self.s3_bucket.update(key, new_data)
        new_data_read = self.s3_bucket.read(key)
        self.assertNotEqual(data, new_data_read)
        self.assertEqual(new_data, new_data_read)

        # delete (the cleanup above will issue a second delete afterwards;
        # presumably harmless for an already-removed key -- TODO confirm)
        self.s3_bucket.delete(key)

    @skipUnless(appcontext.allow_external_tests, "External connection required.")
    def test_create_error_key_clash(self):
        """Creating the same key twice must raise S3WriteError."""
        data = os.urandom(random.randint(50, 500))
        key = create_key()

        r_key = self.s3_bucket.create(data, key)
        # Clean up even if one of the assertions below fails.
        self.addCleanup(self.s3_bucket.delete, key)
        self.assertEqual(key, r_key)

        self.assertRaises(S3WriteError, self.s3_bucket.create, data, key)

    @skipUnless(appcontext.allow_external_tests, "External connection required.")
    def test_create_default_key(self):
        """create() without a key generates one and stores the data under it."""
        data = os.urandom(random.randint(50, 500))

        # create
        r_key = self.s3_bucket.create(data)
        # delete to clean up, even when the verification below fails
        self.addCleanup(self.s3_bucket.delete, r_key)

        # read to verify
        retrieved_data = self.s3_bucket.read(r_key)
        self.assertEqual(data, retrieved_data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment