New benchmark with the filter predicate optimization in lazy mode: the with_columns pipeline and the Expr pipeline are equal when executed lazily (see the plan comparison after the pipeline definitions below).
import random
from datetime import datetime, timedelta
from time import perf_counter

import matplotlib.pyplot as plt
import polars as pl


########################################
# Functions to read and create DataFrames
########################################

# Generate simulated IP addresses
def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))


# Generate simulated HTTP status codes
def generate_status_code():
    return random.choices([200, 404, 500], weights=[0.8, 0.15, 0.05])[0]  # 500 is rare


# Generate response times
def generate_response_time():
    return random.randint(100, 2000)  # Response time between 100 ms and 2000 ms


# Generate timestamps
def generate_timestamp(start_date, days_range=10):
    random_days = random.randint(0, days_range)
    random_seconds = random.randint(0, 86400)  # seconds in a day
    return start_date + timedelta(days=random_days, seconds=random_seconds)


# Simulate countries associated with the IP addresses
def generate_country():
    countries = ["USA", "France", "Germany", "Canada", "Brazil"]
    return random.choice(countries)


# Generate a simulated dataset
def generate_log_data(num_rows=1000):
    start_date = datetime(2024, 1, 1)
    data = {
        "ip_address": [generate_ip() for _ in range(num_rows)],
        "status_code": [generate_status_code() for _ in range(num_rows)],
        "response_time": [generate_response_time() for _ in range(num_rows)],
        "timestamp": [generate_timestamp(start_date) for _ in range(num_rows)],
        "country": [generate_country() for _ in range(num_rows)],
    }
    return pl.DataFrame(data)
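
# Optional sanity check (not part of the original gist): peek at a small sample
# to confirm the schema produced by generate_log_data() before benchmarking.
print(generate_log_data(num_rows=5))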
########################################
# Calculation functions (with_columns pipeline
# and Expr pipeline)
########################################

# 1. Filter the HTTP 500 errors
def filter_errors_500_w(df: pl.LazyFrame) -> pl.LazyFrame:
    return df.filter(pl.col("status_code") == 500)


# 2. Compute aggregations: request count and mean response time per IP
def compute_aggregations_w(df: pl.LazyFrame) -> pl.LazyFrame:
    return df.group_by("ip_address").agg(
        [pl.len().alias("request_count"), pl.col("response_time").mean().alias("avg_response_time")]
    )


# 3. Add an "is_slow" column: average response time > 1500 ms
def add_is_slow_w(df: pl.LazyFrame) -> pl.LazyFrame:
    return df.with_columns((pl.col("avg_response_time") > 1500).alias("is_slow"))


# 4. Compute a weighted score based on the request count and the mean response time
def add_weighted_score_w(df: pl.LazyFrame) -> pl.LazyFrame:
    return df.with_columns((pl.col("request_count") * pl.col("avg_response_time") / 100).alias("weighted_score"))


# Main function that chains all the transformations
def fct_with_columns(df: pl.LazyFrame) -> pl.LazyFrame:
    df = (
        df.pipe(filter_errors_500_w)  # Filter the HTTP 500 errors
        .pipe(compute_aggregations_w)  # Compute the aggregations (count, avg response_time)
        .pipe(add_is_slow_w)  # Add the "is_slow" column
        .pipe(add_weighted_score_w)  # Add the weighted score
    )
    return df


# Same transformations, expressed as standalone expressions

# 1. Filter the HTTP 500 errors
def filter_errors_500() -> pl.Expr:
    return pl.col("status_code") == 500


# 2. Compute aggregations: request count and mean response time per IP
def compute_aggregations() -> list[pl.Expr]:
    return [
        pl.len().alias("request_count"),  # Number of requests per IP
        pl.col("response_time").mean().alias("avg_response_time"),  # Mean response time per IP
    ]


# 3. Add an "is_slow" column: average response time > 1500 ms
def add_is_slow() -> pl.Expr:
    return (pl.col("avg_response_time") > 1500).alias("is_slow")


# 4. Compute a weighted score based on the request count and the mean response time
def add_weighted_score() -> pl.Expr:
    return (pl.col("request_count") * pl.col("avg_response_time") / 100).alias("weighted_score")


# Main function that chains all the expressions
def fct_expr(df: pl.LazyFrame) -> pl.LazyFrame:
    return (
        df
        # 1. Filter via the expression
        .filter(filter_errors_500())
        # 2. Group by "ip_address" and apply the aggregations via expressions
        .group_by("ip_address")
        .agg(compute_aggregations())
        # 3. Apply the remaining expressions via with_columns (is_slow, weighted_score)
        .with_columns(
            [
                add_is_slow(),  # Add the "is_slow" column
                add_weighted_score(),  # Add the weighted score
            ]
        )
    )
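
# Hedged aside (not part of the original gist): in lazy mode, both pipelines
# should compile to the same optimized plan, with the status_code == 500
# predicate expected to be pushed into the scan. Comparing the optimized plans
# on a small sample illustrates the claim from the description above.
_sample_lazy = generate_log_data(num_rows=1_000).lazy()
print(fct_with_columns(_sample_lazy).explain())
print(fct_expr(_sample_lazy).explain())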
########################################
# To run the benchmark easily
########################################
def run_fct_with_columns_eager(df_eager):
    fct_with_columns(df_eager)


def run_fct_with_columns_lazy(df_lazy):
    fct_with_columns(df_lazy).collect()


def run_fct_expr_eager(df_eager):
    fct_expr(df_eager)


def run_fct_expr_lazy(df_lazy):
    fct_expr(df_lazy).collect()


def measure_execution_time(df_lazy, df_eager):
    times = {
        "run_fct_with_columns_eager": [],
        "run_fct_with_columns_lazy": [],
        "run_fct_expr_eager": [],
        "run_fct_expr_lazy": [],
    }
    for _ in range(10):
        start_time = perf_counter()
        run_fct_with_columns_eager(df_eager)
        times["run_fct_with_columns_eager"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_with_columns_lazy(df_lazy)
        times["run_fct_with_columns_lazy"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_expr_eager(df_eager)
        times["run_fct_expr_eager"].append((perf_counter() - start_time) * 1000)  # Convert to ms

        start_time = perf_counter()
        run_fct_expr_lazy(df_lazy)
        times["run_fct_expr_lazy"].append((perf_counter() - start_time) * 1000)  # Convert to ms

    # Calculate the average execution times
    avg_times = {key: sum(value) / len(value) for key, value in times.items()}
    return avg_times
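
# Hedged alternative (not part of the original gist): the mean over 10 runs can
# be skewed by warm-up or outliers; a median-based summary is sometimes
# preferred. `summarize_median_times` is a hypothetical helper and is not used
# by the benchmark below.
def summarize_median_times(times: dict[str, list[float]]) -> dict[str, float]:
    import statistics

    return {key: statistics.median(values) for key, values in times.items()}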
########################################
# Benchmark process
########################################
# Generate the data (a previous version read a real dataset and built a bigger one from it)
# df_lazy = read_df()  # (5630, 8)
# df_eager = df_lazy.collect()  # (5630, 8)
# df_big_lazy = pl.from_pandas(create_df_big(df_eager.to_pandas())).lazy()  # (563000, 8)
df_big_lazy = generate_log_data(num_rows=1_000_000).lazy()

# List to store execution times
execution_times = []
segment_length = 500_000
num_segments = df_big_lazy.collect().height // segment_length

# Apply the functions on each segment and measure execution time
for i in range(1, num_segments + 1):
    current_length = i * segment_length
    df_lazy = df_big_lazy.slice(0, current_length)
    df_eager = df_lazy.collect()
    times = measure_execution_time(df_lazy, df_eager)
    times["length"] = df_eager.shape[0]
    execution_times.append(times)

# Convert execution times to a DataFrame
df_times = pl.DataFrame(execution_times)

# Plot the results using matplotlib and save the plot
plt.figure(figsize=(10, 6))
plt.plot(df_times["length"], df_times["run_fct_with_columns_eager"], label="run_fct_with_columns_eager")
plt.plot(df_times["length"], df_times["run_fct_with_columns_lazy"], label="run_fct_with_columns_lazy")
plt.plot(df_times["length"], df_times["run_fct_expr_eager"], label="run_fct_expr_eager")
plt.plot(df_times["length"], df_times["run_fct_expr_lazy"], label="run_fct_expr_lazy")
plt.title("Execution Time vs Length of DataFrame")
plt.xlabel("Length of DataFrame")
plt.ylabel("Execution Time (milliseconds)")
plt.legend(title="Function")
plt.grid(True)

# Save the plot
plt.savefig("execution_time_vs_length_4.png")
# Show the plot
plt.show()
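
# Hedged aside (not part of the original gist): print the timing table and the
# eager-vs-lazy ratio for the Expr pipeline to complement the saved plot.
print(df_times)
print(
    df_times.with_columns(
        (pl.col("run_fct_expr_eager") / pl.col("run_fct_expr_lazy")).alias("expr_eager_vs_lazy_ratio")
    ).select(["length", "expr_eager_vs_lazy_ratio"])
)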