Using expressions (Expr) in with_columns to optimize, but the results are not as good.
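The script below compares two equivalent ways of computing transactions_out per station: chaining one with_columns call per intermediate step, versus building the expressions up front and passing them to a single with_columns call. A minimal sketch of the two patterns (the toy data and the names toy, prev, chained and single are illustrative only, not part of the original script):

import polars as pl

toy = pl.DataFrame({"station_id": [1, 1, 2, 2], "available_stands": [5, 3, 8, 8]})

# Chained: one with_columns context per step, with a materialised intermediate column
chained = (
    toy.with_columns(pl.col("available_stands").shift(1).over("station_id").alias("prev"))
    .with_columns(pl.col("prev").fill_null(pl.col("available_stands")))
    .with_columns((pl.col("available_stands") - pl.col("prev")).alias("transactions_out"))
)

# Single call: the reusable expression is defined once and evaluated in one context
prev = pl.col("available_stands").shift(1).over("station_id").fill_null(pl.col("available_stands"))
single = toy.with_columns((pl.col("available_stands") - prev).alias("transactions_out"))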
from time import perf_counter

import matplotlib.pyplot as plt
import pandas as pd
import polars as pl


########################################
# Functions to read and create DataFrames
########################################
def read_df() -> pl.LazyFrame:
    """Scan the activity CSV into a LazyFrame, encode the state column, rename and sort."""
    column_dtypes = {
        "gid": pl.UInt8,
        "ident": pl.UInt8,
        "type": pl.Categorical,
        "name": pl.Utf8,
        "state": pl.String,
        "available_stands": pl.Int8,
        "available_bikes": pl.Int8,
    }
    state_dict = {"CONNECTEE": 1, "DECONNECTEE": 0}
url = "https://raw.githubusercontent.com/armgilles/vcub_keeper/refs/heads/master/tests/data_for_tests/activite_data.csv" | |
    activite = pl.scan_csv(url, schema_overrides=column_dtypes, try_parse_dates=True)
    activite = activite.with_columns(pl.col("state").replace(state_dict))

    # Renaming columns
    activite = activite.rename({"ident": "station_id", "ts": "date"})

    # Sorting DataFrame on station_id & date
    activite = activite.sort(["station_id", "date"])

    return activite

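# Note: read_df() only builds a lazy query plan (pl.scan_csv returns a LazyFrame);
# the query is executed when .collect() is called further down.
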
def create_df_big(activite_data: pd.DataFrame) -> pd.DataFrame:
    """Increase the number of rows of the DataFrame"""
    # Mapping new station_id to easily increase the number of rows
    new_station_id_dict = {22: 1, 43: 100, 102: 200, 106: 300, 123: 400}
    activite_data["station_id"] = activite_data["station_id"].map(new_station_id_dict)

    n = 99
    activite_data_big = activite_data.copy()

    # Concatenate the DataFrame n times
    for i in range(n):
        temp = activite_data.copy()
        temp["station_id"] = temp["station_id"] + i
        activite_data_big = pd.concat([activite_data_big, temp], ignore_index=True)

    return activite_data_big

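# Note: with n = 99 the loop above concatenates 100 copies of the original
# 5,630-row frame (about 563,000 rows); offsetting station_id in each copy also
# creates many more station groups for the .over("station_id") window expressions.
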
########################################
# Calculation functions (polars eager/lazy,
# chained vs. single-Expr with_columns)
########################################
def fct_with_columns_opti(data: pl.DataFrame) -> pl.DataFrame:
    """Compute transactions_out with expressions built once and applied in a single with_columns call."""
    available_stands_shift = (
        pl.col("available_stands").shift(1).over("station_id").fill_null(pl.col("available_stands"))
    )
    transactions_out = (
        pl.when(pl.col("available_stands") - available_stands_shift < 0)
        .then(0)
        .otherwise(pl.col("available_stands") - available_stands_shift)
        .alias("transactions_out")
    )

    # Apply all transformations in a single call
    data = data.with_columns(available_stands_shift.alias("available_stands_shift"), transactions_out).drop(
        "available_stands_shift"
    )
    return data

def fct_with_columns(data: pl.DataFrame) -> pl.DataFrame:
    """Compute transactions_out step by step, with one with_columns call per intermediate column."""
    data = data.with_columns(pl.col("available_stands").shift(1).over("station_id").alias("available_stands_shift"))
    data = data.with_columns(pl.col("available_stands_shift").fill_null(pl.col("available_stands")))
    data = data.with_columns(transactions_out=(pl.col("available_stands") - pl.col("available_stands_shift")))
    data = data.with_columns(
        transactions_out=pl.when(pl.col("transactions_out") < 0).then(0).otherwise(pl.col("transactions_out"))
    )

    # Drop non useful column
    data = data.drop("available_stands_shift")
    return data

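# The two functions above compute the same transactions_out column. The hypothesis
# behind fct_with_columns_opti is that defining the expressions once and applying
# them in a single with_columns context is cheaper than one call per intermediate
# column; per the description above, it did not turn out faster on this data.
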
########################################
# To run benchmark easily
########################################
def run_fct_with_columns_eager(df_eager):
    fct_with_columns(df_eager)


def run_fct_with_columns_lazy(df_lazy):
    fct_with_columns(df_lazy).collect()


def run_fct_with_columns_opti_eager(df_eager):
    fct_with_columns_opti(df_eager)


def run_fct_with_columns_opti_lazy(df_lazy):
    fct_with_columns_opti(df_lazy).collect()

def measure_execution_time(df_lazy, df_eager):
    times = {
        "run_fct_with_columns_eager": [],
        "run_fct_with_columns_lazy": [],
        "run_fct_with_columns_opti_eager": [],
        "run_fct_with_columns_opti_lazy": [],
    }

    for _ in range(10):
        start_time = perf_counter()
        run_fct_with_columns_eager(df_eager)
        times["run_fct_with_columns_eager"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_with_columns_lazy(df_lazy)
        times["run_fct_with_columns_lazy"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_with_columns_opti_eager(df_eager)
        times["run_fct_with_columns_opti_eager"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_with_columns_opti_lazy(df_lazy)
        times["run_fct_with_columns_opti_lazy"].append((perf_counter() - start_time) * 1000)  # Convert to ms

    # Calculate the average execution times
    avg_times = {key: sum(value) / len(value) for key, value in times.items()}
    return avg_times

########################################
# Process of benchmark
########################################
# Read data and create a bigger df
df_lazy = read_df()  # (5630, 8)
df_eager = df_lazy.collect()  # (5630, 8)
df_big_lazy = pl.from_pandas(create_df_big(df_eager.to_pandas())).lazy()  # (563000, 8)

# List to store execution times
execution_times = []

segment_length = 5630
num_segments = df_big_lazy.collect().height // segment_length

# Apply functions on each segment and measure execution time
for i in range(1, num_segments + 1):
    current_length = i * segment_length
    df_lazy = df_big_lazy.slice(0, current_length)
    df_eager = df_lazy.collect()
    times = measure_execution_time(df_lazy, df_eager)
    times["length"] = df_eager.shape[0]
    execution_times.append(times)

# Convert execution times to DataFrame
df_times = pl.DataFrame(execution_times)
# Plot the results using matplotlib
plt.figure(figsize=(10, 6))
plt.plot(df_times["length"], df_times["run_fct_with_columns_eager"], label="run_fct_with_columns_eager")
plt.plot(df_times["length"], df_times["run_fct_with_columns_lazy"], label="run_fct_with_columns_lazy")
plt.plot(df_times["length"], df_times["run_fct_with_columns_opti_eager"], label="run_fct_with_columns_opti_eager")
plt.plot(df_times["length"], df_times["run_fct_with_columns_opti_lazy"], label="run_fct_with_columns_opti_lazy")
plt.title("Execution Time vs Length of DataFrame")
plt.xlabel("Length of DataFrame")
plt.ylabel("Execution Time (milliseconds)")
plt.legend(title="Function")
plt.grid(True)
plt.savefig("execution_time_vs_length_3.png")
plt.close()
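Running the script saves the comparison plot to execution_time_vs_length_3.png; as noted in the description, the single-call Expr variant did not come out ahead in this benchmark.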