@rmitula
Created July 28, 2023 07:23
Listing 3. Sample Python script for an AWS Glue job that leverages Apache Spark to transform JSON data from the Raw Data Zone into Apache Iceberg format in the Curated Data Zone, simultaneously updating the AWS Glue Data Catalog
import sys
import boto3
from pyspark.sql.functions import concat_ws, lpad
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Initialize Spark and Glue context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Retrieve and initialize job parameters
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Function to check if a given table exists in the database
def table_exist(database, table_name):
    client = boto3.client("glue")
    try:
        client.get_table(DatabaseName=database, Name=table_name)
        return True
    except client.exceptions.EntityNotFoundException:
        return False
# Define catalog, schema, and table name
catalog_name = "glue_catalog"
database_name = "apache_iceberg_showcase"
table_name = "products"
# Load data from S3 into a DataFrame
df = spark.read.format("json").load("s3:///raw-zone/products/year=2023/month=05/day=01/")
# Print the schema of the dataframe
df.printSchema()
# Check if the table exists
if table_exist(database_name, table_name):
    # Create a temporary view for the DataFrame
    temporary_view = "TempView"
    df.createOrReplaceTempView(temporary_view)

    # Perform the UPSERT operation using a SQL MERGE statement
    spark.sql(f"""
        MERGE INTO {catalog_name}.{database_name}.{table_name} AS target
        USING {temporary_view} AS source
        ON target.product_id = source.product_id
        WHEN MATCHED THEN
            UPDATE SET
                target.name = source.name,
                target.category = source.category,
                target.variants = source.variants,
                target.specifications = source.specifications,
                target.ratings = source.ratings,
                target.price = source.price
        WHEN NOT MATCHED THEN
            INSERT *
    """)
else:
    # If the table doesn't exist, create it and perform the initial INSERT
    df.writeTo(f"{catalog_name}.{database_name}.{table_name}") \
        .tableProperty("format-version", "2") \
        .tableProperty("location", "s3:///curated-zone/products") \
        .create()

# Commit the job
job.commit()
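
The MERGE INTO and writeTo calls above address the table through a Spark catalog named glue_catalog, which the script assumes is already registered against the AWS Glue Data Catalog. A minimal sketch of that configuration is shown below; the warehouse path and the way the settings are supplied (SparkSession config here, typically --conf job parameters in a Glue job) are assumptions, not part of the listing.

from pyspark.sql import SparkSession

# Sketch only: register an Iceberg catalog called "glue_catalog" backed by the
# AWS Glue Data Catalog. The warehouse bucket below is a placeholder.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse",
            "s3://example-warehouse-bucket/iceberg/")
    .getOrCreate()
)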
@nickatnight
Hi, thanks for the script. Question: why do you use boto3 instead of the Spark context? I prefer using Spark, but spark.catalog.tableExists never recognizes my Iceberg tables 🤔
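
For comparison, a Spark-side existence check that goes through the configured Iceberg catalog could look roughly like the sketch below (not part of the gist). One plausible reason spark.catalog.tableExists misses the tables is that it resolves names against the current session catalog rather than the glue_catalog Iceberg catalog, so querying the catalog-qualified namespace explicitly sidesteps that.

def table_exists_spark(catalog, database, table_name):
    # List the tables in the catalog-qualified namespace and check membership.
    rows = spark.sql(f"SHOW TABLES IN {catalog}.{database}").collect()
    return any(row.tableName == table_name for row in rows)

# Hypothetical usage with the names from the listing:
# table_exists_spark("glue_catalog", "apache_iceberg_showcase", "products")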
