Created
July 28, 2023 07:27
Listing 6. Sample Python script in the AWS Glue Job that utilizes Apache Spark to run an Apache Iceberg procedure, creating a changelog table on Amazon S3 and updating the products_changelog table in the AWS Glue Data Catalog
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initialize Spark and Glue context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Retrieve and initialize job parameters
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Define catalog, schema, and table name
database_name = "apache_iceberg_showcase"
table_name = "products"

# Define Change Data Capture settings
changelog_table_name = "products_changelog"
identifier_columns = "product_id"
start_snapshot_id = "8355263591683472575"  # Value based on example
end_snapshot_id = "4192901519627873695"  # Value based on example

# Create a changelog view covering the changes between the two snapshots
spark.sql(f"""
    CALL glue_catalog.system.create_changelog_view(
        table => '{database_name}.{table_name}',
        options => map(
            'start-snapshot-id', '{start_snapshot_id}',
            'end-snapshot-id', '{end_snapshot_id}'
        ),
        changelog_view => '{changelog_table_name}',
        compute_updates => true,
        identifier_columns => array('{identifier_columns}')
    )
""")

# Read the changelog view and append it to the changelog table on Amazon S3
changelog_df = spark.sql(f"SELECT * FROM {changelog_table_name}")
changelog_df.write \
    .option("path", f"s3:///curated-zone/{changelog_table_name}") \
    .mode("append") \
    .saveAsTable(f"{database_name}.{changelog_table_name}")

job.commit()
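The script hardcodes both snapshot IDs ("Value based on example"). A minimal sketch of how they might be looked up instead, assuming the catalog is registered as glue_catalog as in the script, is to query Iceberg's snapshots metadata table and take the oldest and newest entries. The helper names fetch_snapshots and pick_snapshot_range are hypothetical and not part of the original gist. Note that, as far as the Iceberg procedure documentation describes it, start-snapshot-id is exclusive and end-snapshot-id is inclusive.

```python
def fetch_snapshots(spark, database_name, table_name):
    """Return (snapshot_id, committed_at) tuples from the Iceberg snapshots metadata table.

    Assumes `spark` is a SparkSession with an Iceberg catalog named glue_catalog.
    """
    rows = spark.sql(
        f"SELECT snapshot_id, committed_at "
        f"FROM glue_catalog.{database_name}.{table_name}.snapshots"
    ).collect()
    return [(row["snapshot_id"], row["committed_at"]) for row in rows]


def pick_snapshot_range(snapshots):
    """Pick the oldest and newest snapshot IDs (hypothetical helper).

    `snapshots` is an iterable of (snapshot_id, committed_at) pairs.
    Returns (start_snapshot_id, end_snapshot_id) as strings, ready to be
    interpolated into the create_changelog_view options map.
    """
    ordered = sorted(snapshots, key=lambda item: item[1])
    if len(ordered) < 2:
        raise ValueError("need at least two snapshots to build a changelog range")
    return str(ordered[0][0]), str(ordered[-1][0])
```

Inside the Glue job this could be used as `start_snapshot_id, end_snapshot_id = pick_snapshot_range(fetch_snapshots(spark, database_name, table_name))` before the CALL statement.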
Yes, I did that just now, but now I'm receiving the following error. Any idea why that might be?
software.amazon.awssdk.core.exception.RetryableException: Data read has a different checksum than expected. Was 0x7e5292cafa14ea4e39e9ab27f6196005, but expected 0x00000000000000000000000000000000. This commonly means that the data was corrupted between the client and service.
You need to upload your own version of Apache Iceberg and use it. It's documented here under "Using a different Iceberg version".
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html
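As a rough sketch of what that section of the documentation describes: the custom Iceberg JARs are passed through the --extra-jars job parameter, and iceberg is left out of --datalake-formats. The helper name iceberg_job_arguments and the S3 URI in the usage line are hypothetical. Whether --user-jars-first is also needed depends on the Glue version, so it is left as an opt-in flag here; the linked page is the authority on that.

```python
def iceberg_job_arguments(jar_s3_uris, user_jars_first=False):
    """Build Glue job default arguments for a self-supplied Iceberg version.

    Hypothetical helper: `jar_s3_uris` is a list of S3 URIs pointing at the
    Iceberg JARs you uploaded yourself.
    """
    if not jar_s3_uris:
        raise ValueError("at least one Iceberg JAR URI is required")

    # --extra-jars takes a comma-separated list of S3 paths.
    arguments = {"--extra-jars": ",".join(jar_s3_uris)}

    # Deliberately no "--datalake-formats": "iceberg" entry here, so the
    # Iceberg version bundled with Glue is not loaded next to the custom one.

    if user_jars_first:
        # Assumption: only set this when the documentation asks for it
        # for your Glue version.
        arguments["--user-jars-first"] = "true"
    return arguments
```

For example, `iceberg_job_arguments(["s3://example-bucket/jars/iceberg-spark-runtime.jar"])` returns a dictionary that can be supplied as the job's default arguments.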