DynamoDB Client
import logging
from typing import List, Optional, Dict, Any

import boto3
import botocore
from boto3.dynamodb.conditions import ConditionBase, ConditionExpressionBuilder
from botocore.config import Config
from structlog import get_logger
from zope.interface import Interface, implementer

from bm.settings import get_settings
from bm.storage.aws.helpers import get_table_name

logging.getLogger("urllib3.connection").setLevel(logging.WARNING)
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
logging.getLogger("urllib3.poolmanager").setLevel(logging.WARNING)

class DynamoDBException(Exception):
    """Raised when something goes wrong on the interface
    with the AWS DynamoDB service.
    """

class IDynamoDBTable(Interface):
    def get_batch(self):
        """
        Get a batch writer for the DynamoDB table.
        :return:
        """

    def get_item(
        self, search_key: Dict[str, Any], **kwargs: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Get a single item from the DynamoDB table based on the search key.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_key: {'user_submission_id': "12345678"}
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only {'data': <value>} as a response.
            If projected_attributes is set([]), all attributes are returned.
        """

    def batch_get_item(
        self, search_keys: List[Dict[str, Any]], **kwargs
    ) -> List[Dict[str, Any]]:
        """Get a batch of items from the DynamoDB table based on the list of search keys provided.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_keys: [{'user_submission_id': "12345678"}, {'user_submission_id': "25902591"}]
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only {'data': <value>} as a response.
            If projected_attributes is set([]), all attributes are returned.
        Notes:
            1. AWS DynamoDB can return at most 100 items in a single batch read.
            2. A single operation can retrieve up to 16 MB of data.
            This function always receives requests chunked to 100 items.
        """

    def put_item(self, item: dict) -> None:
        """
        Write the item to the DynamoDB table.
        Does not return anything.
        """

    def batch_put_item(batch, item: dict) -> None:
        """
        Add an item to the batch. This will be processed directly.
        """

    def query(
        self,
        index: Optional[str],
        partition_key_condition: ConditionBase,
        filter_expression: Optional[ConditionBase],
        limit: Optional[int] = None,
        additional_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search for items in the DynamoDB table based on the search key.
        """

    def scan(
        self, filter_expression: ConditionBase, limit: int
    ) -> List[Dict[str, Any]]:
        """Scan for items in the DynamoDB table based on the filter expression."""

@implementer(IDynamoDBTable)
class DynamoDBTable:
    """
    Interacts with AWS DynamoDB.
    See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#client
    There are slight differences in naming compared to MongoDB (e.g. Collection -> Table).
    For more details, look at the "Core components" section of this article:
    https://aws.amazon.com/blogs/database/performing-a-live-migration-from-a-mongodb-cluster-to-amazon-dynamodb/
    """

    def __init__(
        self,
        aws_access_key_id,
        aws_secret_access_key,
        storage_name,
        system,
        local_url=None,
    ):
        if local_url:
            self.dynamo = boto3.resource(
                "dynamodb",
                endpoint_url=local_url,
                aws_access_key_id="fakeMyKeyId",
                aws_secret_access_key="fakeSecretAccessKey",
                region_name="eu-central-1",
            )
        else:
            max_retries = int(get_settings("bm.dynamo_db_max_retries", 10))
            self.dynamo = boto3.resource(
                "dynamodb",
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                region_name="eu-central-1",
                # Handing over settings for the boto3 retry strategy
                # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
                config=Config(
                    retries={"max_attempts": max_retries, "mode": "adaptive"},
                    max_pool_connections=1024,
                ),
            )
        self.table_name = get_table_name(system, storage_name)
        self.dynamodb_table = self.dynamo.Table(self.table_name)
        self.log = get_logger(__name__)

    def get_batch(self):
        return self.dynamodb_table.batch_writer()

    def delete_item(self, primary_key: Dict[str, Any], return_values="NONE"):
        """
        @see: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.delete_item  # noqa E501
        Delete an item from the table by its primary key.
        :param primary_key: A map of attribute names to values, representing the primary key of the item to delete.
            - For a simple primary key, provide only a value for the partition key,
              e.g. {"pk_user_id": 123}
            - For a composite primary key, provide values for both the partition key and the sort key,
              e.g. {"pk_user_id": 123, "sk_timestamp": 1611525962429000}
        :param return_values: 'NONE'|'ALL_OLD' for delete_item
        """
        try:
            return self.log_retry_attempts(
                self.dynamodb_table.delete_item(
                    Key=primary_key, ReturnValues=return_values
                ),
                "delete_item",
            )
        except botocore.exceptions.ClientError as error:
            self.log.error(
                "Delete Item API",
                table_name=self.table_name,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
            )
            raise

    def get_item(
        self, search_key: dict, **kwargs: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Get a single item from the DynamoDB table based on the search key.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_key: {'user_submission_id': "12345678"}
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only {'data': <value>} as a response.
            If projected_attributes is set([]), all attributes are returned.
        """
        projected_attributes = kwargs.get("projected_attributes", set())
        reserved_keyword_alias = kwargs.get("reserved_keyword_alias", dict())
        # To know about the parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.get_item
        res = None
        try:
            aws_response = self.dynamodb_table.get_item(
                Key=search_key,
                # see https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html
                ConsistentRead=False,  # Eventually consistent read, since it is more of a WRITE-intensive DB
                ReturnConsumedCapacity="NONE",  # No need for details on performance
                ProjectionExpression=",".join(
                    projected_attributes
                ),  # To get only the data we need
                ExpressionAttributeNames=reserved_keyword_alias,  # in case an attr. name is a reserved keyword (e.g. data)
            )
            res = self.log_retry_attempts(aws_response, "get_item").get("Item")
        except botocore.exceptions.ClientError as error:
            self.log.error(
                "Get Item API",
                table_name=self.table_name,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
                search_key=search_key,
            )
            raise
        return res

    def batch_get_item(
        self, search_keys: List[Dict[str, Any]], **kwargs
    ) -> List[Dict[str, Any]]:
        """Get a batch of items from the DynamoDB table based on the list of search keys provided.
        kwargs:
            - projected_attributes: Set[str]
            - reserved_keyword_alias: dict
        Example:
            search_keys: [{'user_submission_id': "12345678"}, {'user_submission_id': "25902591"}]
            projected_attributes: set(['#d']) with reserved_keyword_alias: dict({'#d': 'data'})...
            ...since 'data' is a reserved keyword of AWS DynamoDB
            will return only {'data': <value>} as a response.
            If projected_attributes is set([]), all attributes are returned.
        Notes:
            1. AWS DynamoDB can return at most 100 items in a single batch read.
            2. A single operation can retrieve up to 16 MB of data.
            This function always receives requests chunked to 100 items.
        """
        projected_attributes = kwargs.get("projected_attributes", set())
        reserved_keyword_alias = kwargs.get("reserved_keyword_alias", dict())
        # To know about the parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.batch_get_item
        res = []
        try:
            aws_response = self.dynamo.batch_get_item(
                RequestItems={
                    self.table_name: {
                        "Keys": search_keys,
                        "ConsistentRead": False,
                        "ProjectionExpression": ",".join(projected_attributes),
                        "ExpressionAttributeNames": reserved_keyword_alias,
                    }
                },
                ReturnConsumedCapacity="NONE",
            )
            res = (
                self.log_retry_attempts(aws_response, "batch_get_item")
                .get("Responses")
                .get(self.table_name)
            )
        except botocore.exceptions.ClientError as error:
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
            # The following exceptions can never happen as they are handled by the code:
            # 1. ValidationException: if requesting more than 100 items, but the request is already limited to 100.
            # 2. ProvisionedThroughputExceededException: if the request cannot be processed due to insufficient capacity.
            self.log.error(
                "Batch Read API",
                table_name=self.table_name,
                user_submission_ids=search_keys,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
            )
            raise
        return res

    def query(
        self,
        index: Optional[str],
        partition_key_condition: ConditionBase,
        filter_expression: Optional[ConditionBase],
        limit: Optional[int] = None,
        additional_params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """Search for items in the DynamoDB table based on the search key."""
        res = []
        try:
            query_args = {
                "KeyConditionExpression": partition_key_condition,
                "ReturnConsumedCapacity": "NONE",
                **(additional_params or {}),
            }
            if index is not None:
                query_args["IndexName"] = index
            if limit is not None:
                query_args["Limit"] = limit
            if filter_expression is not None:
                query_args["FilterExpression"] = filter_expression
            while True:
                # To know about the parameters, see
                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
                aws_response = self.dynamodb_table.query(**query_args)
                res.extend(self.log_retry_attempts(aws_response, "query").get("Items"))
                if "LastEvaluatedKey" in aws_response:
                    query_args["ExclusiveStartKey"] = aws_response["LastEvaluatedKey"]
                else:
                    # If `LastEvaluatedKey` is empty, then the "last page" of results
                    # has been processed and there is no more data to be retrieved.
                    break
        except botocore.exceptions.ClientError as error:
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
            # The following exception can never happen as it is handled by the code:
            # 1. ProvisionedThroughputExceededException: if the request cannot be processed due to insufficient capacity.
            self.log.error(
                "Query API",
                table_name=self.table_name,
                index=index,
                partition_key_condition=self.get_condition_for_logger(
                    partition_key_condition
                ),
                filter_expression=self.get_condition_for_logger(filter_expression),
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
            )
            raise
        return res

    def scan(
        self, filter_expression: ConditionBase, limit: Optional[int]
    ) -> List[Dict[str, Any]]:
        """Scan for items in the DynamoDB table based on the filter expression."""
        res = []
        try:
            scan_args = {
                "FilterExpression": filter_expression,
                "ReturnConsumedCapacity": "NONE",
            }
            if limit is not None:
                scan_args["Limit"] = limit
            while True:
                # To know about the parameters, see
                # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
                # https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan
                aws_response = self.dynamodb_table.scan(**scan_args)
                res.extend(self.log_retry_attempts(aws_response, "scan").get("Items"))
                if "LastEvaluatedKey" in aws_response:
                    scan_args["ExclusiveStartKey"] = aws_response["LastEvaluatedKey"]
                else:
                    # If `LastEvaluatedKey` is empty, then the "last page" of results
                    # has been processed and there is no more data to be retrieved.
                    break
        except botocore.exceptions.ClientError as error:
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
            # The following exception can never happen as it is handled by the code:
            # 1. ProvisionedThroughputExceededException: if the request cannot be processed due to insufficient capacity.
            self.log.error(
                "Scan API",
                table_name=self.table_name,
                filter_expression=self.get_condition_for_logger(filter_expression),
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
            )
            raise
        return res

    def put_item(self, item: dict) -> None:
        """Write the item to the DynamoDB table.
        Does not return anything.
        """
        # To know about the parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.put_item
        try:
            self.log_retry_attempts(self.dynamodb_table.put_item(Item=item), "put_item")
        except botocore.exceptions.ClientError as error:
            self.log.error(
                "Put Item API",
                table_name=self.table_name,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
                event_type=item.get("event_type"),
                event_userid=item.get("user_id"),
            )
            raise

    @staticmethod
    def batch_put_item(batch, item: dict) -> None:
        """
        Add an item to the batch. This will be processed directly.
        """
        # To know about the parameters, see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/dynamodb.html#batch-writing
        try:
            batch.put_item(Item=item)
        except botocore.exceptions.ClientError:
            raise

    def batch_write_items(self, items: List[Dict[str, Any]]) -> None:
        """
        Write a single batch of items to the table.
        https://github.com/bettermarks/bm-backend/pull/581
        """
        try:
            aws_response = self.dynamo.batch_write_item(
                RequestItems={
                    self.table_name: [{"PutRequest": {"Item": item}} for item in items]
                }
            )
            self.log_retry_attempts(aws_response, "batch_write_items")
        except botocore.exceptions.ClientError as error:
            # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html#parsing-error-responses-and-catching-exceptions-from-aws-services
            # The following exceptions can never happen as they are handled by the code:
            # 1. ValidationException: if requesting more than 25 items, but the request is already limited to 25.
            # 2. ProvisionedThroughputExceededException: if the request cannot be processed due to insufficient capacity.
            self.log.error(
                "Batch Write API",
                table_name=self.table_name,
                code=error.response["Error"]["Code"],
                message=error.response["Error"]["Message"],
                event_types=[item.get("event_type") for item in items],
                event_userid=[item.get("user_id") for item in items],
            )
            raise

    def get_condition_for_logger(self, condition):
        if condition is not None:
            expression = ConditionExpressionBuilder().build_expression(condition)
            condition_expression = expression.condition_expression
            sorted_name_placeholders = sorted(
                expression.attribute_name_placeholders.items()
            )
            sorted_name_placeholders.reverse()
            for k, v in sorted_name_placeholders:
                # The trailing space is intentional, to avoid replacing #n1 inside #n10
                replace_key = f"{k} "
                replace_value = f"'{v}' "
                condition_expression = condition_expression.replace(
                    replace_key, replace_value
                )
            sorted_value_placeholders = sorted(
                expression.attribute_value_placeholders.items()
            )
            sorted_value_placeholders.reverse()
            for k, v in sorted_value_placeholders:
                # The leading space is intentional, to avoid replacing :v1 inside :v10
                replace_key = f" {k}"
                replace_value = f" '{v}'"
                condition_expression = condition_expression.replace(
                    replace_key, replace_value
                )
            return condition_expression
        else:
            return None

    def log_retry_attempts(self, res, operation: str):
        retry_count = res.get("ResponseMetadata", {}).get("RetryAttempts", 0)
        if retry_count > 0:
            self.log.info(
                "Retry for DynamoDb",
                table_name=self.table_name,
                retry_count=retry_count,
                operation=operation,
            )
        return res
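

# ---------------------------------------------------------------------------
# Usage sketch (illustration only, not part of the production code paths).
# It assumes a locally running DynamoDB endpoint (e.g. DynamoDB Local on port
# 8000), that get_table_name("bm", "events") resolves to a table which already
# exists, and that its partition key is `user_submission_id`. The storage and
# system names, the key schema, and the item attributes below are hypothetical.
if __name__ == "__main__":
    from boto3.dynamodb.conditions import Key

    table = DynamoDBTable(
        aws_access_key_id=None,  # ignored when local_url is set; fake credentials are used
        aws_secret_access_key=None,
        storage_name="events",  # hypothetical storage name
        system="bm",  # hypothetical system name
        local_url="http://localhost:8000",
    )

    # Single write and read
    table.put_item({"user_submission_id": "12345678", "data": {"score": 1}})
    item = table.get_item(
        {"user_submission_id": "12345678"},
        # 'data' is a DynamoDB reserved word, so it is projected via the '#d' alias
        projected_attributes={"#d"},
        reserved_keyword_alias={"#d": "data"},
    )

    # Query the base table (index=None) by partition key; pagination via
    # LastEvaluatedKey is handled inside query().
    items = table.query(
        index=None,
        partition_key_condition=Key("user_submission_id").eq("12345678"),
        filter_expression=None,
    )

    # Batched writes go through the table's batch writer
    with table.get_batch() as batch:
        for i in range(3):
            DynamoDBTable.batch_put_item(batch, {"user_submission_id": str(i)})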