Created
April 23, 2025 06:50
-
-
Save drorata/c8caee13035c1c7089a03b21a1fd0371 to your computer and use it in GitHub Desktop.
How to refer to columns after join using PySpark - https://youtu.be/NPZOBOjOX04
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import findspark\n", | |
"from pyspark.sql import SparkSession\n", | |
"from pyspark.sql import functions as F\n", | |
"findspark.init()\n", | |
"\n", | |
"\n", | |
"spark = (\n", | |
" SparkSession.builder.appName(\"TestApp\")\n", | |
" .config(\"spark.driver.host\", \"localhost\")\n", | |
" .getOrCreate()\n", | |
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from pyspark.sql.functions import col\n", | |
"\n", | |
"M_FEET_FACTOR = 3.280839895\n", | |
"KG_POUND_FACTOR = 2.20462\n", | |
"\n", | |
"age_df = spark.createDataFrame(\n", | |
" [\n", | |
" (\"bob\", 11),\n", | |
" (\"alice\", 22),\n", | |
" (\"peter\", 33),\n", | |
" ],\n", | |
" [\"name\", \"value\"],\n", | |
")\n", | |
"\n", | |
"height_df = spark.createDataFrame(\n", | |
" [\n", | |
" (\"bob\", 1.10),\n", | |
" (\"alice\", 1.69),\n", | |
" (\"peter\", 1.88),\n", | |
" ],\n", | |
" [\"name\", \"value\"],\n", | |
").withColumn(\"imp_value\", F.round(F.col(\"value\") * M_FEET_FACTOR, 2))\n", | |
"\n", | |
"weight_df = spark.createDataFrame(\n", | |
" [\n", | |
" (\"bob\", 36),\n", | |
" (\"alice\", 55),\n", | |
" (\"peter\", 87),\n", | |
" ],\n", | |
" [\"name\", \"value\"],\n", | |
").withColumn(\"imp_value\", F.round(F.col(\"value\") * KG_POUND_FACTOR, 2))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"joined_df = age_df.join(height_df, on=\"name\", how=\"left\").join(\n", | |
" weight_df, on=\"name\", how=\"left\"\n", | |
")\n", | |
"joined_df.show()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
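"## Using the original DataFrames\n", | |
"\n", | |
"The joined frame contains three `value` columns and two `imp_value` columns. Referencing a column through the DataFrame it came from (e.g. `age_df.value`) tells Spark which one is meant." | |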
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"joined_df.select(age_df.value).show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"joined_df.select(weight_df.imp_value).show()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
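"## Pre-process\n", | |
"\n", | |
"Rename the clashing columns in each input *before* joining, so every column in the joined result has a unique name." | |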
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"pre_pro_age_df = age_df.withColumnRenamed(\"value\", \"age\")\n", | |
"pre_pro_height_df = height_df.withColumnRenamed(\"value\", \"height\").withColumnRenamed(\n", | |
" \"imp_value\", \"height (ft)\"\n", | |
")\n", | |
"pre_pro_weight_df = weight_df.withColumnRenamed(\"value\", \"weight\").withColumnRenamed(\n", | |
" \"imp_value\", \"weight (pound)\"\n", | |
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"joined_df_2 = pre_pro_age_df.join(pre_pro_height_df, on=\"name\", how=\"left\").join(\n", | |
" pre_pro_weight_df, on=\"name\", how=\"left\"\n", | |
")\n", | |
"joined_df_2.show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"joined_df_2.select(\"age\").show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"joined_df_2.select(F.col(\"height (ft)\")).show()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
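"## Post-process\n", | |
"\n", | |
"Join the inputs unchanged, then rename every column of the result at once with `toDF`." | |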
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Same as the first case\n", | |
"joined_df_3 = (\n", | |
" age_df.join(height_df, on=\"name\", how=\"left\")\n", | |
" .join(weight_df, on=\"name\", how=\"left\")\n", | |
" .toDF(\n", | |
" *[\n", | |
" \"name\",\n", | |
" \"age\",\n", | |
" \"height\",\n", | |
" \"height_imp\",\n", | |
" \"weight\",\n", | |
" \"weight_imp\",\n", | |
" ]\n", | |
" )\n", | |
")\n", | |
"\n", | |
"joined_df_3.show()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": ".venv", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.12.2" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment