Created September 28, 2021 13:57
-
-
Save saurabh2590/c5f9c70b2845ad972217ecbb238f3af9 to your computer and use it in GitHub Desktop.
DynamoDB Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import re
import time
from json import dumps
from typing import List, Optional, Dict, Any

import boto3
import botocore
from boto3.dynamodb.conditions import ConditionBase, ConditionExpressionBuilder
from botocore.config import Config
from structlog import get_logger
from zope.interface import Interface, implementer

from bm.settings import get_settings
from bm.storage.aws.helpers import get_table_name
# Raise these urllib3 loggers to WARNING so their DEBUG/INFO records are dropped.
for _noisy_logger_name in (
    "urllib3.connection",
    "urllib3.connectionpool",
    "urllib3.poolmanager",
):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)
del _noisy_logger_name
class DynamoDBException(Exception):
    """Error signalling that an interaction with the AWS DynamoDB service failed."""
class IDynamoDBTable(Interface):
    """Contract for accessing a single AWS DynamoDB table."""

    def get_batch(self):
        """
        Get batch writer for the DynamoDB table
        :return: a batch writer for the table
        """

    def get_item(
        self, search_key: Dict[str, Any], **kwargs: Any
    ) -> Optional[Dict[str, Any]]:
        """Get single item from DynamoDB table based on the search key.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_key: {'user_submission_id': "12345678" }
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only { 'data': <value> } as a response
            if projected_attributes: set([]), all attributes are returned
        """

    def batch_get_item(
        self, search_keys: List[Dict[str, Any]], **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Get batch of items from DynamoDB table based on the search keys list provided.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_key: [{'user_submission_id': "12345678" }, {'user_submission_id': "25902591" }]
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only { 'data': <value> } as a response
            if projected_attributes: set([]), all attributes are returned
        # Notes:
            1. AWS DynamoDB can return maximum 100 items in a single batch read.
            2. A single operation can retrieve up to 16MB of data.
            This function always receives the chunked request of 100 items.
        """

    def put_item(self, item: dict) -> None:
        """
        Write the item to Dynamo table.
        Does not return anything.
        """

    def batch_put_item(batch, item: dict) -> None:
        """
        Add an item to the given batch writer. This will be processed directly.
        """

    def query(
        self,
        index: Optional[str],
        partition_key_condition: ConditionBase,
        filter_expression: Optional[ConditionBase],
        limit: Optional[int] = None,
        additional_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search for items from DynamoDB table based on the search key.
        """

    def scan(
        self, filter_expression: ConditionBase, limit: Optional[int]
    ) -> List[Dict[str, Any]]:
        """Scan for items from DynamoDB table matching the filter expression."""
@implementer(IDynamoDBTable)
class DynamoDBTable:
    """
    Interacts with AWS Dynamo DB
    See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#client
    There are slight differences in naming compared to MongoDB (e.g. Collection -> Table)
    For more details, look at Core components section of this article:
    https://aws.amazon.com/blogs/database/performing-a-live-migration-from-a-mongodb-cluster-to-amazon-dynamodb/
    """

    # BatchGetItem / BatchWriteItem may succeed only partially and return the
    # remainder as UnprocessedKeys / UnprocessedItems instead of failing. The
    # remainder is re-submitted with capped exponential backoff, using at most
    # this many requests in total per call.
    _UNPROCESSED_MAX_ATTEMPTS = 8
    _UNPROCESSED_BASE_DELAY_SECONDS = 0.05
    _UNPROCESSED_MAX_DELAY_SECONDS = 2.0

    # Placeholder tokens (e.g. "#n0", ":v12") inside expressions built by
    # ConditionExpressionBuilder; only used to render conditions for logs.
    _PLACEHOLDER_PATTERN = re.compile(r"[#:]\w+")

    def __init__(
        self,
        aws_access_key_id,
        aws_secret_access_key,
        storage_name,
        system,
        local_url=None,
        region_name="eu-central-1",
    ):
        """Create the boto3 DynamoDB resource and bind the table to use.

        :param aws_access_key_id: AWS access key id (unused when ``local_url`` is set).
        :param aws_secret_access_key: AWS secret key (unused when ``local_url`` is set).
        :param storage_name: storage name, resolved to a table name with ``get_table_name``.
        :param system: system identifier, also passed to ``get_table_name``.
        :param local_url: endpoint of a local DynamoDB; when set, fake credentials are used.
        :param region_name: AWS region of the table (defaults to "eu-central-1").
        """
        if local_url:
            self.dynamo = boto3.resource(
                "dynamodb",
                endpoint_url=local_url,
                aws_access_key_id="fakeMyKeyId",
                aws_secret_access_key="fakeSecretAccessKey",
                region_name=region_name,
            )
        else:
            max_retries = int(get_settings("bm.dynamo_db_max_retries", 10))
            self.dynamo = boto3.resource(
                "dynamodb",
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                region_name=region_name,
                # Handing over settings for boto3 retry strategy
                # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
                config=Config(
                    retries={"max_attempts": max_retries, "mode": "adaptive"},
                    max_pool_connections=1024,
                ),
            )
        self.table_name = get_table_name(system, storage_name)
        self.dynamodb_table = self.dynamo.Table(self.table_name)
        self.log = get_logger(__name__)

    def get_batch(self):
        """Return the boto3 batch writer of the bound table."""
        return self.dynamodb_table.batch_writer()

    def delete_item(self, primary_key: Dict[str, Any], return_values: str = "NONE"):
        """
        @see: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.delete_item # noqa E501
        Delete an item from the table with its primary key
        :param primary_key: A map of attribute names to values, representing the primary key of the item to delete.
            - For a simple primary key, provide only a value for the partition key.
                e.g {"pk_user_id": 123}
            - For a composite primary key, must provide values for both the partition key and the sort key.
                e.g {"pk_user_id":123, "sk_timestamp": 1611525962429000}
        :param return_values: 'NONE'|'ALL_OLD' for delete_item
        :return: the raw DynamoDB response
        :raises botocore.exceptions.ClientError: logged, then re-raised
        """
        try:
            aws_response = self.dynamodb_table.delete_item(
                Key=primary_key, ReturnValues=return_values
            )
        except botocore.exceptions.ClientError as error:
            self.log.error(
                "Delete Item API",
                table_name=self.table_name,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
            )
            raise
        return self.log_retry_attempts(aws_response, "delete_item")

    @staticmethod
    def _projection_params(
        projected_attributes, reserved_keyword_alias
    ) -> Dict[str, Any]:
        """Build the optional ProjectionExpression / ExpressionAttributeNames params.

        Each parameter is only included when non-empty, because DynamoDB rejects
        an empty ProjectionExpression or an empty ExpressionAttributeNames map.
        Omitting ProjectionExpression makes DynamoDB return all attributes.
        """
        params: Dict[str, Any] = {}
        if projected_attributes:
            params["ProjectionExpression"] = ",".join(projected_attributes)
        if reserved_keyword_alias:
            # in case attr. name is a reserved keyword (e.g. data)
            params["ExpressionAttributeNames"] = reserved_keyword_alias
        return params

    def _sleep_before_retry(self, attempt: int) -> None:
        """Sleep with capped exponential backoff before re-submitting batch leftovers."""
        delay = min(
            self._UNPROCESSED_MAX_DELAY_SECONDS,
            self._UNPROCESSED_BASE_DELAY_SECONDS * (2 ** attempt),
        )
        time.sleep(delay)

    def get_item(
        self, search_key: Dict[str, Any], **kwargs: Any
    ) -> Optional[Dict[str, Any]]:
        """Get single item from DynamoDB table based on the search key.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_key: {'user_submission_id': "12345678" }
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only { 'data': <value> } as a response
            if projected_attributes is empty (the default), all attributes are returned
        :return: the item, or None when no item matches the key
        :raises botocore.exceptions.ClientError: logged, then re-raised
        """
        projected_attributes = kwargs.get("projected_attributes", set())
        reserved_keyword_alias = kwargs.get("reserved_keyword_alias", dict())
        # To know about parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.get_item
        request = {
            "Key": search_key,
            # see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html
            "ConsistentRead": False,  # Eventually consistent read, since it is more a WRITE intensive DB
            "ReturnConsumedCapacity": "NONE",  # No need for details on performance
            **self._projection_params(projected_attributes, reserved_keyword_alias),
        }
        try:
            aws_response = self.dynamodb_table.get_item(**request)
        except botocore.exceptions.ClientError as error:
            self.log.error(
                "Get Item API",
                table_name=self.table_name,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
                search_key=search_key,
            )
            raise
        return self.log_retry_attempts(aws_response, "get_item").get("Item")

    def batch_get_item(
        self, search_keys: List[Dict[str, Any]], **kwargs
    ) -> List[Dict[str, Any]]:
        """Get batch of items from DynamoDB table based on the search keys list provided.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_key: [{'user_submission_id': "12345678" }, {'user_submission_id': "25902591" }]
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only { 'data': <value> } as a response
            if projected_attributes is empty (the default), all attributes are returned
        # Notes:
            1. AWS DynamoDB can return maximum 100 items in a single batch read.
            2. A single operation can retrieve up to 16MB of data.
            This function always receives the chunked request of 100 items.
            3. Keys DynamoDB reports as unprocessed are re-requested with backoff; if
               some remain after the retry budget, an error is logged and the items
               fetched so far are returned.
        :return: the found items (order is not guaranteed by DynamoDB)
        :raises botocore.exceptions.ClientError: logged, then re-raised
        """
        if not search_keys:
            return []
        projected_attributes = kwargs.get("projected_attributes", set())
        reserved_keyword_alias = kwargs.get("reserved_keyword_alias", dict())
        # To know about parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.batch_get_item
        request_items: Dict[str, Any] = {
            self.table_name: {
                "Keys": search_keys,
                "ConsistentRead": False,
                **self._projection_params(projected_attributes, reserved_keyword_alias),
            }
        }
        items: List[Dict[str, Any]] = []
        for attempt in range(self._UNPROCESSED_MAX_ATTEMPTS):
            if attempt:
                self._sleep_before_retry(attempt)
            try:
                aws_response = self.dynamo.batch_get_item(
                    RequestItems=request_items,
                    ReturnConsumedCapacity="NONE",
                )
            except botocore.exceptions.ClientError as error:
                # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
                # A ValidationException is expected if more than 100 keys are sent;
                # callers are expected to chunk requests to 100 keys.
                self.log.error(
                    "Batch Read API",
                    table_name=self.table_name,
                    user_submission_ids=search_keys,
                    code=error.response["Error"]["Code"],
                    message=error.response["Error"]["Message"],
                )
                raise
            self.log_retry_attempts(aws_response, "batch_get_item")
            items.extend(aws_response.get("Responses", {}).get(self.table_name, []))
            # UnprocessedKeys has the same shape as RequestItems, so it can be re-sent.
            request_items = aws_response.get("UnprocessedKeys") or {}
            if not request_items:
                return items
        self.log.error(
            "Batch Read API",
            table_name=self.table_name,
            message="Unprocessed keys remained after retries",
            unprocessed_keys=request_items.get(self.table_name, {}).get("Keys", []),
        )
        return items

    def query(
        self,
        index: Optional[str],
        partition_key_condition: ConditionBase,
        filter_expression: Optional[ConditionBase],
        limit: Optional[int] = None,
        additional_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Search for items from DynamoDB table based on the search key.

        :param index: name of a secondary index to query, or None for the table itself.
        :param partition_key_condition: KeyConditionExpression of the query.
        :param filter_expression: optional FilterExpression.
        :param limit: DynamoDB ``Limit`` for each request. NOTE: this method keeps
            following ``LastEvaluatedKey`` until it is absent, so ``limit`` sets the
            page size and does not cap the total number of returned items.
        :param additional_params: extra query parameters, merged in first (the
            explicit arguments above take precedence).
        :return: all items from every page
        :raises botocore.exceptions.ClientError: logged, then re-raised
        """
        query_args: Dict[str, Any] = {
            "KeyConditionExpression": partition_key_condition,
            "ReturnConsumedCapacity": "NONE",
            **(additional_params or {}),
        }
        if index is not None:
            query_args["IndexName"] = index
        if limit is not None:
            query_args["Limit"] = limit
        if filter_expression is not None:
            query_args["FilterExpression"] = filter_expression
        items: List[Dict[str, Any]] = []
        try:
            while True:
                # To know about parameters, see
                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
                aws_response = self.dynamodb_table.query(**query_args)
                # "Items" can be absent (e.g. Select=COUNT passed via additional_params).
                items.extend(
                    self.log_retry_attempts(aws_response, "query").get("Items", [])
                )
                if "LastEvaluatedKey" not in aws_response:
                    # No `LastEvaluatedKey`: the "last page" of results has been
                    # processed and there is no more data to be retrieved.
                    break
                query_args["ExclusiveStartKey"] = aws_response["LastEvaluatedKey"]
        except botocore.exceptions.ClientError as error:
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
            self.log.error(
                "Query API",
                table_name=self.table_name,
                index=index,
                partition_key_condition=self.get_condition_for_logger(
                    partition_key_condition
                ),
                filter_expression=self.get_condition_for_logger(filter_expression),
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
            )
            raise
        return items

    def scan(
        self,
        filter_expression: Optional[ConditionBase],
        limit: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Scan the table for items matching the filter expression.

        :param filter_expression: FilterExpression, or None to scan without a filter.
        :param limit: DynamoDB ``Limit`` for each request. NOTE: pagination is followed
            until ``LastEvaluatedKey`` is absent, so this is a page size, not a cap.
        :return: all items from every page
        :raises botocore.exceptions.ClientError: logged, then re-raised
        """
        scan_args: Dict[str, Any] = {"ReturnConsumedCapacity": "NONE"}
        if filter_expression is not None:
            scan_args["FilterExpression"] = filter_expression
        if limit is not None:
            scan_args["Limit"] = limit
        items: List[Dict[str, Any]] = []
        try:
            while True:
                # To know about parameters, see
                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
                # https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan
                aws_response = self.dynamodb_table.scan(**scan_args)
                items.extend(
                    self.log_retry_attempts(aws_response, "scan").get("Items", [])
                )
                if "LastEvaluatedKey" not in aws_response:
                    # No `LastEvaluatedKey`: the "last page" of results has been
                    # processed and there is no more data to be retrieved.
                    break
                scan_args["ExclusiveStartKey"] = aws_response["LastEvaluatedKey"]
        except botocore.exceptions.ClientError as error:
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
            self.log.error(
                "Scan API",
                table_name=self.table_name,
                filter_expression=self.get_condition_for_logger(filter_expression),
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
            )
            raise
        return items

    def put_item(self, item: dict) -> None:
        """Write the item to Dynamo table.
        Does not return anything.
        :raises botocore.exceptions.ClientError: logged, then re-raised
        """
        # To know about parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.put_item
        try:
            aws_response = self.dynamodb_table.put_item(Item=item)
        except botocore.exceptions.ClientError as error:
            self.log.error(
                "Put Item API",
                table_name=self.table_name,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
                event_type=item.get("event_type"),
                event_userid=item.get("user_id"),
            )
            raise
        self.log_retry_attempts(aws_response, "put_item")

    @staticmethod
    def batch_put_item(batch, item: dict) -> None:
        """
        Add an item to the given batch writer (see ``get_batch``).
        This will be processed directly.
        """
        # To know about parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/dynamodb.html#batch-writing
        batch.put_item(Item=item)

    def batch_write_items(self, items: List[Dict[str, Any]]) -> None:
        """
        Write a single batch of items on the table
        https://github.com/bettermarks/bm-backend/pull/581

        Items DynamoDB reports as unprocessed are re-submitted with backoff; if some
        remain after the retry budget, an error is logged for them.
        :raises botocore.exceptions.ClientError: logged, then re-raised
        """
        if not items:
            return
        request_items: Dict[str, Any] = {
            self.table_name: [{"PutRequest": {"Item": item}} for item in items]
        }
        for attempt in range(self._UNPROCESSED_MAX_ATTEMPTS):
            if attempt:
                self._sleep_before_retry(attempt)
            try:
                aws_response = self.dynamo.batch_write_item(RequestItems=request_items)
            except botocore.exceptions.ClientError as error:
                # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
                # A ValidationException is expected if more than 25 items are sent;
                # callers are expected to chunk requests to 25 items.
                self.log.error(
                    "Batch Write API",
                    table_name=self.table_name,
                    code=error.response["Error"]["Code"],
                    message=error.response["Error"]["Message"],
                    event_types=[item.get("event_type") for item in items],
                    event_userid=[item.get("user_id") for item in items],
                )
                raise
            self.log_retry_attempts(aws_response, "batch_write_items")
            # UnprocessedItems has the same shape as RequestItems, so it can be re-sent.
            request_items = aws_response.get("UnprocessedItems") or {}
            if not request_items:
                return
        unprocessed = [
            request["PutRequest"]["Item"]
            for request in request_items.get(self.table_name, [])
            if "PutRequest" in request
        ]
        self.log.error(
            "Batch Write API",
            table_name=self.table_name,
            message="Unprocessed items remained after retries",
            event_types=[item.get("event_type") for item in unprocessed],
            event_userid=[item.get("user_id") for item in unprocessed],
        )

    def get_condition_for_logger(self, condition) -> Optional[str]:
        """Render a boto3 condition as a readable string for log output.

        Name and value placeholders are replaced by the quoted names/values in a
        single pass, so placeholders next to "(", "," or ")" are substituted too.
        Values that look like placeholders are not re-substituted. String
        conditions are returned unchanged and None yields None.
        """
        if condition is None:
            return None
        if isinstance(condition, str):
            # Avoid raising inside callers' except-handlers for raw string expressions.
            return condition
        expression = ConditionExpressionBuilder().build_expression(condition)
        replacements = {
            placeholder: f"'{value}'"
            for placeholder, value in (
                *expression.attribute_name_placeholders.items(),
                *expression.attribute_value_placeholders.items(),
            )
        }
        return self._PLACEHOLDER_PATTERN.sub(
            lambda match: replacements.get(match.group(0), match.group(0)),
            expression.condition_expression,
        )

    def log_retry_attempts(self, res, operation: str):
        """Log at info level when boto3 retried *operation*; return *res* unchanged."""
        retry_count = res.get("ResponseMetadata", {}).get("RetryAttempts", 0)
        if retry_count > 0:
            self.log.info(
                "Retry for DynamoDb",
                table_name=self.table_name,
                retry_count=retry_count,
                operation=operation,
            )
        return res
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.