Skip to content

Instantly share code, notes, and snippets.

@ishank-dev
Created February 25, 2025 06:20
Show Gist options
  • Save ishank-dev/5aa31ab1fe7e2c63c3603409b1ef9283 to your computer and use it in GitHub Desktop.
Save ishank-dev/5aa31ab1fe7e2c63c3603409b1ef9283 to your computer and use it in GitHub Desktop.
Connection Pool Simulation for Firebolt
"""
simulate_firebolt_pooling_varied.py
This script performs the following steps:
1. Runs a dummy query (SELECT 1) to warm up the Firebolt engine.
2. Runs two simulations (each for 10 seconds):
- WITHOUT connection pooling: A new connection is created for every query.
- WITH connection pooling: A pre-warmed pool of connections is used.
For each query, the script cycles through different severity filters (using numeric values)
so that the queries are similar but not identical.
It logs per-iteration connection time and query time, then prints a summary table.
"""
import os
import sys
import time
import logging
import statistics
import queue
from shapely import wkt
from shapely.geometry import mapping
import osmnx as ox
from firebolt.client.auth import ClientCredentials
from firebolt.db import connect
# -----------------------------
# Logging Configuration
# -----------------------------
# Log to stdout so per-iteration logs and the final results table share one stream.
logger = logging.getLogger("SimulateBenchmarkVaried")
logger.setLevel(logging.INFO)
if not logger.handlers:
    # getLogger() returns the same logger object on every call, so re-importing or
    # reloading this module would otherwise attach a second handler and print
    # every log line twice.
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(formatter)
    logger.addHandler(ch)

# -----------------------------
# Firebolt Credentials
# -----------------------------
# All connection settings come from the environment and default to "" when unset.
client_id = os.getenv("FIREBOLT_CLIENT_ID", "")
client_secret = os.getenv("FIREBOLT_CLIENT_SECRET", "")
engine_name = os.getenv("FIREBOLT_ENGINE_NAME", "")
database_name = os.getenv("FIREBOLT_DATABASE", "")
account_name = os.getenv("FIREBOLT_ACCOUNT", "")
# Only the account name is validated up front; the other settings are not
# checked here.
if not account_name:
    logger.error("FIREBOLT_ACCOUNT environment variable is not set!")
    # RuntimeError subclasses Exception, so existing `except Exception` handlers
    # still catch it.
    raise RuntimeError("FIREBOLT_ACCOUNT environment variable must be set!")

# Cache mapping a location name to its polygon WKT; consulted by build_query.
location_polygons = {}
# -----------------------------
# Connection & Query Functions
# -----------------------------
def connect_to_firebolt():
    """
    Open and return a brand-new Firebolt connection.

    Authentication uses client-credentials built from the module-level
    ``client_id`` / ``client_secret``; the engine, database and account are
    taken from ``engine_name``, ``database_name`` and ``account_name``.
    """
    return connect(
        auth=ClientCredentials(client_id=client_id, client_secret=client_secret),
        account_name=account_name,
        database=database_name,
        engine_name=engine_name,
    )
def build_query(location, severity, start_date, end_date):
    """
    Build an SQL query for the accidentdata table.

    The location is geocoded with OSMnx on first use. The resulting polygon WKT
    is stored in ``location_polygons``, so later calls for the same location
    skip the network lookup.

    Args:
        location: Place name passed to ``ox.geocode_to_gdf``.
        severity: Numeric severity filter (e.g. ``"2"`` or ``2``); a falsy
            value means no severity filter.
        start_date: Inclusive lower bound compared with ``to_date(start_time)``;
            falsy means no lower bound.
        end_date: Inclusive upper bound compared with ``to_date(start_time)``;
            falsy means no upper bound.

    Returns:
        The SQL text, terminated with ``;``.

    Raises:
        Exception: if geocoding fails or the location cannot be found.
        ValueError: if ``severity`` is truthy but not an integer.
    """
    polygon_wkt = location_polygons.get(location)
    if not polygon_wkt:
        try:
            gdf = ox.geocode_to_gdf(location)
        except Exception as e:
            logger.error(f"Geocoding error for '{location}': {str(e)}")
            raise Exception(f"Geocoding error: {str(e)}") from e
        if gdf.empty:
            raise Exception(f"Location '{location}' not found")
        geom = gdf.iloc[0].geometry
        if geom.geom_type == "Point":
            # A bare point contains nothing, so widen it into a small area. The
            # buffer is in the geometry's coordinate units (presumably degrees
            # for geocoded lat/lon).
            geom = geom.buffer(0.01)
        polygon_wkt = geom.wkt
        # Populate the cache. Previously it was only read, never written, so
        # every benchmark iteration re-geocoded over the network.
        location_polygons[location] = polygon_wkt

    def _quote(value):
        # Render an SQL string literal, doubling embedded single quotes.
        return "'" + str(value).replace("'", "''") + "'"

    query = f"""
    SELECT DISTINCT
        id,
        severity,
        start_time,
        description,
        weather_condition,
        distance_mi,
        ST_AsText(start_location) AS location_wkt
    FROM accidentdata
    WHERE ST_Contains(
        ST_GeogFromText({_quote(polygon_wkt)}),
        start_location
    )
    """
    if severity:
        # severity is a numeric (bigint) column. Coercing to int both formats it
        # unquoted and rejects anything that is not a number.
        query += f"\nAND severity = {int(severity)}"
    if start_date:
        query += f"\nAND to_date(start_time) >= {_quote(start_date)}"
    if end_date:
        query += f"\nAND to_date(start_time) <= {_quote(end_date)}"
    query += ";"
    return query
def run_query(query, connection):
    """
    Execute ``query`` on ``connection``, fetch all rows, and return the elapsed seconds.

    The measured interval covers ``execute`` plus ``fetchall``, so it includes
    retrieving the result set; the rows themselves are discarded. The cursor
    is always closed, even if execution fails, so open cursors do not pile up
    on reused (pooled) connections.

    Raises:
        Whatever the driver raises from ``execute``/``fetchall``.
    """
    cursor = connection.cursor()
    try:
        # perf_counter is monotonic and high-resolution. time.time() can jump
        # with system clock adjustments and skew short intervals.
        query_start = time.perf_counter()
        cursor.execute(query)
        cursor.fetchall()
        return time.perf_counter() - query_start
    finally:
        cursor.close()
# -----------------------------
# Connection Pool Class
# -----------------------------
class FireboltConnectionPool:
    """
    Fixed-size pool of pre-opened Firebolt connections held in a ``queue.Queue``.

    All ``pool_size`` connections are opened eagerly in the constructor. The
    pool can also be used as a context manager; leaving the ``with`` block
    closes every idle connection.
    """

    def __init__(self, pool_size):
        self.pool = queue.Queue(maxsize=pool_size)
        self.pool_size = pool_size
        self.initialize_pool()

    def initialize_pool(self):
        """
        Open ``pool_size`` connections and enqueue them.

        If any connect fails, the connections already opened are closed and the
        original error is re-raised. Otherwise they would leak, because the
        caller never receives a pool object to clean up.
        """
        try:
            for _ in range(self.pool_size):
                self.pool.put(connect_to_firebolt())
        except Exception:
            self.close_all()
            raise

    def get_connection(self, timeout=None):
        """
        Take an idle connection, waiting up to ``timeout`` seconds.

        ``timeout=None`` blocks until a connection is available.

        Raises:
            Exception: if no connection becomes available within ``timeout``.
        """
        try:
            return self.pool.get(timeout=timeout)
        except queue.Empty:
            raise Exception("No available connection in the pool.") from None

    def return_connection(self, conn):
        """Put ``conn`` back into the pool; if the pool is already full, close it instead."""
        try:
            self.pool.put(conn, block=False)
        except queue.Full:
            conn.close()

    def close_all(self):
        """
        Drain the pool and close every idle connection.

        Connections currently checked out are not touched. A failure while
        closing one connection does not stop the remaining ones from being
        closed; the first such error is re-raised once the pool is empty.
        """
        first_error = None
        while True:
            # get_nowait avoids the empty()/get() race, where get() could block
            # forever if another consumer took the last item in between.
            try:
                conn = self.pool.get_nowait()
            except queue.Empty:
                break
            try:
                conn.close()
            except Exception as exc:
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close_all()
        return False
# -----------------------------
# Simulation Functions
# -----------------------------
def simulate_without_pooling(sim_duration, location, severity_list, start_date, end_date):
    """
    Issue queries for ``sim_duration`` seconds, opening a fresh connection for each one.

    Severity filters are taken round-robin from ``severity_list``, so
    consecutive queries are similar but not identical. Connection setup and
    query execution are timed separately. Failed iterations are logged and are
    not counted.

    Returns:
        (query_count, total_duration, connection_times, query_times)
    """
    # An empty list would make the modulo below divide by zero; treat it as "no filter".
    severities = severity_list or [""]
    query_count = 0
    connection_times = []
    query_times = []
    # perf_counter is monotonic, so clock adjustments cannot distort the intervals.
    sim_start = time.perf_counter()
    while time.perf_counter() - sim_start < sim_duration:
        current_severity = severities[query_count % len(severities)]
        query = build_query(location, current_severity, start_date, end_date)
        try:
            conn_start = time.perf_counter()
            conn = connect_to_firebolt()
            conn_elapsed = time.perf_counter() - conn_start
            try:
                qt = run_query(query, conn)
            finally:
                # Close even when the query fails, so failed iterations do not
                # leak connections.
                conn.close()
            connection_times.append(conn_elapsed)
            query_times.append(qt)
            query_count += 1
            logger.info(f"[No Pool] Iteration {query_count} (Severity='{current_severity}'): Connection Time = {conn_elapsed:.4f}s, Query Time = {qt:.4f}s")
        except Exception as e:
            logger.error(f"Error in simulate_without_pooling: {e}")
    total_duration = time.perf_counter() - sim_start
    return query_count, total_duration, connection_times, query_times
def simulate_with_pooling(sim_duration, location, severity_list, start_date, end_date, pool_size=5):
    """
    Issue queries for ``sim_duration`` seconds using a pool of ``pool_size`` connections.

    The pool is created before the timer starts, so connection setup is
    excluded from ``total_duration``. Here "connection time" means the time to
    acquire a connection from the pool. Severity filters are taken round-robin
    from ``severity_list``. Failed iterations are logged and are not counted.

    Returns:
        (query_count, total_duration, connection_times, query_times)
    """
    # An empty list would make the modulo below divide by zero; treat it as "no filter".
    severities = severity_list or [""]
    query_count = 0
    connection_times = []
    query_times = []
    pool = FireboltConnectionPool(pool_size)
    try:
        # perf_counter is monotonic, so clock adjustments cannot distort the intervals.
        sim_start = time.perf_counter()
        while time.perf_counter() - sim_start < sim_duration:
            current_severity = severities[query_count % len(severities)]
            query = build_query(location, current_severity, start_date, end_date)
            try:
                conn_start = time.perf_counter()
                conn = pool.get_connection(timeout=10)
                conn_elapsed = time.perf_counter() - conn_start
                try:
                    qt = run_query(query, conn)
                finally:
                    # Always hand the connection back. Otherwise each failed
                    # query permanently shrinks the pool until get_connection
                    # starts timing out.
                    pool.return_connection(conn)
                connection_times.append(conn_elapsed)
                query_times.append(qt)
                query_count += 1
                logger.info(f"[Pool] Iteration {query_count} (Severity='{current_severity}'): Connection Acquire Time = {conn_elapsed:.4f}s, Query Time = {qt:.4f}s")
            except Exception as e:
                logger.error(f"Error in simulate_with_pooling: {e}")
        total_duration = time.perf_counter() - sim_start
    finally:
        # Close pooled connections even if the loop is aborted (e.g. build_query raises).
        pool.close_all()
    return query_count, total_duration, connection_times, query_times
def print_results_table(results):
    """
    Print a fixed-width comparison table to stdout.

    Each row of ``results`` is a 6-tuple: (scenario, total queries, total
    duration, average connection time, average query time, throughput).
    Durations and times are printed with 4 decimals, throughput with 2.
    """
    widths = (30, 15, 20, 20, 20, 15)
    titles = (
        "Scenario",
        "Total Queries",
        "Total Duration (s)",
        "Avg Conn Time (s)",
        "Avg Query Time (s)",
        "Throughput (qps)",
    )
    print(" ".join(f"{title:<{width}}" for title, width in zip(titles, widths)))

    # Per-column precision suffix; the first two columns use default formatting.
    precisions = ("", "", ".4f", ".4f", ".4f", ".2f")
    for row in results:
        # Explicit unpacking rejects rows that do not have exactly six fields.
        scenario, count, tot_time, avg_conn, avg_query, throughput = row
        cells = (scenario, count, tot_time, avg_conn, avg_query, throughput)
        print(" ".join(
            format(cell, f"<{width}{precision}")
            for cell, width, precision in zip(cells, widths, precisions)
        ))
def _warm_up_engine():
    """Run ``SELECT 1`` on a throwaway connection; exit the process with status 1 on failure."""
    logger.info("Running dummy query to warm up the engine...")
    try:
        conn = connect_to_firebolt()
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT 1;")
            cursor.fetchall()
        finally:
            # Close the connection even if the warm-up query fails.
            conn.close()
    except Exception as e:
        logger.error(f"Dummy query failed: {e}")
        sys.exit(1)
    logger.info("Dummy query completed, engine is warmed up.")


def _summarize(scenario, count, total_duration, conn_times, query_times):
    """Turn one simulation's raw measurements into a results-table row (zeros when empty)."""
    avg_conn = statistics.mean(conn_times) if conn_times else 0
    avg_query = statistics.mean(query_times) if query_times else 0
    throughput = count / total_duration if total_duration > 0 else 0
    return (scenario, count, total_duration, avg_conn, avg_query, throughput)


def main():
    """
    Warm up the engine, run the no-pool and pooled simulations back to back,
    and print a comparison table.
    """
    _warm_up_engine()

    SIM_DURATION = 10  # seconds per simulation
    location = "Long Beach"
    # Numeric severity filters cycled per query; the empty string means no filter.
    severity_list = ["", "1", "2", "3"]
    start_date = ""
    end_date = ""

    logger.info("Simulating WITHOUT connection pooling...")
    no_pool = simulate_without_pooling(
        SIM_DURATION, location, severity_list, start_date, end_date)

    logger.info("Simulating WITH connection pooling...")
    pooled = simulate_with_pooling(
        SIM_DURATION, location, severity_list, start_date, end_date, pool_size=5)

    results = [
        _summarize("Without Connection Pool", *no_pool),
        _summarize("With Connection Pool", *pooled),
    ]
    # Derive the duration from SIM_DURATION instead of hard-coding it in the text.
    print(f"\nSimulation Results (each run lasted ~{SIM_DURATION} seconds):")
    print_results_table(results)


if __name__ == "__main__":
    main()
```
LOGS:
2025-02-24 22:09:47,538 - INFO - Running dummy query to warm up the engine...
2025-02-24 22:09:57,690 - INFO - Dummy query completed, engine is warmed up.
2025-02-24 22:09:57,690 - INFO - Simulating WITHOUT connection pooling...
2025-02-24 22:10:01,107 - INFO - [No Pool] Iteration 1 (Severity=''): Connection Time = 0.4323s, Query Time = 2.9575s
2025-02-24 22:10:02,245 - INFO - [No Pool] Iteration 2 (Severity='1'): Connection Time = 0.4382s, Query Time = 0.6978s
2025-02-24 22:10:04,374 - INFO - [No Pool] Iteration 3 (Severity='2'): Connection Time = 0.4564s, Query Time = 1.6625s
2025-02-24 22:10:06,109 - INFO - [No Pool] Iteration 4 (Severity='3'): Connection Time = 0.4054s, Query Time = 1.3266s
2025-02-24 22:10:08,493 - INFO - [No Pool] Iteration 5 (Severity=''): Connection Time = 0.4512s, Query Time = 1.9287s
2025-02-24 22:10:08,494 - INFO - Simulating WITH connection pooling...
2025-02-24 22:10:12,765 - INFO - [Pool] Iteration 1 (Severity=''): Connection Acquire Time = 0.0000s, Query Time = 2.0223s
2025-02-24 22:10:13,041 - INFO - [Pool] Iteration 2 (Severity='1'): Connection Acquire Time = 0.0000s, Query Time = 0.2738s
2025-02-24 22:10:15,186 - INFO - [Pool] Iteration 3 (Severity='2'): Connection Acquire Time = 0.0000s, Query Time = 2.1386s
2025-02-24 22:10:16,656 - INFO - [Pool] Iteration 4 (Severity='3'): Connection Acquire Time = 0.0000s, Query Time = 1.4673s
2025-02-24 22:10:18,355 - INFO - [Pool] Iteration 5 (Severity=''): Connection Acquire Time = 0.0000s, Query Time = 1.6968s
2025-02-24 22:10:18,641 - INFO - [Pool] Iteration 6 (Severity='1'): Connection Acquire Time = 0.0000s, Query Time = 0.2834s
2025-02-24 22:10:20,306 - INFO - [Pool] Iteration 7 (Severity='2'): Connection Acquire Time = 0.0000s, Query Time = 1.6573s
2025-02-24 22:10:22,027 - INFO - [Pool] Iteration 8 (Severity='3'): Connection Acquire Time = 0.0000s, Query Time = 1.7193s
Simulation Results (each run lasted ~10 seconds):
Scenario Total Queries Total Duration (s) Avg Conn Time (s) Avg Query Time (s) Throughput (qps)
Without Connection Pool 5 10.8025 0.4367 1.7146 0.46
With Connection Pool 8 11.2897 0.0000 1.4074 0.71
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment