Skip to content

Instantly share code, notes, and snippets.

@kalefranz
Last active August 29, 2015 14:20
Show Gist options
  • Save kalefranz/3facb62d6b02fbce6c4f to your computer and use it in GitHub Desktop.
Save kalefranz/3facb62d6b02fbce6c4f to your computer and use it in GitHub Desktop.
boto s3 wrapper
import time
import uuid
from boto.exception import S3ResponseError
from boto.s3.connection import S3Connection
from boto.s3.key import Key
def create_key():
return str(time.time()) + '-' + uuid.uuid4().hex
class S3WriteError(IOError):
pass
class S3Storage(object):
"""Basic CRUD utility for Amazon S3 storage
Args:
aws_access_key (str): properly credentialed access id for s3 access
aws_secret_key (str): secret key for id
s3_bucket (str): s3 bucket name to use for storage
"""
def __init__(self, aws_access_key, aws_secret_key, s3_bucket):
self.conn = S3Connection(aws_access_key, aws_secret_key)
self.bucket = self.conn.get_bucket(s3_bucket)
def create(self, data, key=None):
"""Basic create command. Will not overwrite a key that already exists.
Args:
data (str): a byte string containing the data to be written
key (str, optional): unique file key for bucket
Returns:
str: the key to which the data was written
Raises:
S3WriteError: when data has not been written
"""
if not key:
key = create_key()
r = self._set(key, data, False)
if r != len(data):
raise S3WriteError("Length of content written did not match length of content")
return key
def read(self, key):
"""Basic read.
Args:
key (str): key in bucket to be read
Returns:
str: contents of file as byte string
"""
try:
return self._get(key)
except S3ResponseError:
raise KeyError("Key not found.")
def update(self, key, data):
"""Basic update/upsert command.
Creates a new file if key does not already exist. Overwrites if key exists.
"""
self._set(key, data, True)
def delete(self, key):
"""Basic delete.
Args:
key (str): key in bucket to be removed
"""
self._remove(key)
def _set(self, key, data, replace=False):
k = Key(self.bucket, key)
return k.set_contents_from_string(data, replace=replace)
def _get(self, key):
k = Key(self.bucket, key)
return k.get_contents_as_string()
def _remove(self, key):
return self.bucket.delete_keys([Key(self.bucket, key)])
import random
import os
from testtools import TestCase, skipUnless
from transcomm.config import appcontext
from transcomm.common.s3 import S3Storage, S3WriteError, create_key
class S3StorageTests(TestCase):
def setUp(self):
super(S3StorageTests, self).setUp()
self.s3_bucket = S3Storage(appcontext.aws_access_key,
appcontext.aws_secret_key,
'ttam.qa.functional')
@skipUnless(appcontext.allow_external_tests, "External connection required.")
def test_full_crud(self):
data = os.urandom(random.randint(50, 500))
key = create_key()
#create
r_key = self.s3_bucket.create(data, key)
self.assertEqual(key, r_key)
#read
retrieved_data = self.s3_bucket.read(key)
self.assertEqual(data, retrieved_data)
#update to overwrite existing file
new_data = os.urandom(random.randint(50, 500))
self.s3_bucket.update(key, new_data)
new_data_read = self.s3_bucket.read(key)
self.assertNotEqual(data, new_data_read)
self.assertEqual(new_data, new_data_read)
#delete
self.s3_bucket.delete(key)
@skipUnless(appcontext.allow_external_tests, "External connection required.")
def test_create_error_key_clash(self):
data = os.urandom(random.randint(50, 500))
key = create_key()
r_key = self.s3_bucket.create(data, key)
self.assertEqual(key, r_key)
self.assertRaises(S3WriteError, self.s3_bucket.create, data, key)
self.s3_bucket.delete(key)
@skipUnless(appcontext.allow_external_tests, "External connection required.")
def test_create_default_key(self):
data = os.urandom(random.randint(50, 500))
#create
r_key = self.s3_bucket.create(data)
#read to verify
retrieved_data = self.s3_bucket.read(r_key)
self.assertEqual(data, retrieved_data)
#delete to clean up
self.s3_bucket.delete(r_key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment