Last active
June 30, 2024 10:00
-
-
Save martinapugliese/cae86eb68f5aab59e87332725935fd5f to your computer and use it in GitHub Desktop.
Some wrapper methods to deal with DynamoDB databases in Python, using boto3.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (C) 2016 Martina Pugliese | |
from boto3 import resource | |
from boto3.dynamodb.conditions import Key | |
# Module-level boto3 DynamoDB service resource, shared by every helper below.
dynamodb_resource = resource('dynamodb')
def get_table_metadata(table_name):
    """
    Collect a few descriptive attributes of the chosen table.

    Returns a dict with the keys 'num_items', 'primary_key_name', 'status',
    'bytes_size' and 'global_secondary_indices', filled from the table
    resource's item_count, key_schema[0], table_status, table_size_bytes and
    global_secondary_indexes attributes respectively.
    """
    tbl = dynamodb_resource.Table(table_name)

    metadata = {}
    metadata['num_items'] = tbl.item_count
    # This is the first key-schema entry as exposed by the resource, not
    # just a bare name string.
    metadata['primary_key_name'] = tbl.key_schema[0]
    metadata['status'] = tbl.table_status
    metadata['bytes_size'] = tbl.table_size_bytes
    metadata['global_secondary_indices'] = tbl.global_secondary_indexes
    return metadata
def read_table_item(table_name, pk_name, pk_value):
    """
    Look up a single item by its primary key.

    pk_name is the key attribute name and pk_value its value. Returns the
    get_item response unchanged.
    """
    tbl = dynamodb_resource.Table(table_name)
    lookup_key = {pk_name: pk_value}
    return tbl.get_item(Key=lookup_key)
def add_item(table_name, col_dict):
    """
    Write one item (row) to the table.

    col_dict is a dictionary {col_name: value} passed as the item to store.
    Returns the put_item response unchanged.
    """
    tbl = dynamodb_resource.Table(table_name)
    return tbl.put_item(Item=col_dict)
def delete_item(table_name, pk_name, pk_value):
    """
    Delete an item (row) in table from its primary key.

    pk_name is the key attribute name and pk_value its value. Returns the
    delete_item response, mirroring what add_item does with its put_item
    response, so callers can inspect the outcome instead of getting None.
    """
    table = dynamodb_resource.Table(table_name)
    response = table.delete_item(Key={pk_name: pk_value})
    return response
def scan_table_firstpage(table_name, filter_key=None, filter_value=None):
    """
    Run a single scan call on the table and return its response.

    When both filter_key (col name) and filter_value are truthy, the scan is
    restricted with an equality filter on that column; otherwise the scan is
    unfiltered. Only one scan call is made, so only the first page of
    results is fetched.
    """
    tbl = dynamodb_resource.Table(table_name)

    # Guard clause: without a complete key/value pair, scan unfiltered.
    if not (filter_key and filter_value):
        return tbl.scan()

    condition = Key(filter_key).eq(filter_value)
    return tbl.scan(FilterExpression=condition)
def scan_table_allpages(table_name, filter_key=None, filter_value=None):
    """
    Perform a scan operation on table, following pagination to the end.

    Can specify filter_key (col name) and its value to be filtered; the
    filter is applied only when both are truthy. The scan is repeated with
    ExclusiveStartKey for as long as the response carries a
    LastEvaluatedKey.

    Returns the list of items accumulated over all pages.
    """
    table = dynamodb_resource.Table(table_name)

    # Build the optional arguments once so that the first call and every
    # follow-up page use exactly the same filter (or none at all). The
    # follow-up call previously referenced the filter expression even when
    # no filter had been defined.
    scan_kwargs = {}
    if filter_key and filter_value:
        scan_kwargs['FilterExpression'] = Key(filter_key).eq(filter_value)

    response = table.scan(**scan_kwargs)
    items = response['Items']

    # A truthy LastEvaluatedKey means there is another page to fetch.
    while response.get('LastEvaluatedKey'):
        response = table.scan(
            ExclusiveStartKey=response['LastEvaluatedKey'],
            **scan_kwargs
        )
        items += response['Items']

    return items
def query_table(table_name, filter_key=None, filter_value=None):
    """
    Run a query on the table and return the response.

    When both filter_key (col name) and filter_value are truthy, they are
    used as an equality key condition; otherwise query is called with no
    arguments.
    """
    tbl = dynamodb_resource.Table(table_name)

    if not (filter_key and filter_value):
        # NOTE(review): a query without a key condition is presumably
        # rejected by DynamoDB -- confirm whether this branch is usable.
        return tbl.query()

    key_condition = Key(filter_key).eq(filter_value)
    return tbl.query(KeyConditionExpression=key_condition)
Critical error on line 87:
the next page is fetched without the filter being applied:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
Works fine this code:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],FilterExpression=filtering_exp)
Thanks for this wrapper. Very helpful!
MikeUdin is correct, line 87 needs modification. You need to add the FilterExpression
Just another method I thought is missing, update item:
def update_item(self, table_name, pk_name, pk_value, col_dict):
    """
    Update one item (row) in table. col_dict is a dictionary {col_name: value}.

    Each column in col_dict is written with a SET clause. Column names and
    values are passed through index-based placeholders (#n0/:v0, #n1/:v1,
    ...) rather than placeholders derived from the column names, so names
    containing characters that are not valid inside a placeholder still
    produce a well-formed expression.

    Returns the update_item response (requested with
    ReturnValues='UPDATED_NEW').
    """
    columns = list(col_dict.items())

    # Placeholder i always refers to column i in both mappings.
    expression_attribute_names = {
        f'#n{i}': name for i, (name, _) in enumerate(columns)
    }
    expression_attribute_values = {
        f':v{i}': value for i, (_, value) in enumerate(columns)
    }
    update_expression = 'SET {}'.format(
        ','.join(f'#n{i}=:v{i}' for i in range(len(columns)))
    )

    table = dynamodb.Table(table_name)
    response = table.update_item(
        Key={'{}'.format(pk_name): pk_value},
        UpdateExpression=update_expression,
        ExpressionAttributeValues=expression_attribute_values,
        ExpressionAttributeNames=expression_attribute_names,
        ReturnValues='UPDATED_NEW',
    )
    return response
Thank you very much! This helped me a lot at understanding how query
works!
Thanks for the wrapper!
Thanks for the wrapper!
No worries, glad this is still useful after all this time, wouldn't have thought
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A simple, clear explanation. Thank you!