Using Expr objects inside with_columns to optimize, but the results are worse.
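For context, the two styles compared in the script below differ in how expressions reach with_columns: the chained style adds each intermediate column in its own call, while the Expr style builds the expressions first and applies everything in a single call. A minimal sketch on a toy DataFrame (the column names here are illustrative, not from the gist):

import polars as pl

toy = pl.DataFrame({"g": [1, 1, 2, 2], "x": [3, 5, 4, 7]})

# Chained style: one with_columns call per intermediate column.
chained = (
    toy.with_columns(pl.col("x").shift(1).over("g").alias("x_prev"))
       .with_columns((pl.col("x") - pl.col("x_prev")).alias("delta"))
)

# Expr style: build the expression once, then apply it in a single call.
x_prev = pl.col("x").shift(1).over("g")
single = toy.with_columns((pl.col("x") - x_prev).alias("delta"))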
from time import perf_counter
import matplotlib.pyplot as plt
import pandas as pd
import polars as pl
########################################
# Functions to read and create DataFrames
########################################
def read_df() -> pl.LazyFrame:
    """Lazily read the activity CSV, map state to 0/1, rename columns and sort."""
    column_dtypes = {
        "gid": pl.UInt8,
        "ident": pl.UInt8,
        "type": pl.Categorical,
        "name": pl.Utf8,
        "state": pl.String,
        "available_stands": pl.Int8,
        "available_bikes": pl.Int8,
    }
    state_dict = {"CONNECTEE": 1, "DECONNECTEE": 0}
    url = "https://raw.githubusercontent.com/armgilles/vcub_keeper/refs/heads/master/tests/data_for_tests/activite_data.csv"
    activite = pl.scan_csv(url, schema_overrides=column_dtypes, try_parse_dates=True)
    activite = activite.with_columns(pl.col("state").replace(state_dict))
    # Renaming columns
    activite = activite.rename({"ident": "station_id", "ts": "date"})
    # Sorting DataFrame on station_id & date
    activite = activite.sort(["station_id", "date"])
    return activite
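# Note: read_df only builds a lazy query plan; nothing is downloaded or parsed
# until .collect() is called (see the benchmark section below). For example:
#     df = read_df().collect()  # materializes the (5630, 8) test DataFrame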
def create_df_big(activite_data: pd.DataFrame) -> pd.DataFrame:
    """Increase the number of rows of the DataFrame"""
    # Mapping new station_id to easily increase the number of rows
    new_station_id_dict = {22: 1, 43: 100, 102: 200, 106: 300, 123: 400}
    activite_data["station_id"] = activite_data["station_id"].map(new_station_id_dict)
    n = 99
    activite_data_big = activite_data.copy()
    # Concatenate the DataFrame n times
    for i in range(n):
        temp = activite_data.copy()
        temp["station_id"] = temp["station_id"] + i
        activite_data_big = pd.concat([activite_data_big, temp], ignore_index=True)
    return activite_data_big
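# Each of the 99 shifted copies adds i to station_id; since the remapped base
# ids (1, 100, 200, 300, 400) are at least 100 apart, shifts of 0..98 stay
# within their own block. Together with the initial copy this yields
# 100 * 5,630 = 563,000 rows.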
########################################
# Calculation functions (chained with_columns
# vs single-call Expr)
########################################
def fct_with_columns_opti(data: pl.DataFrame | pl.LazyFrame) -> pl.DataFrame | pl.LazyFrame:
    """Compute transactions_out by building Expr objects and applying them in a single with_columns call."""
    available_stands_shift = (
        pl.col("available_stands").shift(1).over("station_id").fill_null(pl.col("available_stands"))
    )
    transactions_out = (
        pl.when(pl.col("available_stands") - available_stands_shift < 0)
        .then(0)
        .otherwise(pl.col("available_stands") - available_stands_shift)
        .alias("transactions_out")
    )
    # Apply all transformations in a single call
    data = data.with_columns(available_stands_shift.alias("available_stands_shift"), transactions_out).drop(
        "available_stands_shift"
    )
    return data
def fct_with_columns(data: pl.DataFrame | pl.LazyFrame) -> pl.DataFrame | pl.LazyFrame:
    """Compute transactions_out with one chained with_columns call per step."""
    data = data.with_columns(pl.col("available_stands").shift(1).over("station_id").alias("available_stands_shift"))
    data = data.with_columns(pl.col("available_stands_shift").fill_null(pl.col("available_stands")))
    data = data.with_columns(transactions_out=(pl.col("available_stands") - pl.col("available_stands_shift")))
    data = data.with_columns(
        transactions_out=pl.when(pl.col("transactions_out") < 0).then(0).otherwise(pl.col("transactions_out"))
    )
    # Drop the helper column
    data = data.drop("available_stands_shift")
    return data
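# A possible reason for the "opti" variant's worse timings (an assumption, not
# verified here): in fct_with_columns_opti the available_stands_shift
# expression is embedded three times (when condition, otherwise branch, and
# the aliased column), so unless Polars' common-subexpression elimination
# applies, the window shift may be computed several times. The chained
# version materializes it once and reuses the column by name.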
########################################
# To run benchmark easily
########################################
def run_fct_with_columns_eager(df_eager):
    fct_with_columns(df_eager)

def run_fct_with_columns_lazy(df_lazy):
    fct_with_columns(df_lazy).collect()

def run_fct_with_columns_opti_eager(df_eager):
    fct_with_columns_opti(df_eager)

def run_fct_with_columns_opti_lazy(df_lazy):
    fct_with_columns_opti(df_lazy).collect()
def measure_execution_time(df_lazy, df_eager):
    times = {
        "run_fct_with_columns_eager": [],
        "run_fct_with_columns_lazy": [],
        "run_fct_with_columns_opti_eager": [],
        "run_fct_with_columns_opti_lazy": [],
    }
    for _ in range(10):
        start_time = perf_counter()
        run_fct_with_columns_eager(df_eager)
        times["run_fct_with_columns_eager"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_with_columns_lazy(df_lazy)
        times["run_fct_with_columns_lazy"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_with_columns_opti_eager(df_eager)
        times["run_fct_with_columns_opti_eager"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_with_columns_opti_lazy(df_lazy)
        times["run_fct_with_columns_opti_lazy"].append((perf_counter() - start_time) * 1000)  # Convert to ms

    # Calculate the average execution times
    avg_times = {key: sum(value) / len(value) for key, value in times.items()}
    return avg_times
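# Benchmark caveats (assumptions about the setup, not measured separately):
# each lazy timing includes both query optimization and execution, the 10 runs
# are averaged without discarding a warm-up run, and wall-clock perf_counter
# timings can be noisy on a busy machine.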
########################################
# Process of benchmark
########################################
# Read data and create a bigger df
df_lazy = read_df() # (5630, 8)
df_eager = df_lazy.collect() # (5630, 8)
df_big_lazy = pl.from_pandas(create_df_big(df_eager.to_pandas())).lazy() # (563000, 8)
# List to store execution times
execution_times = []
segment_length = 5630
num_segments = df_big_lazy.collect().height // segment_length
# Apply functions on each segment and measure execution time
for i in range(1, num_segments + 1):
    current_length = i * segment_length
    df_lazy = df_big_lazy.slice(0, current_length)
    df_eager = df_lazy.collect()
    times = measure_execution_time(df_lazy, df_eager)
    times["length"] = df_eager.shape[0]
    execution_times.append(times)
# Convert execution times to DataFrame
df_times = pl.DataFrame(execution_times)
# Plot the results using matplotlib
plt.figure(figsize=(10, 6))
plt.plot(df_times["length"], df_times["run_fct_with_columns_eager"], label="run_fct_with_columns_eager")
plt.plot(df_times["length"], df_times["run_fct_with_columns_lazy"], label="run_fct_with_columns_lazy")
plt.plot(df_times["length"], df_times["run_fct_with_columns_opti_eager"], label="run_fct_with_columns_opti_eager")
plt.plot(df_times["length"], df_times["run_fct_with_columns_opti_lazy"], label="run_fct_with_columns_opti_lazy")
plt.title("Execution Time vs Length of DataFrame")
plt.xlabel("Length of DataFrame")
plt.ylabel("Execution Time (milliseconds)")
plt.legend(title="Function")
plt.grid(True)
plt.savefig("execution_time_vs_length_3.png")
plt.close()
[Author comment: image of the resulting benchmark plot (execution_time_vs_length_3.png)]