|
import os
import traceback
from urllib.parse import unquote_plus

import boto3
|
|
|
# Shared AWS handles, created once at module import time so they are reused
# by every invocation that runs in this loaded module.
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

# DynamoDB table that tracks which files of each pair have arrived.
# Overridable through the DYNAMODB_TABLE_NAME environment variable so the
# function can target another table without a code change; the default is
# the previously hard-coded name.
table_name = os.environ.get('DYNAMODB_TABLE_NAME', 'dynamodb-table')
|
|
|
def import_file(bucket, key, *, dest_bucket='temp-bucket'):
    """Copy an S3 object into the temporary bucket under the same key.

    :param bucket: Name of the source bucket.
    :param key: The object's actual key in ``bucket`` (not the URL-encoded
        form that appears in S3 event notifications).
    :param dest_bucket: Bucket to copy the object into; defaults to
        ``'temp-bucket'``.
    :returns: ``(filename, file_extension)`` as produced by
        ``os.path.splitext(key)``, e.g. ``('dir/data', '.csv')`` for
        ``'dir/data.csv'``.
    """
    filename, file_extension = os.path.splitext(key)

    # splitext guarantees filename + file_extension == key, so the copy keeps
    # the source key unchanged in the destination bucket.
    # NOTE(review): per the S3 CopyObject docs a single copy_object call only
    # handles objects up to 5 GB; larger objects need a multipart copy.
    s3_client.copy_object(
        Bucket=dest_bucket,
        Key=key,
        CopySource={'Bucket': bucket, 'Key': key},
    )

    return (filename, file_extension)
|
|
|
def _register_arrival(table, filename):
    """Record that one file of the ``filename`` pair has arrived.

    The first arrival creates ``{'filename': filename, 'status': 'loading'}``;
    any later arrival sets the item's ``status`` to ``'ready'``.

    :param table: boto3 DynamoDB ``Table`` resource keyed on ``filename``.
    :param filename: Object key with its extension stripped (the pair key).
    :raises botocore.exceptions.ClientError: for any DynamoDB failure other
        than the expected conditional-check failure.
    """
    try:
        # A conditional put makes "is this the first file of the pair?" one
        # atomic check. With a separate get_item followed by put_item, two
        # files arriving in concurrent invocations could both decide they
        # were first, and the pair would never be marked ready.
        table.put_item(
            Item={'filename': filename, 'status': 'loading'},
            ConditionExpression='attribute_not_exists(#k)',
            ExpressionAttributeNames={'#k': 'filename'},
        )
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        # The item already exists, so this is the second file of the pair.
        table.update_item(
            Key={'filename': filename},
            UpdateExpression='set #s = :s',
            ExpressionAttributeNames={'#s': 'status'},
            ExpressionAttributeValues={':s': 'ready'},
        )
        # TODO: invoke the processing step here, passing the filename,
        # e.g. invoke_processing_step(filename).


def lambda_handler(event, context):
    """Handle an S3 event: stage each uploaded object and track file pairs.

    For every record, the object is copied to the temporary bucket with
    ``import_file``. Files are paired by key with the extension stripped:
    the first arrival creates a ``loading`` item in DynamoDB, and the second
    marks it ``ready``. DynamoDB errors are logged and the remaining records
    are still processed; errors raised while copying the object propagate.

    :param event: S3 event notification carrying an ``event['Records']`` list.
    :param context: Lambda context object (unused).
    :returns: ``(bucket, key)`` of the last record processed, or
        ``(None, None)`` when ``event['Records']`` is empty.
    """
    table = dynamodb.Table(table_name)
    # Initialized so an empty Records list returns instead of raising
    # UnboundLocalError at the final return.
    bucket = key = None

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded (e.g. a space
        # arrives as '+'); decode to get the key S3 actually stores.
        key = unquote_plus(record['s3']['object']['key'])

        filename, _ = import_file(bucket, key)

        try:
            _register_arrival(table, filename)
        except Exception:
            # Per-record boundary: log the failure and continue with the rest
            # of the batch rather than failing the whole invocation.
            print('Failed to record s3://{}/{} in DynamoDB'.format(bucket, key))
            print(traceback.format_exc())

    return (bucket, key)