Compute utilization using PySpark
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import StructType, StructField, IntegerType, DateType\n",
"import pyspark.sql.functions as F\n",
"from datetime import date"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"spark = (\n",
" SparkSession.builder\n",
" # .config(\"spark.eventLog.gcMetrics.youngGenerationGarbageCollectors\", \"G1 Young Generation\")\n",
" # .config(\"spark.eventLog.gcMetrics.oldGenerationGarbageCollectors\", \"G1 Old Generation\")\n",
" .getOrCreate()\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup\n",
"* Given an inventory of items\n",
"* That are being used for date ranges\n",
"* Task: what is the utilization rate?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"schema = StructType([\n",
" StructField(\"id\", IntegerType(), False),\n",
" StructField(\"start_date\", DateType(), False),\n",
" StructField(\"end_date\", DateType(), False)\n",
"])\n",
"\n",
"df = spark.createDataFrame(\n",
" [\n",
" (1, date(2024, 1, 1), date(2024, 1, 3)),\n",
" (1, date(2024, 1, 8), date(2024, 1, 10)),\n",
" (1, date(2024, 1, 13), date(2024, 1, 17)),\n",
" (1, date(2024, 1, 29), date(2024, 1, 30)),\n",
" (2, date(2024, 1, 2), date(2024, 1, 4)),\n",
" (2, date(2024, 1, 10), date(2024, 1, 14)),\n",
" (3, date(2024, 1, 14), date(2024, 1, 19)),\n",
" (3, date(2024, 1, 27), date(2024, 1, 28)),\n",
" ],\n",
" schema\n",
")\n",
"df.show()"
]
},
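{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before the Spark version, a minimal pure-Python cross-check of the expected daily utilization (assumptions: ranges are inclusive on both ends and the base is the number of distinct items):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pure-Python cross-check (sketch). Days with no usage simply don't appear.\n",
"from collections import defaultdict\n",
"from datetime import timedelta\n",
"\n",
"rows = [(r[\"id\"], r[\"start_date\"], r[\"end_date\"]) for r in df.collect()]\n",
"items = {item for item, _, _ in rows}\n",
"\n",
"in_use = defaultdict(set)  # day -> items in use that day\n",
"for item, start, end in rows:\n",
"    day = start\n",
"    while day <= end:  # inclusive range\n",
"        in_use[day].add(item)\n",
"        day += timedelta(days=1)\n",
"\n",
"for day in sorted(in_use)[:5]:\n",
"    print(day, round(100 * len(in_use[day]) / len(items), 2))"
]
},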
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"min_date = df.select(F.min(\"start_date\")).collect()[0][0]\n",
"max_date = df.select(F.max(\"end_date\")).collect()[0][0]\n",
"min_date, max_date"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"# Create DataFrame with sequence of dates\n",
"dates_df = spark.range(1).select(\n",
" F.explode(\n",
" F.sequence(\n",
" F.lit(min_date),\n",
" F.lit(max_date),\n",
" F.expr('interval 1 day')\n",
" )\n",
" ).alias(\"current_date\")\n",
")\n",
"dates_df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"joined_df = df.crossJoin(dates_df)\n",
"joined_df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"final_df = joined_df.withColumn(\n",
" \"used\",\n",
" F.when(\n",
" (F.col(\"start_date\") <= F.col(\"current_date\"))\n",
" & (F.col(\"end_date\") >= F.col(\"current_date\")),\n",
" 1,\n",
" ).otherwise(0),\n",
")\n",
"final_df.toPandas()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_pivoted = (\n",
" final_df.groupBy(\"current_date\")\n",
" .pivot(\"id\")\n",
" .agg(F.max(\"used\").alias(\"used\"))\n",
" .orderBy(F.col(\"current_date\"))\n",
")\n",
"df_pivoted.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wrong_result_df = final_df.groupBy(\"current_date\").agg(\n",
" F.sum(\"used\").alias(\"total_used\"),\n",
" # Don't count! That's wrong!\n",
" F.count(\"used\").alias(\"count\")\n",
").orderBy(F.col(\"current_date\")).show(5)"
]
},
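{
"cell_type": "markdown",
"metadata": {},
"source": [
"Why `count(\"used\")` is the wrong base: after the cross join there is one row per *booking* per day, so an item with several bookings is counted once per booking. On the sample data every day shows a count of 8 (the number of booking rows), while the inventory only has 3 distinct items."
]
},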
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"result_df = (\n",
" final_df.groupBy(\"current_date\")\n",
" .agg(F.sum(\"used\").alias(\"total_used\"))\n",
" # What is the base you want? Daily? Weekly? Monthly?\n",
" .withColumn(\"total\", F.lit(df.select(F.col(\"id\")).distinct().count()))\n",
" .withColumn(\"percentage\", F.round(100 * F.col(\"total_used\") / F.col(\"total\"), 2))\n",
" .orderBy(F.col(\"current_date\"))\n",
" .show()\n",
")"
]
},
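{
"cell_type": "markdown",
"metadata": {},
"source": [
"To illustrate the \"what base?\" question, here is a sketch of a weekly rollup (an assumption, not part of the original task: utilization = used item-days divided by items × days observed in that week)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Weekly utilization sketch: capacity = distinct items * days seen in that week.\n",
"# Assumes bookings for the same item don't overlap (true for the sample data).\n",
"weekly_df = (\n",
" final_df.withColumn(\"week\", F.weekofyear(\"current_date\"))\n",
" .groupBy(\"week\")\n",
" .agg(\n",
" F.sum(\"used\").alias(\"used_item_days\"),\n",
" F.countDistinct(\"current_date\").alias(\"days_in_week\"),\n",
" )\n",
" .withColumn(\"capacity\", F.col(\"days_in_week\") * F.lit(n_items))\n",
" .withColumn(\"percentage\", F.round(100 * F.col(\"used_item_days\") / F.col(\"capacity\"), 2))\n",
" .orderBy(\"week\")\n",
")\n",
"weekly_df.show()"
]
},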
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.createOrReplaceTempView(\"df\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sql = \"\"\"\n",
"WITH date_bounds\n",
" AS (SELECT Min(start_date) AS min_date,\n",
" Max(end_date) AS max_date\n",
" FROM df),\n",
" dates\n",
" AS (SELECT col AS CURRENT_DATE\n",
" FROM (SELECT Explode(Sequence(min_date, max_date))\n",
" FROM date_bounds))\n",
"SELECT *\n",
"FROM df\n",
" CROSS JOIN dates\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"spark.sql(sql).show()"
]
}
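,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The SQL above only reproduces the cross join. Below is a sketch of the full computation in a single statement, mirroring the DataFrame version (same inclusive-range and distinct-item assumptions; the `MAX(CASE ...)` per item and day also guards against overlapping bookings)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"full_sql = \"\"\"\n",
"WITH date_bounds AS (\n",
"    SELECT MIN(start_date) AS min_date, MAX(end_date) AS max_date FROM df\n",
"),\n",
"dates AS (\n",
"    SELECT EXPLODE(SEQUENCE(min_date, max_date)) AS d FROM date_bounds\n",
"),\n",
"flags AS (\n",
"    -- one row per (day, item): was the item in use that day?\n",
"    SELECT d, id,\n",
"           MAX(CASE WHEN start_date <= d AND end_date >= d THEN 1 ELSE 0 END) AS used\n",
"    FROM dates CROSS JOIN df\n",
"    GROUP BY d, id\n",
")\n",
"SELECT d AS current_date,\n",
"       SUM(used) AS total_used,\n",
"       COUNT(*)  AS total,\n",
"       ROUND(100 * SUM(used) / COUNT(*), 2) AS percentage\n",
"FROM flags\n",
"GROUP BY d\n",
"ORDER BY d\n",
"\"\"\"\n",
"spark.sql(full_sql).show()"
]
}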
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.2"
}
},
"nbformat": 4,
"nbformat_minor": 2
}