Skip to content

Instantly share code, notes, and snippets.

Last active October 15, 2024 07:20
Show Gist options
  • Save armgilles/62fdeb97dc8e5c9faac4958bf2f16a6e to your computer and use it in GitHub Desktop.
Save armgilles/62fdeb97dc8e5c9faac4958bf2f16a6e to your computer and use it in GitHub Desktop.
Benchmark of ways to speed up the Polars migration of the open-source project vcub_keeper.
from time import perf_counter
import matplotlib.pyplot as plt
import pandas as pd
import polars as pl
# Functions to read and create DataFrames
def read_df(url: str = "") -> pl.LazyFrame:
    """Lazily load the station activity CSV, typed, renamed and sorted.

    Parameters
    ----------
    url : str
        Path or URL of the activity CSV. The empty default is only a
        placeholder and is rejected.

    Returns
    -------
    pl.LazyFrame
        The CSV with ``ident`` renamed to ``station_id`` and ``ts`` renamed to
        ``date``, ``state`` passed through ``{"CONNECTEE": 1, "DECONNECTEE": 0}``,
        and rows sorted by ``station_id`` then ``date``.

    Raises
    ------
    ValueError
        If ``url`` is empty.
    """
    if not url:
        raise ValueError("read_df() needs the path or URL of the activity CSV")

    column_dtypes = {
        "gid": pl.UInt8,
        "ident": pl.UInt8,
        "type": pl.Categorical,
        "name": pl.String,
        "state": pl.String,
        "available_stands": pl.Int8,
        "available_bikes": pl.Int8,
    }
    state_dict = {"CONNECTEE": 1, "DECONNECTEE": 0}

    # try_parse_dates lets polars parse the timestamp column ("ts") as a temporal type.
    activite = pl.scan_csv(url, schema_overrides=column_dtypes, try_parse_dates=True)
    # NOTE(review): on recent polars, ``replace`` keeps the column's String dtype
    # (values become "1"/"0"); ``replace_strict(..., return_dtype=...)`` is needed
    # for a numeric column — confirm which is intended.
    activite = activite.with_columns(pl.col("state").replace(state_dict))
    # Renaming columns
    activite = activite.rename({"ident": "station_id", "ts": "date"})
    # Order rows per station chronologically; the shift-based calculations rely on it.
    activite = activite.sort(["station_id", "date"])
    return activite
def create_df_big(activite_data: pd.DataFrame, n: int = 99) -> pd.DataFrame:
    """Return an enlarged copy of ``activite_data`` for benchmarking.

    The five known station ids are remapped to widely spaced values
    (22 -> 1, 43 -> 100, 102 -> 200, 106 -> 300, 123 -> 400) so that adding a
    small offset yields new, non-colliding ids. Ids absent from the mapping
    become NaN (``Series.map`` with a dict).

    The result is the remapped frame followed by ``n`` copies whose
    ``station_id`` is shifted by ``0, 1, ..., n - 1``.

    Parameters
    ----------
    activite_data : pd.DataFrame
        Source rows; must contain a ``station_id`` column. It is not modified.
    n : int, default 99
        Number of shifted copies appended after the remapped frame.

    Returns
    -------
    pd.DataFrame
        ``(n + 1) * len(activite_data)`` rows with a fresh ``0..N-1`` index.
    """
    # Mapping new station_id to easily increase the number of rows
    new_station_id_dict = {22: 1, 43: 100, 102: 200, 106: 300, 123: 400}
    # Work on a copy so the caller's DataFrame is left untouched.
    base = activite_data.copy()
    base["station_id"] = base["station_id"].map(new_station_id_dict)

    # NOTE: offset 0 reproduces the base block's ids, so those ids appear twice;
    # kept so the generated data matches earlier benchmark runs.
    frames = [base]
    for i in range(n):
        temp = base.copy()
        temp["station_id"] = temp["station_id"] + i
        frames.append(temp)
    # Concatenate once instead of re-copying the growing frame on every iteration.
    return pd.concat(frames, ignore_index=True)
# Calculation functions (polars): step-by-step with_columns
# vs. a single Expr
def fct_with_columns(data: pl.DataFrame) -> pl.DataFrame:
    """Add ``transactions_out`` using one ``with_columns`` call per step.

    ``transactions_out`` is the increase in ``available_stands`` since the
    previous row of the same ``station_id``, floored at 0. A station's first
    row has no predecessor, so its result is 0. Row order within each station
    matters; ``read_df`` sorts by ``station_id`` and ``date``.

    The chained ``with_columns`` calls are intentional: this is the
    step-by-step variant benchmarked against ``fct_expr``. The same calls also
    accept a ``pl.LazyFrame``.
    """
    # Previous available_stands within the same station (null on each station's first row).
    data = data.with_columns(
        pl.col("available_stands").shift(1).over("station_id").alias("available_stands_shift")
    )
    # First row of each station: fall back to the current value so the delta is 0.
    data = data.with_columns(pl.col("available_stands_shift").fill_null(pl.col("available_stands")))
    data = data.with_columns(transactions_out=(pl.col("available_stands") - pl.col("available_stands_shift")))
    # Keep only increases; negative deltas are clamped to 0.
    data = data.with_columns(
        transactions_out=pl.when(pl.col("transactions_out") < 0).then(0).otherwise(pl.col("transactions_out"))
    )
    # Drop the helper column
    data = data.drop("available_stands_shift")
    return data
def fct_expr() -> pl.Expr:
    """Return a single expression computing ``transactions_out``.

    Single-expression counterpart of ``fct_with_columns``: the increase in
    ``available_stands`` since the previous row of the same ``station_id``,
    floored at 0 (0 on each station's first row). Apply it with
    ``df.with_columns(fct_expr())``.
    """
    # Previous value within the station; a station's first row falls back to itself.
    available_stands_shift = (
        pl.col("available_stands").shift(1).over("station_id").fill_null(pl.col("available_stands"))
    )
    transactions_out = pl.col("available_stands") - available_stands_shift
    # Clamp negative deltas to 0, as fct_with_columns does.
    return pl.when(transactions_out < 0).then(0).otherwise(transactions_out).alias("transactions_out")
# Thin wrappers so that each variant can be timed the same way
def run_fct_with_columns_eager(df_eager):
    """Run the step-by-step ``with_columns`` variant on an eager DataFrame."""
    return fct_with_columns(df_eager)
def run_fct_with_columns_lazy(df_lazy):
    """Run the step-by-step ``with_columns`` variant on a LazyFrame.

    The result is collected so that the timing includes execution, not just
    query-plan construction.
    """
    return fct_with_columns(df_lazy).collect()
def run_fct_expr_eager(df_eager):
    """Run the single-expression variant on an eager DataFrame."""
    return df_eager.with_columns(fct_expr())
def run_fct_expr_lazy(df_lazy):
    """Run the single-expression variant on a LazyFrame and collect the result.

    Collecting is required so that the timing includes execution.
    """
    return df_lazy.with_columns(fct_expr()).collect()
def measure_execution_time(df_lazy, df_eager, n_runs: int = 5):
    """Time the four benchmark runners and return their mean wall time.

    Parameters
    ----------
    df_lazy : pl.LazyFrame
        Input for the ``*_lazy`` runners.
    df_eager : pl.DataFrame
        Input for the ``*_eager`` runners.
    n_runs : int, default 5
        Number of rounds. Each round calls every runner once.

    Returns
    -------
    dict[str, float]
        Runner name mapped to its mean execution time in milliseconds.

    Raises
    ------
    ValueError
        If ``n_runs`` is less than 1.
    """
    if n_runs < 1:
        raise ValueError("n_runs must be >= 1")

    benchmarks = {
        "run_fct_with_columns_eager": (run_fct_with_columns_eager, df_eager),
        "run_fct_with_columns_lazy": (run_fct_with_columns_lazy, df_lazy),
        "run_fct_expr_eager": (run_fct_expr_eager, df_eager),
        "run_fct_expr_lazy": (run_fct_expr_lazy, df_lazy),
    }
    times = {name: [] for name in benchmarks}

    for _ in range(n_runs):
        # Runners are interleaved within each round, so slow drift affects all of them alike.
        for name, (runner, data) in benchmarks.items():
            start_time = perf_counter()
            runner(data)
            times[name].append((perf_counter() - start_time) * 1000)  # Convert to ms

    # Calculate the average execution times
    return {name: sum(values) / len(values) for name, values in times.items()}
# Process of benchmark
# Read data and create a bigger df
if __name__ == "__main__":
    df_lazy = read_df()  # (5630, 8)
    df_eager = df_lazy.collect()  # (5630, 8)
    # Build the enlarged frame once; keep the eager handle so its height is known without re-collecting.
    df_big = pl.from_pandas(create_df_big(df_eager.to_pandas()))  # (563000, 8)
    df_big_lazy = df_big.lazy()

    # One dict of timings per slice length
    execution_times = []
    # Grow the input by one original-sized chunk per step.
    segment_length = df_eager.height
    num_segments = df_big.height // segment_length

    # Apply the functions to ever-longer prefixes and measure the execution time
    for i in range(1, num_segments + 1):
        current_length = i * segment_length
        df_lazy = df_big_lazy.slice(0, current_length)
        df_eager = df_lazy.collect()
        times = measure_execution_time(df_lazy, df_eager)
        times["length"] = df_eager.shape[0]
        execution_times.append(times)

    # Convert execution times to DataFrame
    df_times = pl.DataFrame(execution_times)

    # Plot the results
    plt.figure(figsize=(10, 6))
    for name in (
        "run_fct_with_columns_eager",
        "run_fct_with_columns_lazy",
        "run_fct_expr_eager",
        "run_fct_expr_lazy",
    ):
        plt.plot(df_times["length"], df_times[name], label=name)
    plt.xlabel("Length of DataFrame")
    plt.ylabel("Execution Time (milliseconds)")
    plt.title("Execution Time vs Length of DataFrame")
    plt.legend()
    plt.show()
Copy link


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment