Skip to content

Instantly share code, notes, and snippets.

@armgilles
Last active October 18, 2024 08:22
Show Gist options
  • Save armgilles/32af40325fa8823975d801d9c1f0fc55 to your computer and use it in GitHub Desktop.
Save armgilles/32af40325fa8823975d801d9c1f0fc55 to your computer and use it in GitHub Desktop.
Nouveau bench avec optimisation du filtre de prédicat en lazy. with_columns and Expr are equal in lazy mode.
import random
from datetime import datetime, timedelta
from time import perf_counter
import matplotlib.pyplot as plt
import polars as pl
########################################
# Functions to read and create DataFrames
########################################
# Simulated IP address generator
def generate_ip():
    """Build a pseudo-random dotted-quad IPv4 address string (e.g. "10.0.3.255")."""
    octets = [str(random.randint(0, 255)) for _ in range(4)]
    return ".".join(octets)
# Simulated HTTP status-code generator
def generate_status_code():
    """Draw one HTTP status code, heavily weighted toward 200 (500 is rare)."""
    codes = [200, 404, 500]
    weights = [0.8, 0.15, 0.05]  # 500 deliberately rare
    return random.choices(codes, weights=weights)[0]
# Simulated response-time generator
def generate_response_time():
    """Sample a simulated response time in milliseconds, 100 to 2000 inclusive."""
    low_ms, high_ms = 100, 2000
    return random.randint(low_ms, high_ms)
# Timestamp generator
def generate_timestamp(start_date, days_range=10):
    """Return a random timestamp between *start_date* and *days_range* days later.

    Picks a day offset in [0, days_range] and a second-of-day offset.
    NOTE(review): random.randint is inclusive on both ends, so the 86400 upper
    bound can land exactly on the next midnight — likely intended as 86399.
    Preserved as-is here (behavior-identical rewrite).
    """
    day_offset = random.randint(0, days_range)
    second_offset = random.randint(0, 86400)  # seconds within one day
    return start_date + timedelta(days=day_offset, seconds=second_offset)
# Simulate countries associated with the IP addresses
def generate_country():
    """Pick one country uniformly at random from a fixed five-country list."""
    return random.choice(["USA", "France", "Germany", "Canada", "Brazil"])
# Generate a simulated dataset
def generate_log_data(num_rows=1000):
    """Assemble a simulated web-server log as a polars DataFrame.

    Columns: ip_address, status_code, response_time, timestamp, country.
    Each column is filled independently, one full column at a time (same
    random-call order as building each list in sequence).
    """
    start_date = datetime(2024, 1, 1)
    column_makers = {
        "ip_address": generate_ip,
        "status_code": generate_status_code,
        "response_time": generate_response_time,
        "timestamp": lambda: generate_timestamp(start_date),
        "country": generate_country,
    }
    data = {
        name: [make() for _ in range(num_rows)]
        for name, make in column_makers.items()
    }
    return pl.DataFrame(data)
########################################
# Calculation functions (pandas, polars eager
# and Expr)
########################################
# 1. Filter HTTP 500 errors
def filter_errors_500_w(df: pl.LazyFrame) -> pl.LazyFrame:
    """Keep only the rows whose status_code is HTTP 500."""
    is_server_error = pl.col("status_code") == 500
    return df.filter(is_server_error)
# 2. Compute aggregations: request count and mean response time per IP
def compute_aggregations_w(df: pl.LazyFrame) -> pl.LazyFrame:
    """Group by ip_address and compute request_count and avg_response_time."""
    request_count = pl.len().alias("request_count")
    avg_response = pl.col("response_time").mean().alias("avg_response_time")
    return df.group_by("ip_address").agg([request_count, avg_response])
# 3. Add an "is_slow" column: average response time > 1500 ms
def add_is_slow_w(df: pl.LazyFrame) -> pl.LazyFrame:
    """Append a boolean "is_slow" column (avg_response_time above 1500)."""
    slow_flag = pl.col("avg_response_time") > 1500
    return df.with_columns(slow_flag.alias("is_slow"))
# 4. Compute a weighted score from request count and mean response time
def add_weighted_score_w(df: pl.LazyFrame) -> pl.LazyFrame:
    """Append "weighted_score" = request_count * avg_response_time / 100."""
    score = pl.col("request_count") * pl.col("avg_response_time") / 100
    return df.with_columns(score.alias("weighted_score"))
# Main function chaining all the with_columns-style transformations
def fct_with_columns(df: pl.LazyFrame) -> pl.LazyFrame:
    """Run the full pipeline via df.pipe composition.

    Pipeline: filter HTTP 500 rows -> per-IP aggregates -> is_slow flag
    -> weighted score.

    Bug fix: the return annotation previously said ``pl.DataFrame`` but
    nothing here calls ``.collect()`` — a LazyFrame in yields a LazyFrame
    out (and an eager DataFrame in yields a DataFrame out, since .pipe and
    the helpers work on both).
    """
    return (
        df.pipe(filter_errors_500_w)    # keep only HTTP 500 rows
        .pipe(compute_aggregations_w)   # per-IP count + mean response time
        .pipe(add_is_slow_w)            # boolean "is_slow" column
        .pipe(add_weighted_score_w)     # "weighted_score" column
    )
# 1. Expression selecting HTTP 500 errors
def filter_errors_500() -> pl.Expr:
    """Return the predicate expression matching status_code == 500."""
    server_error_code = 500
    return pl.col("status_code") == server_error_code
# 2. Compute aggregations: request count and mean response time per IP
def compute_aggregations() -> list[pl.Expr]:
    """Return the per-group aggregation expressions (count, mean response time)."""
    count_expr = pl.len().alias("request_count")          # requests per group
    mean_expr = pl.col("response_time").mean().alias("avg_response_time")
    return [count_expr, mean_expr]
# 3. "is_slow" column expression: average response time > 1500 ms
def add_is_slow() -> pl.Expr:
    """Return the expression flagging groups whose avg response time exceeds 1500 ms."""
    threshold_ms = 1500
    return (pl.col("avg_response_time") > threshold_ms).alias("is_slow")
# 4. Weighted-score expression from request count and mean response time
def add_weighted_score() -> pl.Expr:
    """Return the expression computing request_count * avg_response_time / 100."""
    raw_product = pl.col("request_count") * pl.col("avg_response_time")
    return (raw_product / 100).alias("weighted_score")
# Main function chaining all the expressions
def fct_expr(df: pl.LazyFrame) -> pl.LazyFrame:
    """Run the same pipeline as fct_with_columns, composed from bare expressions.

    Steps: filter on the 500-predicate, group by ip_address with the
    aggregation expressions, then append is_slow and weighted_score.
    """
    filtered = df.filter(filter_errors_500())
    aggregated = filtered.group_by("ip_address").agg(compute_aggregations())
    return aggregated.with_columns([add_is_slow(), add_weighted_score()])
########################################
# To run benchmark easily
########################################
def run_fct_with_columns_eager(df_eager):
    """Run the pipe-based pipeline on an eager DataFrame (executes immediately)."""
    # Fix: the result was bound to an unused local ("lol"); just discard it.
    fct_with_columns(df_eager)

def run_fct_with_columns_lazy(df_lazy):
    """Build the pipe-based pipeline lazily, then materialize with collect()."""
    fct_with_columns(df_lazy).collect()

def run_fct_eprx_eager(df_eager):
    """Run the expression-based pipeline on an eager DataFrame."""
    # Fix: unused local binding removed here too.
    # NOTE(review): "eprx" looks like a typo of "expr"; kept because
    # measure_execution_time calls these names.
    fct_expr(df_eager)

def run_fct_eprx_lazy(df_lazy):
    """Build the expression-based pipeline lazily, then materialize with collect()."""
    fct_expr(df_lazy).collect()
def measure_execution_time(df_lazy, df_eager):
    """Time each benchmark variant 10 times and return the mean runtime in ms.

    Parameters
    ----------
    df_lazy : the LazyFrame fed to the *_lazy variants.
    df_eager : the eager DataFrame fed to the *_eager variants.

    Returns
    -------
    dict mapping variant name -> average wall-clock time in milliseconds.
    """
    # Dispatch table replaces four copy-pasted timing stanzas; iteration
    # order matches the original (insertion order of the dict).
    benchmarks = {
        "run_fct_with_columns_eager": (run_fct_with_columns_eager, df_eager),
        "run_fct_with_columns_lazy": (run_fct_with_columns_lazy, df_lazy),
        "run_fct_eprx_eager": (run_fct_eprx_eager, df_eager),
        "run_fct_eprx_lazy": (run_fct_eprx_lazy, df_lazy),
    }
    times = {name: [] for name in benchmarks}
    for _ in range(10):
        for name, (fn, frame) in benchmarks.items():
            start_time = perf_counter()
            fn(frame)
            times[name].append((perf_counter() - start_time) * 1000)  # ms
    # Average execution time per variant
    return {name: sum(samples) / len(samples) for name, samples in times.items()}
########################################
# Process of benchmark
########################################
# Build a 1M-row simulated log; keep it lazy so the pipelines can defer work.
df_big_lazy = generate_log_data(num_rows=1_000_000).lazy()

# One dict of average timings per segment length.
execution_times = []
segment_length = 500_000
# Fix: count rows without materializing the whole frame — the original
# called .collect().height, which built all 1M rows just to read a count.
num_segments = df_big_lazy.select(pl.len()).collect().item() // segment_length

# Apply the functions on each growing prefix and measure execution time.
for i in range(1, num_segments + 1):
    current_length = i * segment_length
    df_lazy = df_big_lazy.slice(0, current_length)
    df_eager = df_lazy.collect()
    times = measure_execution_time(df_lazy, df_eager)
    times["length"] = df_eager.shape[0]
    execution_times.append(times)

# Convert execution times to a DataFrame (one row per segment length).
df_times = pl.DataFrame(execution_times)

# Plot the results using matplotlib and save the plot.
plt.figure(figsize=(10, 6))
for column in (
    "run_fct_with_columns_eager",
    "run_fct_with_columns_lazy",
    "run_fct_eprx_eager",
    "run_fct_eprx_lazy",
):
    plt.plot(df_times["length"], df_times[column], label=column)
plt.title("Execution Time vs Length of DataFrame")
plt.xlabel("Length of DataFrame")
plt.ylabel("Execution Time (milliseconds)")
plt.legend(title="Function")
plt.grid(True)

# Save the plot, then display it.
plt.savefig("execution_time_vs_length_4.png")
plt.show()
@armgilles
Copy link
Author

image

@armgilles
Copy link
Author

armgilles commented Oct 16, 2024

with_columns() & Expr are the same in lazy mode.

image

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment