Created
November 3, 2024 10:45
-
-
Save dr-dror/b13c50d47a2d5922341aa6618c7cb123 to your computer and use it in GitHub Desktop.
Compute utilization using PySpark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from pyspark.sql import SparkSession\n", | |
"from pyspark.sql.types import StructType, StructField, IntegerType, DateType\n", | |
"import pyspark.sql.functions as F\n", | |
"from datetime import date" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Reuse the active Spark session if there is one, otherwise start a new one.\n", | |
"# (GC metrics for the event log can be configured via spark.eventLog.gcMetrics.* if needed.)\n", | |
"spark = SparkSession.builder.getOrCreate()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Setup\n", | |
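"* Given an inventory of items, each identified by `id`\n", | |
"* Each item is booked (in use) for one or more date ranges; start and end dates are inclusive\n", | |
"* Task: compute the daily utilization rate, i.e. the percentage of items in use on each day" | |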
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Sample bookings: (item id, first day in use, last day in use).\n", | |
"bookings = [\n", | |
"    (1, date(2024, 1, 1), date(2024, 1, 3)),\n", | |
"    (1, date(2024, 1, 8), date(2024, 1, 10)),\n", | |
"    (1, date(2024, 1, 13), date(2024, 1, 17)),\n", | |
"    (1, date(2024, 1, 29), date(2024, 1, 30)),\n", | |
"    (2, date(2024, 1, 2), date(2024, 1, 4)),\n", | |
"    (2, date(2024, 1, 10), date(2024, 1, 14)),\n", | |
"    (3, date(2024, 1, 14), date(2024, 1, 19)),\n", | |
"    (3, date(2024, 1, 27), date(2024, 1, 28)),\n", | |
"]\n", | |
"\n", | |
"schema = StructType(\n", | |
"    [\n", | |
"        StructField(\"id\", IntegerType(), nullable=False),\n", | |
"        StructField(\"start_date\", DateType(), nullable=False),\n", | |
"        StructField(\"end_date\", DateType(), nullable=False),\n", | |
"    ]\n", | |
")\n", | |
"\n", | |
"df = spark.createDataFrame(bookings, schema=schema)\n", | |
"df.show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Earliest start and latest end over all bookings, computed in a single aggregation\n", | |
"# (one Spark job instead of two separate collect() calls).\n", | |
"min_date, max_date = df.agg(\n", | |
"    F.min(\"start_date\").alias(\"min_date\"),\n", | |
"    F.max(\"end_date\").alias(\"max_date\"),\n", | |
").first()\n", | |
"min_date, max_date" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Calendar of days from min_date to max_date in 1-day steps, exploded to one row per day.\n", | |
"day_sequence = F.sequence(F.lit(min_date), F.lit(max_date), F.expr(\"interval 1 day\"))\n", | |
"dates_df = spark.range(1).select(F.explode(day_sequence).alias(\"current_date\"))\n", | |
"dates_df.show(5)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Cartesian product: every booking row paired with every calendar day.\n", | |
"joined_df = df.crossJoin(other=dates_df)\n", | |
"joined_df.show(5)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# used = 1 when the calendar day falls inside the booking's [start_date, end_date] range, else 0.\n", | |
"is_in_use = (F.col(\"current_date\") >= F.col(\"start_date\")) & (F.col(\"current_date\") <= F.col(\"end_date\"))\n", | |
"final_df = joined_df.withColumn(\"used\", F.when(is_in_use, 1).otherwise(0))\n", | |
"final_df.toPandas()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# One row per day, one column per item id; a value is 1 if any booking of that item covers that day.\n", | |
"usage_by_day = final_df.groupBy(\"current_date\")\n", | |
"df_pivoted = usage_by_day.pivot(\"id\").agg(F.max(\"used\").alias(\"used\")).orderBy(\"current_date\")\n", | |
"df_pivoted.show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Anti-example: F.count(\"used\") counts one row per (booking, day) pair produced by the cross join,\n", | |
"# i.e. the number of bookings per day, not the number of items, so it is the wrong denominator.\n", | |
"wrong_result_df = final_df.groupBy(\"current_date\").agg(\n", | |
"    F.sum(\"used\").alias(\"total_used\"),\n", | |
"    F.count(\"used\").alias(\"count\"),\n", | |
").orderBy(F.col(\"current_date\"))\n", | |
"# Show separately: assigning the result of .show() would bind wrong_result_df to None.\n", | |
"wrong_result_df.show(5)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Denominator: number of distinct items in the inventory.\n", | |
"# What is the base you want? Daily? Weekly? Monthly?\n", | |
"total_items = df.select(F.col(\"id\")).distinct().count()\n", | |
"\n", | |
"result_df = (\n", | |
"    final_df.groupBy(\"current_date\")\n", | |
"    # Count distinct items in use, so an item with overlapping bookings on the same day\n", | |
"    # is counted once (a plain sum of `used` could exceed the number of items).\n", | |
"    .agg(F.countDistinct(F.when(F.col(\"used\") == 1, F.col(\"id\"))).alias(\"total_used\"))\n", | |
"    .withColumn(\"total\", F.lit(total_items))\n", | |
"    .withColumn(\"percentage\", F.round(100 * F.col(\"total_used\") / F.col(\"total\"), 2))\n", | |
"    .orderBy(F.col(\"current_date\"))\n", | |
")\n", | |
"# Show separately: assigning the result of .show() would bind result_df to None.\n", | |
"result_df.show()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Expose the bookings DataFrame to Spark SQL under the name `df`.\n", | |
"df.createOrReplaceTempView(\"df\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# SQL equivalent of joined_df: every booking paired with every day between the overall bounds.\n", | |
"sql = \"\"\"\n", | |
"WITH date_bounds AS (\n", | |
"    SELECT MIN(start_date) AS min_date,\n", | |
"           MAX(end_date) AS max_date\n", | |
"    FROM df\n", | |
"),\n", | |
"dates AS (\n", | |
"    -- Alias the exploded column explicitly instead of relying on the implicit name `col`,\n", | |
"    -- and backtick current_date so it is parsed as an identifier, not the CURRENT_DATE keyword.\n", | |
"    SELECT EXPLODE(SEQUENCE(min_date, max_date)) AS `current_date`\n", | |
"    FROM date_bounds\n", | |
")\n", | |
"SELECT *\n", | |
"FROM df\n", | |
"CROSS JOIN dates\n", | |
"\"\"\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"spark.sql(sql).show()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": ".venv", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.12.2" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment