Last active
June 30, 2024 10:00
-
-
Save martinapugliese/cae86eb68f5aab59e87332725935fd5f to your computer and use it in GitHub Desktop.
Some wrapper methods to deal with DynamoDB databases in Python, using boto3.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (C) 2016 Martina Pugliese | |
from boto3 import resource | |
from boto3.dynamodb.conditions import Key | |
# The boto3 dynamoDB resource | |
dynamodb_resource = resource('dynamodb') | |
def get_table_metadata(table_name):
    """
    Collect a handful of descriptive attributes of the chosen table.

    Returns a dictionary with the item count, the first entry of the table's
    key schema (stored under 'primary_key_name'), the table status, the table
    size in bytes and the global secondary indexes.
    """
    tbl = dynamodb_resource.Table(table_name)

    # Attributes are read in the same order as the keys are listed.
    metadata = {}
    metadata['num_items'] = tbl.item_count
    metadata['primary_key_name'] = tbl.key_schema[0]
    metadata['status'] = tbl.table_status
    metadata['bytes_size'] = tbl.table_size_bytes
    metadata['global_secondary_indices'] = tbl.global_secondary_indexes
    return metadata
def read_table_item(table_name, pk_name, pk_value):
    """
    Look up a single item by its primary key.

    Returns the full get_item response, not just the item.
    """
    tbl = dynamodb_resource.Table(table_name)
    lookup_key = {pk_name: pk_value}
    return tbl.get_item(Key=lookup_key)
def add_item(table_name, col_dict):
    """
    Write one item (row) into table.

    col_dict is a dictionary {col_name: value} and is passed as the item.
    Returns the put_item response.
    """
    return dynamodb_resource.Table(table_name).put_item(Item=col_dict)
def delete_item(table_name, pk_name, pk_value):
    """
    Delete an item (row) in table from its primary key.

    Returns the delete_item response, in line with add_item and
    read_table_item (the response used to be computed and then discarded).
    """
    table = dynamodb_resource.Table(table_name)
    return table.delete_item(Key={pk_name: pk_value})
def scan_table_firstpage(table_name, filter_key=None, filter_value=None):
    """
    Perform a scan operation on table and return the first page only.

    Can specify filter_key (col name) and its value to be filtered. The filter
    is applied whenever filter_key is set and filter_value is not None, so
    falsy values such as 0, False or '' can be filtered on as well.

    Returns the response of the single scan call; later pages of a paginated
    result are not fetched.
    """
    table = dynamodb_resource.Table(table_name)

    # Compare against None rather than relying on truthiness: a filter value
    # of 0 or '' must not silently turn into an unfiltered scan.
    if filter_key and filter_value is not None:
        filtering_exp = Key(filter_key).eq(filter_value)
        return table.scan(FilterExpression=filtering_exp)
    return table.scan()
def scan_table_allpages(table_name, filter_key=None, filter_value=None):
    """
    Perform a scan operation on table, following pagination to the last page.

    Can specify filter_key (col name) and its value to be filtered. The filter
    is applied whenever filter_key is set and filter_value is not None, so
    falsy values such as 0, False or '' can be filtered on as well.

    The number of items in each page is printed as the pages are fetched.

    Returns list of items gathered from all pages.
    """
    table = dynamodb_resource.Table(table_name)

    # Build the scan arguments once so that every page request carries the
    # same filter, and no FilterExpression is sent at all when no filter was
    # requested (follow-up pages used to reference an undefined filter then).
    scan_kwargs = {}
    if filter_key and filter_value is not None:
        scan_kwargs['FilterExpression'] = Key(filter_key).eq(filter_value)

    items = []
    while True:
        response = table.scan(**scan_kwargs)
        page_items = response['Items']
        # Function-call form of print works on both Python 2 and Python 3.
        print(len(page_items))
        items.extend(page_items)

        # A missing or empty LastEvaluatedKey marks the final page.
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key

    return items
def query_table(table_name, filter_key=None, filter_value=None):
    """
    Perform a query operation on the table.

    Can specify filter_key (col name) and its value to be matched; the pair is
    sent as the KeyConditionExpression. The condition is applied whenever
    filter_key is set and filter_value is not None, so falsy key values such
    as 0 are queried correctly.

    Returns the response.
    """
    table = dynamodb_resource.Table(table_name)

    # Compare against None rather than relying on truthiness: a key value of
    # 0 must not fall through to the unconditioned call below.
    if filter_key and filter_value is not None:
        key_condition = Key(filter_key).eq(filter_value)
        return table.query(KeyConditionExpression=key_condition)

    # NOTE(review): DynamoDB's Query operation expects a key condition, so
    # this unconditioned call is likely to be rejected by the service --
    # confirm, and prefer scan_table_firstpage for an unfiltered read.
    return table.query()
A simple, clear explanation. Thank you!
Critical error on line 87.
The next page is fetched without the filter:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
This code works fine:
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'],FilterExpression=filtering_exp)
Thanks for this wrapper. Very helpful!
MikeUdin is correct, line 87 needs modification. You need to add the FilterExpression
One more method that I thought was missing, update item:
def update_item(self, table_name, pk_name, pk_value, col_dict):
    """
    Update one item (row) in table. col_dict is a dictionary {col_name: value}.

    Every entry of col_dict becomes one assignment of a SET update expression.
    Returns the update_item response, which carries the updated attributes
    (ReturnValues='UPDATED_NEW').
    """
    # Use positional placeholders (#n0 / :v0, #n1 / :v1, ...) instead of
    # embedding the column name in the placeholder: column names containing
    # characters such as '-', '.' or spaces would otherwise yield placeholders
    # that are not valid in an expression.
    assignments = []
    expression_attribute_names = {}
    expression_attribute_values = {}
    for position, (col_name, value) in enumerate(col_dict.items()):
        name_ref = '#n{}'.format(position)
        value_ref = ':v{}'.format(position)
        assignments.append('{}={}'.format(name_ref, value_ref))
        expression_attribute_names[name_ref] = col_name
        expression_attribute_values[value_ref] = value
    update_expression = 'SET {}'.format(','.join(assignments))

    # NOTE(review): `dynamodb` is expected to be the boto3 DynamoDB resource
    # available in the surrounding scope -- confirm against the caller's module.
    table = dynamodb.Table(table_name)
    response = table.update_item(
        Key={'{}'.format(pk_name): pk_value},
        UpdateExpression=update_expression,
        ExpressionAttributeValues=expression_attribute_values,
        ExpressionAttributeNames=expression_attribute_names,
        ReturnValues='UPDATED_NEW',
    )
    return response
Thank you very much! This helped me a lot at understanding how query
works!
Thanks for the wrapper!
Thanks for the wrapper!
No worries, glad this is still useful after all this time, wouldn't have thought
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hello Martin,
I need some help with DynamoDB and a Python Lambda. I am writing a Python Lambda function that gets its result from DynamoDB; below is the structure of my patient table in DynamoDB.
{
"patient_id": 161,
"patientVisits": [
{
"Diagnosis": [
{
"Diagnosis_Completion": "Definitive",
"DiagnosisId": 100,
"ICD_Code": "test ICd code for dignosis Id 100"
}
],
"Follow_Up_Recommendation": "Follow up recommendation Test",
"visitId": 123
},
{
"Diagnosis": [
{
"Diagnosis_Completion": "Definitive",
"DiagnosisId": 100,
"ICD_Code": "test ICd code for dignosis Id 100"
}
],
"visitId": 124
}]}
Here the patient table has patient_id as its key and many other attributes. One of the attributes is the list patientVisits, whose entries each have a visitId and also include a list of Diagnosis. My requirement is to get the Diagnosis based on patientId and visitId, so I have tried to write the Python Lambda like this:
def list(event, context):
    """
    Return the Diagnosis entries of one visit of one patient.

    The patient item is queried by its patient_id key and the matching visit
    is then selected in Python: patientVisits is a list attribute, and a
    dotted path such as patientVisits.visitId does not address the elements
    inside a list, which is why filtering on it in the query matched nothing.

    Returns the list of Diagnosis entries of every visit whose visitId
    matches (empty when the patient or the visit is not found).
    """
    # NOTE(review): `dynamodb` is expected to be the boto3 DynamoDB resource
    # available in the surrounding scope -- confirm.
    table = dynamodb.Table(os.environ['PATIENT_TABLE'])
    patient_id = 130
    visit_id = 124

    # Fetch the patient's visits from the database. The projection aliases
    # must map to the stored attribute names: the key is 'patient_id'
    # (the former '#patientId' -> 'patientId' mapping named no attribute).
    result = table.query(
        ProjectionExpression="#patient_id, #pv",
        ExpressionAttributeNames={"#patient_id": "patient_id", "#pv": "patientVisits"},
        KeyConditionExpression=Key('patient_id').eq(patient_id),
    )

    # Select the requested visit client-side and collect its diagnoses.
    diagnoses = []
    for patient in result.get('Items', []):
        for visit in patient.get('patientVisits', []):
            if visit.get('visitId') == visit_id:
                diagnoses.extend(visit.get('Diagnosis', []))

    # NOTE(review): numeric attributes are presumably returned as Decimal by
    # the boto3 resource; convert them if the caller needs plain JSON numbers.
    return diagnoses
The above gives me zero results. Can you please help me with this? If I change #pv.Diagnosis to #pv[0].Diagnosis in the ProjectionExpression, and make the same change in the FilterExpression, then it works, but it only gives me the zeroth index, and only if the visitId matches the visitId at the zeroth index. What I want is the Diagnosis list from the patientVisits entry selected by visitId and patientId. Can you please help me with this? I have been struggling with it for a long time.
Thank you in advance!!!