Read files marked as deleted from an AWS S3 bucket into a DynamicFrame using boto3 and AWS Glue
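The script below only works on a bucket with S3 versioning enabled; without versioning there are no delete markers or previous object versions to recover. A quick preflight check, sketched with boto3 (the bucket name is a placeholder):

import boto3

s3 = boto3.client('s3')
status = s3.get_bucket_versioning(Bucket='your-bucket-name').get('Status')
if status != 'Enabled':
    raise RuntimeError(f"Versioning is {status or 'not configured'} on this bucket; "
                       "previous object versions cannot be recovered.")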
import boto3
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

# Initialize the Spark and Glue contexts once, at module level, so that
# both the function and the usage section below can see glueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)


def read_deleted_files_to_dynamicframe(bucket_name, prefix='', staging_prefix='_restored/'):
    """Read the newest surviving version of every object whose current
    version is a delete marker, and return the data as a DynamicFrame."""
    s3 = boto3.client('s3')

    # List object versions, including delete markers
    paginator = s3.get_paginator('list_object_versions')
    page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)

    # Collect the keys whose latest version is a delete marker, plus the
    # newest non-delete-marker version id seen for every key
    deleted_keys = []
    latest_versions = {}
    for page in page_iterator:
        for marker in page.get('DeleteMarkers', []):
            if marker['IsLatest']:
                deleted_keys.append(marker['Key'])
        for version in page.get('Versions', []):
            # Versions are listed newest-first per key, so keep the first one
            latest_versions.setdefault(version['Key'], version['VersionId'])

    if not deleted_keys:
        print("No deleted files found.")
        return None

    # The Glue S3 source always reads the *current* version of an object
    # (there is no connection option for reading a previous version), so
    # restore each deleted file by copying its newest surviving version
    # to a staging prefix, then read the staging prefix with Glue
    staged_paths = []
    for key in deleted_keys:
        version_id = latest_versions.get(key)
        if version_id is None:
            continue  # no surviving version left to restore
        staging_key = staging_prefix + key
        s3.copy_object(
            Bucket=bucket_name,
            Key=staging_key,
            CopySource={'Bucket': bucket_name, 'Key': key, 'VersionId': version_id},
        )
        staged_paths.append(f"s3://{bucket_name}/{staging_key}")

    # Read the restored files into a DynamicFrame
    dyf = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={
            "paths": staged_paths,
            "recurse": True,
            "useS3ListImplementation": True,
        },
        format="csv",  # adjust this to match your file format
        format_options={
            "withHeader": True,
            "separator": ","
        },
    )
    return dyf


# Usage
bucket_name = 'your-bucket-name'
prefix = 'your-folder-prefix/'  # optional

dyf = read_deleted_files_to_dynamicframe(bucket_name, prefix)
if dyf:
    # Print the schema of the DynamicFrame
    dyf.printSchema()

    # Convert to a DataFrame for further processing if needed
    df = dyf.toDF()
    # Perform your manipulations here

    # Convert back to a DynamicFrame if necessary
    result_dyf = DynamicFrame.fromDF(df, glueContext, "result_dyf")

    # Write the result back to S3
    glueContext.write_dynamic_frame.from_options(
        frame=result_dyf,
        connection_type="s3",
        connection_options={"path": "s3://output-bucket/output-path/"},
        format="parquet",
    )
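If you would rather un-delete the objects in place than copy them to a staging prefix, you can remove the delete markers themselves: deleting a delete marker by its version id makes the previous version current again. A minimal sketch with boto3 (same placeholder bucket and prefix as above; note this permanently changes which version is current and requires the s3:DeleteObjectVersion permission):

import boto3

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_object_versions')
for page in paginator.paginate(Bucket='your-bucket-name', Prefix='your-folder-prefix/'):
    for marker in page.get('DeleteMarkers', []):
        if marker['IsLatest']:
            # Removing the delete marker makes the previous version current again
            s3.delete_object(
                Bucket='your-bucket-name',
                Key=marker['Key'],
                VersionId=marker['VersionId'],
            )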