Skip to content

Instantly share code, notes, and snippets.

@rehanvdm
Created March 27, 2024 03:18
Show Gist options
  • Save rehanvdm/ac362a370acd78a9a7382caabbfa914a to your computer and use it in GitHub Desktop.
Save rehanvdm/ac362a370acd78a9a7382caabbfa914a to your computer and use it in GitHub Desktop.
AWS PTA Meetup: Automate your event-driven data lake/warehouse ingestion - Glue script
# The meetup video https://youtu.be/lbbevQPK3D8
import datetime
import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col
# Resolve job parameters. JOB_NAME identifies the Glue job run; input_location
# (S3 path read below) and output_location (S3 prefix written below) must be
# passed to the job as arguments.
args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_location", "output_location"])
# Spark and Glue contexts; glueContext is used below to read and write DynamicFrames.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session  # not referenced elsewhere in this script
# Initialise the Glue job; it is committed with job.commit() at the end of the script.
# NOTE(review): the transformation_ctx values passed to the read/write calls are
# only meaningful for state tracking if job bookmarks are enabled — confirm.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Hive-style partition directory names (key=value) derived from the run date.
# An explicit UTC timestamp is used so the partition a run writes into does not
# depend on the worker's local timezone configuration.
# NOTE(review): month/day are not zero-padded (e.g. "month=3"); switching to
# padded values would split existing partitions, so the format is kept as-is.
now = datetime.datetime.now(datetime.timezone.utc)
year = "year=" + str(now.year)
month = "month=" + str(now.month)
day = "day=" + str(now.day)
# Load the raw input from S3: every file under input_location (sub-folders
# included, via "recurse"), parsed as JSON with one record per line
# (multiline disabled). transformation_ctx names this source for Glue's job state.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [args["input_location"]], "recurse": True},
    format="json",
    format_options={"multiline": False},
    transformation_ctx="dyf",
)
# Normalise the schema: cast every top-level column to string so the Parquet
# output has a uniform all-string schema regardless of the types inferred from JSON.
# NOTE(review): struct/array columns are rendered with Spark's cast-to-string
# format rather than as JSON text — confirm that is the intended representation.
df = dyf.toDF()
# Column names are backtick-quoted before being handed to col(): an unquoted
# name containing a dot (e.g. a JSON key "a.b") would be parsed as a nested
# field reference and fail to resolve. Embedded backticks are escaped by
# doubling them. alias(c) keeps the original, unquoted name in the output.
df = df.select(
    [
        col("`" + c.replace("`", "``") + "`").cast("string").alias(c)
        for c in df.columns
    ]
)
# Convert back to a DynamicFrame for the Glue writer below.
string_dyf = DynamicFrame.fromDF(df, glueContext, "string_dyf")
# Write the all-string frame as Snappy-compressed Parquet under
# <output_location>/year=<y>/month=<m>/day=<d>. The date partitions are encoded
# directly in the path, so partitionKeys is left empty.
# Trailing slashes are stripped from output_location so a value such as
# "s3://bucket/prefix/" does not yield "prefix//year=..." (an extra empty path
# segment that hides the partitions from path-based partition discovery).
output = args["output_location"].rstrip("/")
currdate = "/".join([output, year, month, day])
output_dyf = glueContext.write_dynamic_frame.from_options(
    frame=string_dyf,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": currdate,
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="output_dyf",
)
# Finish the job run started by job.init().
job.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment