Last active
October 15, 2024 07:20
-
-
Save armgilles/62fdeb97dc8e5c9faac4958bf2f16a6e to your computer and use it in GitHub Desktop.
Benchmark of ways to improve the Polars migration of the open-source project vcub_keeper.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import perf_counter | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
import polars as pl | |
######################################## | |
# Functions to read and create DataFrames | |
######################################## | |
def read_df(
    url: str = (
        "https://raw.githubusercontent.com/armgilles/vcub_keeper/refs/heads/master/"
        "tests/data_for_tests/activite_data.csv"
    ),
) -> pl.LazyFrame:
    """Lazily load the station-activity CSV used by the benchmark.

    Columns are read with compact dtypes, ``state`` is encoded as an integer
    flag (``"CONNECTEE"`` -> 1, ``"DECONNECTEE"`` -> 0, anything else -> null),
    ``ident``/``ts`` are renamed to ``station_id``/``date``, and rows are
    sorted by ``station_id`` then ``date``.

    Args:
        url: CSV location passed to ``pl.scan_csv`` (path or URL). Defaults
            to the test fixture of the vcub_keeper repository.

    Returns:
        A ``LazyFrame``; call ``.collect()`` to materialise it.
    """
    column_dtypes = {
        "gid": pl.UInt8,
        "ident": pl.UInt8,
        "type": pl.Categorical,
        "name": pl.String,
        "state": pl.String,
        "available_stands": pl.Int8,
        "available_bikes": pl.Int8,
    }
    state_dict = {"CONNECTEE": 1, "DECONNECTEE": 0}

    activite = pl.scan_csv(url, schema_overrides=column_dtypes, try_parse_dates=True)
    # `Expr.replace` keeps the column's dtype (String), so mapping to ints with it
    # would produce the strings "1"/"0". `replace_strict` lets the dtype change;
    # `default=None` turns unmapped states into null (like pandas `Series.map`)
    # instead of raising.
    activite = activite.with_columns(
        pl.col("state").replace_strict(state_dict, default=None, return_dtype=pl.Int8)
    )
    # Renaming columns
    activite = activite.rename({"ident": "station_id", "ts": "date"})
    # Sorting DataFrame on station_id & date
    return activite.sort(["station_id", "date"])
def create_df_big(activite_data: pd.DataFrame, n_copies: int = 99) -> pd.DataFrame:
    """Return an enlarged copy of *activite_data* for benchmarking.

    Station ids are first remapped (22 -> 1, 43 -> 100, 102 -> 200,
    106 -> 300, 123 -> 400); ids absent from that mapping become NaN, as with
    ``Series.map``. The remapped frame is then followed by *n_copies* further
    copies whose ``station_id`` is shifted by 0, 1, ..., ``n_copies - 1``.
    The result therefore has ``(n_copies + 1) * len(activite_data)`` rows and
    a fresh ``RangeIndex``.

    Args:
        activite_data: Source frame with a ``station_id`` column. It is not
            modified.
        n_copies: Number of shifted copies appended after the remapped frame.

    Returns:
        A new DataFrame.
    """
    # Mapping new station_id to easily increase the number of rows
    new_station_id_dict = {22: 1, 43: 100, 102: 200, 106: 300, 123: 400}
    # Work on a copy so the caller's frame is left untouched.
    base = activite_data.copy()
    base["station_id"] = base["station_id"].map(new_station_id_dict)

    # NOTE(review): offset 0 reproduces `base` exactly, so the remapped stations
    # appear twice. Kept to preserve the original benchmark data and row count.
    shifted = [
        base.assign(station_id=base["station_id"] + offset) for offset in range(n_copies)
    ]
    # A single concat over all pieces avoids re-copying the growing frame each iteration.
    return pd.concat([base, *shifted], ignore_index=True)
######################################## | |
# Calculation functions (pandas, polars eager | |
# and Expr) | |
######################################## | |
def fct_with_columns(data: pl.DataFrame) -> pl.DataFrame:
    """Add ``transactions_out`` through a chain of ``with_columns`` steps.

    ``transactions_out`` is the change of ``available_stands`` relative to the
    previous row of the same ``station_id``, with negative values replaced by 0.
    A station's first row has no predecessor and is filled with its own value,
    so it gets 0. A helper column ``available_stands_shift`` is created and
    dropped again. Only ``with_columns``/``drop`` are used, so the input may be
    an eager or a lazy frame.
    """
    stands = pl.col("available_stands")
    shifted = pl.col("available_stands_shift")
    out = pl.col("transactions_out")
    return (
        data.with_columns(stands.shift(1).over("station_id").alias("available_stands_shift"))
        .with_columns(shifted.fill_null(stands))
        .with_columns(transactions_out=(stands - shifted))
        .with_columns(transactions_out=pl.when(out < 0).then(0).otherwise(out))
        # The helper column is no longer needed
        .drop("available_stands_shift")
    )
def fct_expr() -> pl.Expr:
    """Build the ``transactions_out`` column as a single Polars expression.

    The previous ``available_stands`` value within the same ``station_id``
    (the row's own value when there is no predecessor) is subtracted from the
    current value, and negative differences are replaced by 0. No temporary
    column is materialised.
    """
    stands = pl.col("available_stands")
    # A station's first row has no predecessor; fall back to its own value (delta 0).
    previous_stands = stands.shift(1).over("station_id").fill_null(stands)
    delta = stands - previous_stands
    clamped = pl.when(delta < 0).then(0).otherwise(delta)
    return clamped.alias("transactions_out")
######################################## | |
# To run benchmark easily | |
######################################## | |
def run_fct_with_columns_eager(df_eager):
    """Run the ``fct_with_columns`` variant eagerly on *df_eager*; the result is discarded."""
    # `fct_with_columns` already returns the complete frame. Wrapping it in
    # another `df_eager.with_columns(...)` only added extra work that the lazy
    # counterpart does not perform, which skewed the eager/lazy comparison.
    fct_with_columns(df_eager)
def run_fct_with_columns_lazy(df_lazy):
    """Build the ``fct_with_columns`` plan on *df_lazy* and collect it; the result is discarded."""
    plan = fct_with_columns(df_lazy)
    plan.collect()
def run_fct_expr_eager(df_eager):
    """Evaluate the ``fct_expr`` expression eagerly on *df_eager*; the result is discarded."""
    expression = fct_expr()
    df_eager.with_columns(expression)
def run_fct_expr_lazy(df_lazy):
    """Add the ``fct_expr`` expression to the lazy plan and collect it; the result is discarded."""
    expression = fct_expr()
    plan = df_lazy.with_columns(expression)
    plan.collect()
def measure_execution_time(df_lazy, df_eager, n_runs: int = 5):
    """Time the four benchmark variants and return their mean durations.

    Each of the *n_runs* rounds runs, in this order:
    ``run_fct_with_columns_eager``, ``run_fct_with_columns_lazy``,
    ``run_fct_expr_eager`` and ``run_fct_expr_lazy``.

    Args:
        df_lazy: Frame passed to the two lazy variants.
        df_eager: Frame passed to the two eager variants.
        n_runs: Number of timed repetitions per variant.

    Returns:
        Dict mapping each variant name to its mean wall-clock time in
        milliseconds.

    Raises:
        ValueError: If *n_runs* is smaller than 1 (no mean can be computed).
    """
    if n_runs < 1:
        raise ValueError("n_runs must be at least 1")

    # (result key, function, input frame), in the order they run each round.
    benchmarks = (
        ("run_fct_with_columns_eager", run_fct_with_columns_eager, df_eager),
        ("run_fct_with_columns_lazy", run_fct_with_columns_lazy, df_lazy),
        ("run_fct_expr_eager", run_fct_expr_eager, df_eager),
        ("run_fct_expr_lazy", run_fct_expr_lazy, df_lazy),
    )
    times = {name: [] for name, _, _ in benchmarks}
    for _ in range(n_runs):
        for name, fct, frame in benchmarks:
            start_time = perf_counter()
            fct(frame)
            times[name].append((perf_counter() - start_time) * 1000)  # Convert to ms

    # Calculate the average execution times
    return {name: sum(values) / len(values) for name, values in times.items()}
######################################## | |
# Process of benchmark | |
######################################## | |
# Read data and create a bigger df
df_lazy = read_df()  # (5630, 8)
df_eager = df_lazy.collect()  # (5630, 8)
# Keep the eager big frame so its height is known without collecting the lazy plan again.
df_big = pl.from_pandas(create_df_big(df_eager.to_pandas()))  # (563000, 8)
df_big_lazy = df_big.lazy()

# List to store execution times
execution_times = []
# One segment = one copy of the original data (5630 rows for the test fixture).
segment_length = df_eager.height
num_segments = df_big.height // segment_length

# Apply functions on growing prefixes of the big frame and measure execution time
for i in range(1, num_segments + 1):
    current_length = i * segment_length
    df_lazy = df_big_lazy.slice(0, current_length)
    df_eager = df_lazy.collect()
    times = measure_execution_time(df_lazy, df_eager)
    times["length"] = df_eager.height
    execution_times.append(times)

# Convert execution times to DataFrame
df_times = pl.DataFrame(execution_times)

# Plot the results: one line per benchmarked variant
benchmark_names = (
    "run_fct_with_columns_eager",
    "run_fct_with_columns_lazy",
    "run_fct_expr_eager",
    "run_fct_expr_lazy",
)
plt.figure(figsize=(10, 6))
for name in benchmark_names:
    plt.plot(df_times["length"], df_times[name], label=name)
plt.xlabel("Length of DataFrame")
plt.ylabel("Execution Time (milliseconds)")
plt.title("Execution Time vs Length of DataFrame")
plt.legend()
plt.grid(True)
plt.savefig("execution_time_vs_length_2.png")
plt.close()
Author
armgilles
commented
Oct 15, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment