{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "Introduction to Apache Spark - IT Academy 2022.ipynb",
"provenance": [],
"collapsed_sections": [],
"toc_visible": true,
"include_colab_link": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/jirislav/4ba50753aa8278fccaf9bde0b08b2bce/introduction-to-apache-spark-it-academy-2022.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "markdown",
"source": [
"# What is Apache Spark & why do we need it\n",
"\n"
],
"metadata": {
"id": "DvIxUKJHffgV"
}
},
{
"cell_type": "markdown",
"source": [
"Imagine you were employed by a real estate agency to help them raise the income by predicting how long will it take from making a contract with the seller until a buyer is found based on futures like price, location, estate type, etc.\n",
"\n",
"To accomplish this goal, you could split this task into several parts:\n",
"- gather enough data from various sources\n",
"- transform to a common standardized format\n",
"- extract key attributes for the prediction\n",
"- apply machine learning algorithms"
],
"metadata": {
"id": "ZFetTCc9QCLv"
}
},
{
"cell_type": "markdown",
"source": [
"In this notebook, we are going to focus on the first part of the problem, which is:\n",
"- Extract\n",
"- Transform\n",
"- Load\n",
"\n",
"Or ETL, for short."
],
"metadata": {
"id": "ac2bwi4ISGly"
}
},
{
"cell_type": "markdown",
"source": [
"We picked Apache Spark for this task as it is a distributed data processing framework, designed to process large amounts of data in memory.\n",
"\n",
"\n",
"Run the following cell to install Spark 3.2.1 to your Colab instance (we'll use it later on):"
],
"metadata": {
"id": "PsVdfiJtP2lj"
}
},
{
"cell_type": "code",
"source": [
"# Install Spark\n",
"\n",
"import os\n",
"os.chdir(\"/content\")\n",
"!test -f spark-3.2.1-bin-hadoop2.7.tgz || wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz\n",
"!test -d spark-3.2.1-bin-hadoop2.7 || tar -xf spark-3.2.1-bin-hadoop2.7.tgz\n",
"\n",
"# Setup pyspark\n",
"!pip install findspark\n",
"import findspark\n",
"os.environ[\"SPARK_HOME\"] = \"/content/spark-3.2.1-bin-hadoop2.7\"\n",
"findspark.init()\n",
"\n",
"# Create new SparkSession\n",
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder \\\n",
" .master(\"local[*]\") \\\n",
" .getOrCreate()"
],
"metadata": {
"id": "EF9AGpOieuSk"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"# Just some handy functions to keep the code cells clean later on\n",
"from pyspark.sql.functions import col, floor, udf"
],
"metadata": {
"id": "p0hL82dE_YOG"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"# Extracting, Transforming & Loading"
],
"metadata": {
"id": "rIOcfobgflX8"
}
},
{
"cell_type": "markdown",
"source": [
"## How do we extract the data"
],
"metadata": {
"id": "SImHTGjvfphM"
}
},
{
"cell_type": "markdown",
"source": [
"### Reading the data"
],
"metadata": {
"id": "p9ZXJp8rxXN0"
}
},
{
"cell_type": "markdown",
"source": [
"There is a directory named `sample_data`, which contains housing market data from California from around 1990.\n",
"- to find out more about the data, see [it's docs](https://developers.google.com/machine-learning/crash-course/california-housing-data-description)\n",
"\n",
"We will load all the CSV that are named `california_housing*.csv` into Apache Spark session. But let's first take a prompt look into how does these data look:"
],
"metadata": {
"id": "1w63s9HoiMXC"
}
},
{
"cell_type": "code",
"source": [
"!head sample_data/california_housing*.csv"
],
"metadata": {
"id": "LvuR5BZ5g2sk"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Two files matched our pattern and both have the same \"schema\", so we can read both at once into a Spark session using simple command (which also saves it into a variable):"
],
"metadata": {
"id": "pCRGESJCiT7E"
}
},
{
"cell_type": "code",
"source": [
"housing_market = spark.read.csv(\"sample_data/california_housing*.csv\")"
],
"metadata": {
"id": "Jyewf3Bcoyhl"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Well, the command didn't fail, so we might presume it have probably succeeded.\n",
"- to make sure it really had succeded, let's see the content of our new variable:"
],
"metadata": {
"id": "HnphqHTupGP_"
}
},
{
"cell_type": "code",
"source": [
"housing_market"
],
"metadata": {
"id": "vRiYfAIqpY-i"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Now you can see that the variable `housing_market` holds something called `DataFrame`.\n",
"- but what is a `DataFrame` anyway?\n",
"\n",
"You can imagine `DataFrame` like a simple table with rows and named columns with neat API you can call upon it very easily (as we'll see later).\n",
"- to learn more about DataFrames, [see official Spark Docs](https://spark.apache.org/docs/3.2.0/sql-programming-guide.html)"
],
"metadata": {
"id": "uLSFEH-xp2CR"
}
},
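{
"cell_type": "markdown",
"source": [
"For illustration, here are a few calls you can already try on our DataFrame, just as a minimal sketch of that API (note that at this point the columns still carry the generic names `_c0`, `_c1`, ...):"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# A minimal sketch of the DataFrame API -- the columns are still named _c0, _c1, ... at this point\n",
"print(housing_market.columns)         # list of column names\n",
"print(housing_market.count())         # number of rows\n",
"housing_market.select(\"_c0\").show(3)  # first three values of the first column"
],
"metadata": {},
"execution_count": null,
"outputs": []
},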
{
"cell_type": "markdown",
"source": [
"### Your first task"
],
"metadata": {
"id": "-iCA1Lglxh7l"
}
},
{
"cell_type": "markdown",
"source": [
"Now we know the `housing_market` holds a DataFrame inside.\n",
"- but we can also see that the column names are missing\n",
" - *(from the previous console output you can see `_c0`, `_c1`, ... instead of the real column names)*\n",
"\n",
"To fix this, we have to consult the [spark.read.csv API](https://spark.apache.org/docs/3.2.0/sql-data-sources-csv.html#data-source-option) first.\n",
"- make some effort, and try to find the option we could use to make sure the column names of the CSV are properly extracted"
],
"metadata": {
"id": "3quH2Z0gp2NW"
}
},
{
"cell_type": "markdown",
"source": [
"Found it? Try to modify the options within the function call and re-run the cell.\n",
"- *HINT: Wait a while after writing the `,` as Colab'll show you Spark API suggestions*"
],
"metadata": {
"id": "s44v2neFtVwX"
}
},
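{
"cell_type": "markdown",
"source": [
"If you are unsure how options are passed at all, here is a sketch of the general syntax (the `sep` option below just sets the delimiter explicitly and is *not* the option you are looking for):"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# A sketch of how options are passed to spark.read.csv.\n",
"# `sep` is just an example option, not the one that solves the task below.\n",
"example_df = spark.read.csv(\"sample_data/california_housing_test.csv\", sep=\",\")\n",
"example_df"
],
"metadata": {},
"execution_count": null,
"outputs": []
},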
{
"cell_type": "code",
"source": [
"# Change this call to create DF with respect of the column names from the CSV headers.\n",
"housing_market = spark.read.csv(\"sample_data/california_housing_test.csv\")\n",
"housing_market"
],
"metadata": {
"id": "z5A3KNgXsl-A"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"If you see real column names (`longitude`, `latitude`, ...) instead of (`_c0`, `_c1`, ...), you got it right.\n",
"\n",
"Another way to show the DataFrame schema is simply calling:"
],
"metadata": {
"id": "bTAkcuT7ugFt"
}
},
{
"cell_type": "code",
"source": [
"housing_market.printSchema()"
],
"metadata": {
"id": "IeYC88gzufj4"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Last thing you might find useful is to print a few lines of the underlying DataFrame:"
],
"metadata": {
"id": "pbyhgOIbyMSp"
}
},
{
"cell_type": "code",
"source": [
"housing_market.show()"
],
"metadata": {
"id": "ZKJMhQnqyTlN"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"### Extraction complete"
],
"metadata": {
"id": "vmY5Irs1xnBu"
}
},
{
"cell_type": "markdown",
"source": [
"That's about all you need to know for the moment of how to extract the data.\n",
"\n",
"Keep in mind that in the real world, it's actually way more difficult, since you have to tackle problems including, but not limited to:\n",
"- schema of one data source doesn't match the schema of another data source\n",
"- file formats are different between data sources (e.g. Parquet instead of CSV)\n",
"- you might have to crawl the data first from some API using pagination\n",
"- ..."
],
"metadata": {
"id": "UqRBXmU2vaEM"
}
},
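{
"cell_type": "markdown",
"source": [
"For example, reading a different file format usually only means calling a different reader method. Here is a small sketch: we first write our DataFrame out as Parquet (the path `/tmp/housing_market_parquet` is just an example), so that we have something to read back:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# A sketch of reading another file format (Parquet) -- the path is just an example.\n",
"# First write our DataFrame out as Parquet, so that we have something to read back:\n",
"housing_market.write.parquet(\"/tmp/housing_market_parquet\", mode=\"overwrite\")\n",
"\n",
"# Reading it back uses the same reader API, just a different method:\n",
"housing_market_from_parquet = spark.read.parquet(\"/tmp/housing_market_parquet\")\n",
"housing_market_from_parquet.printSchema()"
],
"metadata": {},
"execution_count": null,
"outputs": []
},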
{
"cell_type": "markdown",
"source": [
"## How do we transform the data"
],
"metadata": {
"id": "zLxb8JpYf5nv"
}
},
{
"cell_type": "markdown",
"source": [
"### What a transformation and why should I care?"
],
"metadata": {
"id": "w45CMKMj8Cv9"
}
},
{
"cell_type": "markdown",
"source": [
"In the previous chapter, we have extracted housing market data from CSV files into a single DataFrame under a variable called `housing_market`. In this chapter, we will focus on transforming the data into a different form. But what is the motivation of such transformation, anyway?\n",
"\n",
"Well, as data grow big, it's harder to extract a particular value, since there is so much data, the computation would essentially take forever.\n",
"\n",
"Because of this fact, it's common to process finite chunks of recent data and transform them into another form, so that we can:\n",
"- extract the desired value faster when needed\n",
"- prepare values for machine learning algorithms"
],
"metadata": {
"id": "qD12m2-e6lCI"
}
},
{
"cell_type": "markdown",
"source": [
"### Transformation example"
],
"metadata": {
"id": "uo-Y9uAI8QzY"
}
},
{
"cell_type": "markdown",
"source": [
"We will elaborate upon two transformation types:\n",
"- *group by* (aggregation)\n",
"- *user defined function* (or UDF for short)"
],
"metadata": {
"id": "sJcVrFESBraG"
}
},
{
"cell_type": "markdown",
"source": [
"#### Group By (aggregation)"
],
"metadata": {
"id": "OYXFBfW3CYgi"
}
},
{
"cell_type": "markdown",
"source": [
"For simplicity, let us build an aggregation, which tells us, how many housing blocks (one housing block should be one row in the DataFrame) were built how many decades ago.\n",
"\n",
"To achieve this, we need to complete these high-level steps:\n",
"- define which decade is what block\n",
"- group all blocks together by the defined decade\n",
"- count the groups that were formed for each decade"
],
"metadata": {
"id": "iRfSd_1_8S-i"
}
},
{
"cell_type": "markdown",
"source": [
"To define a new column, we can use the [`DF.withColumn` API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html). To get a housing block median age in decades, we simply floor the `housing_median_age` to the closest ten and store it in a new column named `age_group`."
],
"metadata": {
"id": "0W1-q3KZ-YFe"
}
},
{
"cell_type": "code",
"source": [
"housing_market_with_age_group = housing_market.withColumn(\"age_group\", floor(col(\"housing_median_age\") / 10) * 10)\n",
"\n",
"housing_market_with_age_group"
],
"metadata": {
"id": "Ish6U50L-wka"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Now we can group the DataFrame using our fresh column and perform count of each group:"
],
"metadata": {
"id": "Nz2eb2LQ_3Xl"
}
},
{
"cell_type": "code",
"source": [
"age_group_histogram = housing_market_with_age_group.groupBy(\"age_group\").count()\n",
"\n",
"age_group_histogram"
],
"metadata": {
"id": "upPwhIJe_-Ff"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"And that's pretty much it! We might also want to enjoy looking at the resulting data.\n",
"- to do that, we should also **sort the DataFrame** so that even human can easily deduct conclusion:"
],
"metadata": {
"id": "eB9iObYTAcb1"
}
},
{
"cell_type": "code",
"source": [
"age_group_histogram = age_group_histogram.sort(\"age_group\")\n",
"\n",
"age_group_histogram.show()"
],
"metadata": {
"id": "Fp9FyugnA1Mb"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"We did this step-by-step to keep it simple, but it is actually more common to write the whole transformation into a single command like this:"
],
"metadata": {
"id": "QhfH_rwBBRAh"
}
},
{
"cell_type": "code",
"source": [
"age_group_histogram = housing_market \\\n",
" .withColumn(\"age_group\", floor(col(\"housing_median_age\") / 10) * 10) \\\n",
" .groupBy(col(\"age_group\")) \\\n",
" .count() \\\n",
" .sort(\"age_group\")\n",
"age_group_histogram.show()"
],
"metadata": {
"id": "XhxQAenN0Lq5"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"We can also easily convert our Spark DataFrame into a Pandas DataFrame for visualisation in Google Colab:"
],
"metadata": {
"id": "rEIuQK8UBDJ1"
}
},
{
"cell_type": "code",
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"age_group_histogram.toPandas().plot(x=\"age_group\")"
],
"metadata": {
"id": "Av2nCh0q4i1q"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"You can see, that most housing blocks were built (using median age of all buildings in a block) somewhat between 30-40 years ago (because `age_group=30` is the floor of the interval). Maybe this metric could help someone from the real estate agency, but please note that we used only a simple `count`, yet there are many more actions you can perform upon [the `GroupedData` API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.html)."
],
"metadata": {
"id": "pg4PZOeKCp_w"
}
},
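{
"cell_type": "markdown",
"source": [
"For example, here is a small sketch of a few other aggregations on the same age groups (we explicitly cast `median_house_value` to a number, since the CSV columns were read as plain strings):"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# A sketch of a few more GroupedData aggregations (avg, min, max) over the same age groups.\n",
"from pyspark.sql.functions import avg, min as spark_min, max as spark_max\n",
"\n",
"# The CSV columns were read as strings, so cast the value column to a number explicitly:\n",
"house_value = col(\"median_house_value\").cast(\"double\")\n",
"\n",
"(\n",
"    housing_market\n",
"    .withColumn(\"age_group\", floor(col(\"housing_median_age\") / 10) * 10)\n",
"    .groupBy(\"age_group\")\n",
"    .agg(\n",
"        avg(house_value).alias(\"avg_house_value\"),\n",
"        spark_min(house_value).alias(\"min_house_value\"),\n",
"        spark_max(house_value).alias(\"max_house_value\"),\n",
"    )\n",
"    .sort(\"age_group\")\n",
"    .show()\n",
")"
],
"metadata": {},
"execution_count": null,
"outputs": []
},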
{
"cell_type": "markdown",
"source": [
"#### User Defined Function (UDF)"
],
"metadata": {
"id": "YWueSJhvDwXN"
}
},
{
"cell_type": "markdown",
"source": [
"Sometimes the transformation we need to do, cannot be easily describe using the available API and we need to write our custom function instead.\n",
"\n",
"Since all our columns are numbers, we should not need to write our own UDFs, but let's try to write it anyway. For example, we could write a function, that categorizes each block into one of:\n",
"- poor\n",
" - `medianIncome < 1` and `population / totalRooms < 1`\n",
"- middle_class\n",
" - `medianIncome between 1-3` and `population / totalRooms < 2`\n",
"- rich\n",
" - `medianIncome > 3` and `population / totalRooms > 1`\n",
"- unknown\n",
" - all other housing blocks\n",
"\n",
"\n"
],
"metadata": {
"id": "z7CgKR26FWx_"
}
},
{
"cell_type": "markdown",
"source": [
"So we start by defining our function within pure Python:"
],
"metadata": {
"id": "AoYM9FyNIYQn"
}
},
{
"cell_type": "code",
"source": [
"def categorize(median_income, population, total_rooms):\n",
" # Since we focus on the basics, let's not dive into why we need to convert string to float\n",
" median_income = float(median_income)\n",
" population_over_rooms = float(population) / float(total_rooms)\n",
"\n",
" if median_income < 1 and population_over_rooms < 1:\n",
" return \"poor\"\n",
" elif median_income > 1 and median_income < 3 and population_over_rooms < 2:\n",
" return \"middle_class\"\n",
" elif median_income > 3 and population_over_rooms > 1:\n",
" return \"rich\"\n",
" else:\n",
" return \"unknown\""
],
"metadata": {
"id": "30GQSa9uIeB8"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Now we can define our pure Python function as Spark UDF:"
],
"metadata": {
"id": "aue342ibIYV6"
}
},
{
"cell_type": "code",
"source": [
"categorize_udf = udf(categorize)"
],
"metadata": {
"id": "NHguVhOOFCRt"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"Great, let's use it to categorize our loaded housing blocks and count the categories using `group by` we have already covered:"
],
"metadata": {
"id": "u5FfeQSvJ2aX"
}
},
{
"cell_type": "code",
"source": [
"housing_market \\\n",
" .groupBy(categorize_udf(col(\"median_income\"), col(\"population\"), col(\"total_rooms\"))) \\\n",
" .count() \\\n",
" .sort(\"count\") \\\n",
" .show()"
],
"metadata": {
"id": "JAjw3TM1KCBh"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"And this is basically how we use UDF to extract algorithmically defined value.\n",
"\n",
"Please note that we should always use Spark SQL API instead, where possible, because Spark can optimize the job and run efficiently. Instead, with UDF, the Python code is a black-box for Spark, so it cannot be optimized."
],
"metadata": {
"id": "rjzoDTEFMLWd"
}
},
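{
"cell_type": "markdown",
"source": [
"To illustrate the difference, here is a sketch of the same categorization expressed with the built-in `when`/`otherwise` functions instead of a UDF, so that Spark can see (and optimize) the whole expression:"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# A sketch: the same categorization using built-in Spark SQL functions (when/otherwise) instead of a UDF,\n",
"# so that Spark can see and optimize the whole expression.\n",
"from pyspark.sql.functions import when\n",
"\n",
"median_income_col = col(\"median_income\").cast(\"double\")\n",
"population_over_rooms_col = col(\"population\").cast(\"double\") / col(\"total_rooms\").cast(\"double\")\n",
"\n",
"category = (\n",
"    when((median_income_col < 1) & (population_over_rooms_col < 1), \"poor\")\n",
"    .when((median_income_col > 1) & (median_income_col < 3) & (population_over_rooms_col < 2), \"middle_class\")\n",
"    .when((median_income_col > 3) & (population_over_rooms_col > 1), \"rich\")\n",
"    .otherwise(\"unknown\")\n",
")\n",
"\n",
"housing_market.groupBy(category.alias(\"category\")).count().sort(\"count\").show()"
],
"metadata": {},
"execution_count": null,
"outputs": []
},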
{
"cell_type": "markdown",
"source": [
"## Loading the data"
],
"metadata": {
"id": "jTrlBFcvf-ck"
}
},
{
"cell_type": "markdown",
"source": [
"*NOTE: It might be confusing, to talk about \"loading\" when we actually mean operation that saves data somewhere else. But that's definition of ETL, so we should respect this globally accepted naming convention.*"
],
"metadata": {
"id": "rm-ogvVkM0VE"
}
},
{
"cell_type": "markdown",
"source": [
"This last chapter will very briefly show-case, how to store the in-memory created DataFrame for later usage.\n",
"\n",
"We will store it into a disposable Google Colab kernel instance, but normally, it should be stored into a data warehouse (Dremio, Superset, Hive, HBase, Impala, Kafka, HDFS, S3, ...).\n",
"\n",
"The benefits of using a data warehouse include:\n",
"- to use the results later on using additional data pipelines or analytical tools\n",
"- to ensure data loss protection, durability and reliability\n",
"- to easily maintain data statistics such as their size, available history, etc.\n",
"\n",
"Each data warehouse has it's own documentation, from where you can find instructions of how to store your DataFrame from Spark."
],
"metadata": {
"id": "xhLTXqZYNT7w"
}
},
{
"cell_type": "markdown",
"source": [
"For most of the warehouses, it's as simple, as calling `DataFrame.write.FORMAT(PATH)`. However in our case, we are good with writing only CSV file, so we use:"
],
"metadata": {
"id": "lbgmEKDVP_sj"
}
},
{
"cell_type": "code",
"source": [
"age_group_histogram.write.csv(\"my_first_spark_output\", header=True, mode=\"overwrite\")"
],
"metadata": {
"id": "NfEuHMHeQJ61"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"With this command, `my_first_spark_output` directory should be created:"
],
"metadata": {
"id": "DWARx4zLRCCA"
}
},
{
"cell_type": "code",
"source": [
"!ls -al my_first_spark_output"
],
"metadata": {
"id": "DjIMrTKOQgL7"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"source": [
"!cat my_first_spark_output/*.csv"
],
"metadata": {
"id": "q2ls62A5RAU9"
},
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"source": [
"There you have it. Your own CSV aggregation result right from your browser."
],
"metadata": {
"id": "ZLmdBJfPRxe7"
}
}
]
}