@rmoff
Last active April 27, 2023 16:18
{
"cells": [
{
"cell_type": "markdown",
"id": "1041ae6f",
"metadata": {},
"source": [
"![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)"
]
},
{
"cell_type": "markdown",
"id": "08d9d173",
"metadata": {},
"source": [
"This notebook runs using the Docker Compose at https://github.com/tabular-io/docker-spark-iceberg"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "fd61c16f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"23/04/25 09:41:00 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.\n"
]
},
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - in-memory</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://d704c85c6890:4041\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v3.3.2</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>local[*]</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>PySparkShell</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
],
"text/plain": [
"<pyspark.sql.session.SparkSession at 0xffff651ac100>"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"Jupyter\").getOrCreate()\n",
"\n",
"spark"
]
},
{
"cell_type": "markdown",
"id": "747bee98",
"metadata": {},
"source": [
"To be able to rerun the notebook several times, let's drop the `permits` table if it exists to start fresh."
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "26245f7e",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"++\n",
"||\n",
"++\n",
"++"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"CREATE DATABASE IF NOT EXISTS nyc"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "08a13fcc",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"++\n",
"||\n",
"++\n",
"++"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"DROP TABLE IF EXISTS nyc.permits"
]
},
{
"cell_type": "markdown",
"id": "eead44c0",
"metadata": {},
"source": [
"# Load NYC Film Permits Data"
]
},
{
"cell_type": "markdown",
"id": "6f9a9f41",
"metadata": {},
"source": [
"For this demo, we will use the [New York City Film Permits dataset](https://data.cityofnewyork.us/City-Government/Film-Permits/tg4x-b46p) available as part of the NYC Open Data initiative. We're using a locally saved copy of a 1000 record sample, but feel free to download the entire dataset to use in this notebook!\n",
"\n",
"We'll save the sample dataset into an iceberg table called `permits`."
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "e3cc669a",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"SLF4J: Failed to load class \"org.slf4j.impl.StaticLoggerBinder\".\n",
"SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
"SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.\n",
" \r"
]
}
],
"source": [
"df = spark.read.option(\"inferSchema\",\"true\").option(\"multiline\",\"true\").json(\"/home/iceberg/data/nyc_film_permits.json\")\n",
"df.write.saveAsTable(\"nyc.permits\")"
]
},
{
"cell_type": "markdown",
"id": "378cf187",
"metadata": {},
"source": [
"Taking a quick peek at the data, you can see that there are a number of permits for different boroughs in New York."
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "f3170161",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" <th>borough</th>\n",
" <th>permit_cnt</th>\n",
" </tr>\n",
" <tr>\n",
" <td>Brooklyn</td>\n",
" <td>348</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Manhattan</td>\n",
" <td>417</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Bronx</td>\n",
" <td>36</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Queens</td>\n",
" <td>188</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Staten Island</td>\n",
" <td>11</td>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"+---------------+------------+\n",
"| borough | permit_cnt |\n",
"+---------------+------------+\n",
"| Brooklyn | 348 |\n",
"| Manhattan | 417 |\n",
"| Bronx | 36 |\n",
"| Queens | 188 |\n",
"| Staten Island | 11 |\n",
"+---------------+------------+"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"SELECT borough, count(*) permit_cnt\n",
"FROM nyc.permits\n",
"GROUP BY borough"
]
},
{
"cell_type": "markdown",
"id": "fa31a9ea",
"metadata": {},
"source": [
"# The Setup"
]
},
{
"cell_type": "markdown",
"id": "d845953b",
"metadata": {},
"source": [
"Tables by default are not configured to allow integrated audits, therefore the first step is enabling this by setting the `write.wap.enabled` table metadata property to `true`"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "bf29df0b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"23/04/25 09:41:04 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up\n"
]
},
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"++\n",
"||\n",
"++\n",
"++"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"ALTER TABLE nyc.permits\n",
"SET TBLPROPERTIES (\n",
" 'write.wap.enabled'='true'\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "14035a18",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"23/04/25 09:41:04 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up\n"
]
},
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"++\n",
"||\n",
"++\n",
"++"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"ALTER TABLE nyc.permits\n",
"CREATE BRANCH etl_job_42"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "65bc4280",
"metadata": {},
"outputs": [],
"source": [
"spark.conf.set('spark.wap.branch', 'etl_job_42')"
]
},
{
"cell_type": "markdown",
"id": "437088f6",
"metadata": {},
"source": [
"# Staging The Changes"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "14843243",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"++\n",
"||\n",
"++\n",
"++"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"DELETE FROM nyc.permits\n",
"WHERE borough='Manhattan'"
]
},
{
"cell_type": "markdown",
"id": "bf332168",
"metadata": {},
"source": [
"This should show the dataset without `Manhattan` rows:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "1cd4b72b",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" <th>borough</th>\n",
" <th>permit_cnt</th>\n",
" </tr>\n",
" <tr>\n",
" <td>Brooklyn</td>\n",
" <td>348</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Bronx</td>\n",
" <td>36</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Queens</td>\n",
" <td>188</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Staten Island</td>\n",
" <td>11</td>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"+---------------+------------+\n",
"| borough | permit_cnt |\n",
"+---------------+------------+\n",
"| Brooklyn | 348 |\n",
"| Bronx | 36 |\n",
"| Queens | 188 |\n",
"| Staten Island | 11 |\n",
"+---------------+------------+"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"SELECT borough, count(*) permit_cnt\n",
"FROM nyc.permits VERSION AS OF 'etl_job_42'\n",
"GROUP BY borough"
]
},
{
"cell_type": "markdown",
"id": "5ad40cf1",
"metadata": {},
"source": [
"If WAP has worked then the following query should show the full dataset, including `Manhattan`:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "95df15e9",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" <th>borough</th>\n",
" <th>permit_cnt</th>\n",
" </tr>\n",
" <tr>\n",
" <td>Brooklyn</td>\n",
" <td>348</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Bronx</td>\n",
" <td>36</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Queens</td>\n",
" <td>188</td>\n",
" </tr>\n",
" <tr>\n",
" <td>Staten Island</td>\n",
" <td>11</td>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"+---------------+------------+\n",
"| borough | permit_cnt |\n",
"+---------------+------------+\n",
"| Brooklyn | 348 |\n",
"| Bronx | 36 |\n",
"| Queens | 188 |\n",
"| Staten Island | 11 |\n",
"+---------------+------------+"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"SELECT borough, count(*) permit_cnt\n",
"FROM nyc.permits\n",
"GROUP BY borough"
]
},
{
"cell_type": "markdown",
"id": "041d38ff",
"metadata": {},
"source": [
"## ❗❗ Has WAP not worked? \n",
"\n",
"The above table shows that `Manhattan` has been deleted from the published table view too"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "9823b243",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table>\n",
" <tr>\n",
" <th>committed_at</th>\n",
" <th>snapshot_id</th>\n",
" <th>parent_id</th>\n",
" <th>operation</th>\n",
" <th>manifest_list</th>\n",
" <th>summary</th>\n",
" </tr>\n",
" <tr>\n",
" <td>2023-04-25 09:41:03.462000</td>\n",
" <td>7930668745942081407</td>\n",
" <td>None</td>\n",
" <td>append</td>\n",
" <td>s3://warehouse/nyc/permits/metadata/snap-7930668745942081407-1-ec7e0c3e-3f2d-4e7e-b434-b55997bf03df.avro</td>\n",
" <td>{&#x27;spark.app.id&#x27;: &#x27;local-1682415657807&#x27;, &#x27;changed-partition-count&#x27;: &#x27;1&#x27;, &#x27;added-data-files&#x27;: &#x27;1&#x27;, &#x27;total-equality-deletes&#x27;: &#x27;0&#x27;, &#x27;added-records&#x27;: &#x27;1000&#x27;, &#x27;total-position-deletes&#x27;: &#x27;0&#x27;, &#x27;added-files-size&#x27;: &#x27;53565&#x27;, &#x27;total-delete-files&#x27;: &#x27;0&#x27;, &#x27;total-files-size&#x27;: &#x27;53565&#x27;, &#x27;total-records&#x27;: &#x27;1000&#x27;, &#x27;total-data-files&#x27;: &#x27;1&#x27;}</td>\n",
" </tr>\n",
" <tr>\n",
" <td>2023-04-25 09:41:06.412000</td>\n",
" <td>7158850652265602291</td>\n",
" <td>7930668745942081407</td>\n",
" <td>overwrite</td>\n",
" <td>s3://warehouse/nyc/permits/metadata/snap-7158850652265602291-1-5396e794-7ad2-4268-9606-ccb0ab88bf4a.avro</td>\n",
" <td>{&#x27;added-data-files&#x27;: &#x27;1&#x27;, &#x27;total-equality-deletes&#x27;: &#x27;0&#x27;, &#x27;added-records&#x27;: &#x27;583&#x27;, &#x27;deleted-data-files&#x27;: &#x27;1&#x27;, &#x27;deleted-records&#x27;: &#x27;1000&#x27;, &#x27;total-records&#x27;: &#x27;583&#x27;, &#x27;spark.app.id&#x27;: &#x27;local-1682415657807&#x27;, &#x27;removed-files-size&#x27;: &#x27;53565&#x27;, &#x27;changed-partition-count&#x27;: &#x27;1&#x27;, &#x27;total-position-deletes&#x27;: &#x27;0&#x27;, &#x27;added-files-size&#x27;: &#x27;31817&#x27;, &#x27;total-delete-files&#x27;: &#x27;0&#x27;, &#x27;total-files-size&#x27;: &#x27;31817&#x27;, &#x27;total-data-files&#x27;: &#x27;1&#x27;}</td>\n",
" </tr>\n",
"</table>"
],
"text/plain": [
"+----------------------------+---------------------+---------------------+-----------+----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
"| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |\n",
"+----------------------------+---------------------+---------------------+-----------+----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
"| 2023-04-25 09:41:03.462000 | 7930668745942081407 | None | append | s3://warehouse/nyc/permits/metadata/snap-7930668745942081407-1-ec7e0c3e-3f2d-4e7e-b434-b55997bf03df.avro | {'spark.app.id': 'local-1682415657807', 'changed-partition-count': '1', 'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '1000', 'total-position-deletes': '0', 'added-files-size': '53565', 'total-delete-files': '0', 'total-files-size': '53565', 'total-records': '1000', 'total-data-files': '1'} |\n",
"| 2023-04-25 09:41:06.412000 | 7158850652265602291 | 7930668745942081407 | overwrite | s3://warehouse/nyc/permits/metadata/snap-7158850652265602291-1-5396e794-7ad2-4268-9606-ccb0ab88bf4a.avro | {'added-data-files': '1', 'total-equality-deletes': '0', 'added-records': '583', 'deleted-data-files': '1', 'deleted-records': '1000', 'total-records': '583', 'spark.app.id': 'local-1682415657807', 'removed-files-size': '53565', 'changed-partition-count': '1', 'total-position-deletes': '0', 'added-files-size': '31817', 'total-delete-files': '0', 'total-files-size': '31817', 'total-data-files': '1'} |\n",
"+----------------------------+---------------------+---------------------+-----------+----------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%sql\n",
"\n",
"SELECT *\n",
"FROM nyc.permits.snapshots"
]
},
{
"cell_type": "markdown",
"id": "8f34f306",
"metadata": {},
"source": [
"_Note: there's no `wap.id` or `wap.branch` in the `summary` - should there be if things had worked?_ "
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.16"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
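The notebook above stages a delete on the `etl_job_42` branch and expects the published (`main`) view to stay intact until an explicit publish step. As a minimal, Spark-free sketch of that write-audit-publish flow (toy classes standing in for Iceberg refs, not the Iceberg API):

```python
# Toy model of Iceberg's write-audit-publish (WAP) branch semantics.
# `Table`, `delete_where`, and `publish` are illustrative names only.

class Table:
    def __init__(self, rows):
        self.refs = {"main": list(rows)}  # branch name -> snapshot (row list)

    def create_branch(self, name, from_branch="main"):
        # A branch starts as a new ref to the source branch's snapshot.
        self.refs[name] = list(self.refs[from_branch])

    def delete_where(self, branch, pred):
        # A write staged on a branch moves only that branch's ref.
        self.refs[branch] = [r for r in self.refs[branch] if not pred(r)]

    def publish(self, branch):
        # Publishing fast-forwards main to the audited branch snapshot.
        self.refs["main"] = list(self.refs[branch])


permits = Table([{"borough": "Manhattan"}, {"borough": "Brooklyn"}])
permits.create_branch("etl_job_42")
permits.delete_where("etl_job_42", lambda r: r["borough"] == "Manhattan")

# Audit: the branch has the change, main does not.
print([r["borough"] for r in permits.refs["etl_job_42"]])  # ['Brooklyn']
print([r["borough"] for r in permits.refs["main"]])        # ['Manhattan', 'Brooklyn']

permits.publish("etl_job_42")
print([r["borough"] for r in permits.refs["main"]])        # ['Brooklyn']
```

In real Iceberg the publish step would be a fast-forward of `main` to the audited branch; the toy `publish` above stands in for that.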
@chrigehr

Aren't you missing a `spark.conf.unset("spark.wap.branch")` before step 11 to "leave" the branch?
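The point here is that with `spark.wap.branch` still set in the session, an unqualified read of `nyc.permits` resolves to the branch rather than `main`, which is why the "published view" check at step 11 showed `Manhattan` gone. A minimal sketch of that conf-based routing (hypothetical helper names, not Spark's implementation):

```python
# Hypothetical sketch of how a session conf like spark.wap.branch routes
# unqualified reads to a branch; not Spark's actual implementation.

conf = {}                        # stand-in for the session configuration
refs = {                         # table refs: branch -> visible boroughs
    "main": ["Manhattan", "Brooklyn"],
    "etl_job_42": ["Brooklyn"],  # Manhattan deleted on the branch
}

def read_permits(version=None):
    # An explicit VERSION AS OF wins; otherwise spark.wap.branch, if set,
    # silently redirects the read; only when unset does the query hit main.
    branch = version or conf.get("spark.wap.branch", "main")
    return refs[branch]

conf["spark.wap.branch"] = "etl_job_42"
print(read_permits())            # reads the branch, not main!
conf.pop("spark.wap.branch")     # the missing spark.conf.unset(...)
print(read_permits())            # now reads main again
```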

@rmoff

rmoff commented Apr 27, 2023

@chrigehr yes, you're right, that was the issue. I should remove this gist; it's superseded by databricks/docker-spark-iceberg#76.

thanks.

@chrigehr

no problem @rmoff . I found the gist by accident when I was looking for information on what works (and what doesn't) with Spark 3.1 regarding branching and tagging, and what the status is regarding Spark 3.3.
