@rmitula
Created July 28, 2023 07:27
Listing 6. Sample Python script in the AWS Glue Job that utilizes Apache Spark to run an Apache Iceberg procedure, creating a changelog table on Amazon S3 and updating the products_changelog table in the AWS Glue Data Catalog
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Initialize Spark and Glue context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Retrieve and initialize job parameters
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Define catalog, schema, and table name
database_name = "apache_iceberg_showcase"
table_name = "products"
# Define Change Data Capture settings
changelog_table_name = "products_changelog"
identifier_columns = "product_id"
start_snapshot_id = "8355263591683472575" # Value based on example
end_snapshot_id = "4192901519627873695" # Value based on example
# Create a changelog view covering the changes between the two snapshots
spark.sql(f"""
    CALL glue_catalog.system.create_changelog_view(
        table => '{database_name}.{table_name}',
        options => map(
            'start-snapshot-id', '{start_snapshot_id}',
            'end-snapshot-id', '{end_snapshot_id}'
        ),
        changelog_view => '{changelog_table_name}',
        compute_updates => true,
        identifier_columns => array('{identifier_columns}')
    )
""")
# Read the changelog view and append its rows to the changelog table on Amazon S3
changelog_df = spark.sql(f"SELECT * FROM {changelog_table_name}")
changelog_df.write \
    .option("path", f"s3:///curated-zone/{changelog_table_name}") \
    .mode("append") \
    .saveAsTable(f"{database_name}.{changelog_table_name}")
job.commit()
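
The snapshot IDs in the listing are hard-coded example values. As a rough sketch, they could instead be looked up from the table's Iceberg `snapshots` metadata table. The helper name `snapshots_query` is hypothetical and not part of the original script; the catalog name `glue_catalog` is taken from the listing.

```python
def snapshots_query(catalog: str, database: str, table: str) -> str:
    """Build a query over the Iceberg `snapshots` metadata table, oldest first."""
    return (
        "SELECT snapshot_id, committed_at, operation "
        f"FROM {catalog}.{database}.{table}.snapshots "
        "ORDER BY committed_at"
    )


# Hypothetical usage with the names from the listing
query = snapshots_query("glue_catalog", "apache_iceberg_showcase", "products")

# Inside the Glue job, where `spark` is already defined:
# rows = spark.sql(query).collect()
# for row in rows:
#     print(row["snapshot_id"], row["committed_at"], row["operation"])
```

According to the Iceberg documentation, `start-snapshot-id` is exclusive and `end-snapshot-id` is inclusive, so the start value would normally be the snapshot just before the first change of interest.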
@mcanerim

When executing this I'm getting weird errors. I know that Glue 4.0 supports only Iceberg 1.0.0, but the create_changelog_view procedure is available after Iceberg 1.2.1.

Could you maybe share details on how you made it work?

@rmitula
Author

rmitula commented Mar 13, 2024

You need to upload your own version of Apache Iceberg and have the job use it instead of the one bundled with Glue. It's documented at the link below, under "Using a different Iceberg version".

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html
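
As a rough illustration of that setup (not necessarily the exact configuration used for this gist), the job parameters might be assembled as below. This assumes the approach described in the AWS documentation: point `--extra-jars` at your own Iceberg JARs and leave `iceberg` out of `--datalake-formats`. The function name, bucket, and JAR file names are hypothetical placeholders; which JARs are needed depends on the Iceberg version you pick.

```python
def build_glue_default_arguments(iceberg_jar_s3_uris, warehouse_s3_uri, catalog="glue_catalog"):
    """Assemble Glue job parameters for a self-provided Iceberg version (sketch)."""
    spark_conf = [
        "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        f"spark.sql.catalog.{catalog}=org.apache.iceberg.spark.SparkCatalog",
        f"spark.sql.catalog.{catalog}.warehouse={warehouse_s3_uri}",
        f"spark.sql.catalog.{catalog}.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog",
        f"spark.sql.catalog.{catalog}.io-impl=org.apache.iceberg.aws.s3.S3FileIO",
    ]
    return {
        # Comma-separated S3 paths to the JARs you uploaded yourself
        "--extra-jars": ",".join(iceberg_jar_s3_uris),
        # Glue takes a single --conf parameter, so further settings are chained inside its value
        "--conf": " --conf ".join(spark_conf),
        # "--datalake-formats" is intentionally omitted so the bundled Iceberg is not loaded
    }


# Hypothetical placeholder paths
job_arguments = build_glue_default_arguments(
    ["s3://example-bucket/jars/iceberg-runtime.jar", "s3://example-bucket/jars/aws-bundle.jar"],
    "s3://example-bucket/warehouse/",
)
```

The resulting dictionary could then be supplied as the job's default arguments, for example through the console's job parameters or an infrastructure-as-code template.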

@mcanerim

Yes, I did that just now, but now I'm getting the following error. Any idea why that might be?

software.amazon.awssdk.core.exception.RetryableException: Data read has a different checksum than expected. Was 0x7e5292cafa14ea4e39e9ab27f6196005, but expected 0x00000000000000000000000000000000. This commonly means that the data was corrupted between the client and service.
