Read files marked as deleted from an AWS S3 bucket into a DynamicFrame using boto3 and AWS Glue
import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame

# Initialize the Spark and Glue contexts once at module level so they are also
# available to the usage code below (creating them inside the function would
# leave glueContext undefined outside of it).
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)


def read_deleted_files_to_dynamicframe(bucket_name, prefix=''):
    # Initialize S3 client
    s3 = boto3.client('s3')

    # List object versions, including delete markers, under the given prefix
    paginator = s3.get_paginator('list_object_versions')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    # Collect keys whose latest version is a delete marker (i.e. deleted files)
    deleted_files = []
    for page in page_iterator:
        for delete_marker in page.get('DeleteMarkers', []):
            if delete_marker['IsLatest']:
                deleted_files.append(delete_marker['Key'])

    if not deleted_files:
        print("No deleted files found.")
        return None

    # Read the collected keys into a DynamicFrame
    dyf = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": [f"s3://{bucket_name}/{key}" for key in deleted_files],
            "recurse": True,
            "useS3ListImplementation": True,
            # Caveat: "readVersion" is not a documented Glue S3 connection option.
            # Glue reads only the current version of each path, and a key whose
            # latest version is a delete marker is not visible to a plain
            # s3://bucket/key read; see the boto3-based sketch below for a
            # version-aware alternative.
            "readVersion": "LATEST_PREVIOUS"
        },
        format="csv",  # Adjust this based on your file format
        format_options={
            "withHeader": True,
            "separator": ","
        }
    )
    return dyf


# Usage
bucket_name = 'your-bucket-name'
prefix = 'your-folder-prefix/'  # Optional
dyf = read_deleted_files_to_dynamicframe(bucket_name, prefix)

if dyf is not None:
    # Print the schema of the DynamicFrame
    dyf.printSchema()

    # Convert to a DataFrame for further processing if needed
    df = dyf.toDF()
    # Perform your manipulations here

    # Convert back to a DynamicFrame if necessary
    result_dyf = DynamicFrame.fromDF(df, glueContext, "result_dyf")

    # Write the result back to S3
    glueContext.write_dynamic_frame.from_options(
        frame=result_dyf,
        connection_type="s3",
        connection_options={"path": "s3://output-bucket/output-path/"},
        format="parquet"
    )
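For completeness, below is a minimal companion sketch (not part of the original gist) of a version-aware read for a single key: it finds the newest surviving, non-delete-marker version with boto3, fetches it by VersionId, and parses the CSV bytes with Spark before wrapping the result in a DynamicFrame. The function name read_previous_version_as_dynamicframe and the CSV/UTF-8 assumptions are illustrative placeholders; it reuses the sc and glueContext objects defined above.

def read_previous_version_as_dynamicframe(bucket_name, key):
    """Sketch: read the newest non-deleted version of a single CSV object."""
    s3 = boto3.client('s3')
    spark = glueContext.spark_session

    # 'Versions' lists real object versions; delete markers are reported separately.
    response = s3.list_object_versions(Bucket=bucket_name, Prefix=key)
    candidates = [v for v in response.get('Versions', []) if v['Key'] == key]
    if not candidates:
        return None

    # Fetch the most recently written surviving version by its VersionId
    newest = max(candidates, key=lambda v: v['LastModified'])
    obj = s3.get_object(Bucket=bucket_name, Key=key, VersionId=newest['VersionId'])
    lines = obj['Body'].read().decode('utf-8').splitlines()

    # DataFrameReader.csv also accepts an RDD of CSV strings.
    df = spark.read.csv(sc.parallelize(lines), header=True)
    return DynamicFrame.fromDF(df, glueContext, "restored_dyf")

This approach can stand in for the from_options call above when the objects to recover are known, at the cost of pulling each object through the driver rather than reading it in parallel from S3.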