Created
September 11, 2015 16:37
-
-
Save nanvel/3bdf8e577119b5d9dc4f to your computer and use it in GitHub Desktop.
DDBTable class for DynamoDB in examples (http://nanvel.com/p/dynamodb)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import botocore.session | |
import logging | |
import re | |
import six | |
from functools import partial | |
logger = logging.getLogger() | |
class AmazonException(Exception): | |
def __init__(self, message, code='unknown'): | |
self.message = message | |
self.code = code | |
def __str__(self): | |
return self.message | |
class DDBException(AmazonException): | |
ITEM_ENCODE_ERROR = 'ItemEncodeError' | |
ITEM_DECODE_ERROR = 'ItemDecodeError' | |
class Botocore(object): | |
_session = None | |
def __init__(self, service, operation, region_name, endpoint_url=None): | |
session = Botocore._session | |
if session is None: | |
session = botocore.session.get_session() | |
Botocore._session = session | |
service = session.get_service(service_name=service) | |
self.endpoint = service.get_endpoint( | |
region_name=region_name, endpoint_url=endpoint_url) | |
self.operation = service.get_operation(operation_name=operation) | |
def call(self, **kwargs): | |
response, message = self.operation.call(endpoint=self.endpoint, **kwargs) | |
if response.status_code != 200: | |
raise AmazonException( | |
message='DynamoDB request error: {message}.'.format( | |
message=message.get('Error', {'Message': 'unknown'})['Message']), | |
code=message.get('Error', {'Code': 'unknown'})['Code']) | |
return message | |
class DDBField(object): | |
@classmethod | |
def _validate(cls, value): | |
raise NotImplementedError('Not implemented.') | |
@classmethod | |
def decode(cls, value): | |
try: | |
return cls._validate(value) | |
except (TypeError, ValueError): | |
raise DDBException( | |
message='Invalid value for {cls} decode.'.format(cls=cls.__name__), | |
code=DDBException.ITEM_DECODE_ERROR) | |
@classmethod | |
def encode(cls, value): | |
try: | |
return str(cls._validate(value)) | |
except (TypeError, ValueError): | |
raise DDBException( | |
message='Invalid value for {cls} encode.'.format(cls=cls.__name__), | |
code=DDBException.ITEM_ENCODE_ERROR) | |
class DDBIntField(DDBField): | |
AMAZON_TYPE = 'N' | |
@classmethod | |
def _validate(cls, value): | |
if isinstance(value, int): | |
return value | |
return int(value) | |
class DDBInt_IntField(DDBField): | |
AMAZON_TYPE = 'S' | |
_REGEXP = re.compile('\d+_\d+') | |
@classmethod | |
def _validate(cls, value): | |
if not isinstance(value, str): | |
value = str(value) | |
if cls._REGEXP.match(value) is None: | |
raise ValueError('Int_Int required.') | |
return value | |
class DDBUUIDField(DDBField): | |
AMAZON_TYPE = 'S' | |
_UUID_REGEXP = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}') | |
@classmethod | |
def _validate(cls, value): | |
if not isinstance(value, str): | |
value = str(value) | |
if cls._UUID_REGEXP.match(value) is None: | |
raise ValueError('UUID required.') | |
return value | |
class DDBUUID_UUIDField(DDBField): | |
AMAZON_TYPE = 'S' | |
_VALUE_REGEXP = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}') | |
@classmethod | |
def _validate(cls, value): | |
if not isinstance(value, str): | |
value = str(value) | |
if cls._VALUE_REGEXP.match(value) is None: | |
raise ValueError('UUID_UUID required.') | |
return value | |
class DDBStrField(DDBField): | |
AMAZON_TYPE = 'S' | |
@classmethod | |
def _validate(cls, value): | |
if not isinstance(value, str): | |
value = str(value) | |
return value | |
class DDBTable(object): | |
TABLE_NAME = '' | |
REGION_NAME = 'us-west-2' | |
KEY_SCHEMA = [] | |
LOCAL_SECONDARY_INDEXES = [] | |
GLOBAL_SECONDARY_INDEXES = [] | |
PROVISIONED_THROUGHPUT = {} | |
FIELDS = {} | |
def _get_table_name(self): | |
return self.TABLE_NAME | |
def _get_table_kwargs(self): | |
key_fields = set() | |
for key in self.KEY_SCHEMA: | |
key_fields.add(key['AttributeName']) | |
for index in self.LOCAL_SECONDARY_INDEXES: | |
for key in index['KeySchema']: | |
key_fields.add(key['AttributeName']) | |
for index in self.GLOBAL_SECONDARY_INDEXES: | |
for key in index['KeySchema']: | |
key_fields.add(key['AttributeName']) | |
attribute_definitions = [] | |
for field_name in key_fields: | |
attribute_definitions.append({ | |
'AttributeName': field_name, | |
'AttributeType': self.FIELDS[field_name].AMAZON_TYPE | |
}) | |
kwargs = { | |
'TableName': self._get_table_name(), | |
'AttributeDefinitions': attribute_definitions, | |
'KeySchema': self.KEY_SCHEMA, | |
'ProvisionedThroughput': self.PROVISIONED_THROUGHPUT, | |
} | |
if getattr(self, 'LOCAL_SECONDARY_INDEXES', None): | |
kwargs['LocalSecondaryIndexes'] = self.LOCAL_SECONDARY_INDEXES | |
if getattr(self, 'GLOBAL_SECONDARY_INDEXES', None): | |
kwargs['GlobalSecondaryIndexes'] = self.GLOBAL_SECONDARY_INDEXES | |
return kwargs | |
def _get_endpoint_url(self): | |
return None | |
def _dynamodb(self, operation): | |
return Botocore( | |
service='dynamodb', operation=operation, | |
region_name=self.REGION_NAME, endpoint_url=self._get_endpoint_url()) | |
def create_table(self): | |
try: | |
message = self._dynamodb(operation='DescribeTable').call( | |
TableName=self._get_table_name()) | |
except AmazonException as e: | |
if e.code != 'ResourceNotFoundException': | |
raise e | |
logger.warning('Creation {table_name} table ...'.format( | |
table_name=self._get_table_name())) | |
message = self._dynamodb(operation='CreateTable').call( | |
**self._get_table_kwargs()) | |
else: | |
logger.warning('{table_name} table already exists.'.format( | |
table_name=self._get_table_name())) | |
def encode_item(self, data, keys=None, update=False): | |
if not data: | |
return {} | |
keys = keys or data.keys() | |
item = {} | |
for key in keys: | |
if key not in data: | |
continue | |
val = self.FIELDS[key].encode(value=data[key]) | |
if update: | |
item[key] = { | |
'Value': { | |
self.FIELDS[key].AMAZON_TYPE: val | |
}, | |
'Action': 'PUT' | |
} | |
else: | |
item[key] = { | |
self.FIELDS[key].AMAZON_TYPE: val | |
} | |
return item | |
def decode_item(self, item, keys=None): | |
data = {} | |
for key, val in six.iteritems(item): | |
if key not in self.FIELDS: | |
continue | |
if keys and key not in keys: | |
continue | |
data[key] = self.FIELDS[key].decode( | |
val[self.FIELDS[key].AMAZON_TYPE]) | |
return data |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment