Created
February 25, 2025 06:20
-
-
Save ishank-dev/5aa31ab1fe7e2c63c3603409b1ef9283 to your computer and use it in GitHub Desktop.
Connection Pool Simulation FB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
simulate_firebolt_pooling_varied.py | |
This script performs the following steps: | |
1. Runs a dummy query (SELECT 1) to warm up the Firebolt engine. | |
2. Runs two simulations (each for 10 seconds): | |
- WITHOUT connection pooling: A new connection is created for every query. | |
- WITH connection pooling: A pre-warmed pool of connections is used. | |
For each query, the script cycles through different severity filters (using numeric values) | |
so that the queries are similar but not identical. | |
It logs per-iteration connection time and query time, then prints a summary table. | |
""" | |
import os | |
import sys | |
import time | |
import logging | |
import statistics | |
import queue | |
from shapely import wkt | |
from shapely.geometry import mapping | |
import osmnx as ox | |
from firebolt.client.auth import ClientCredentials | |
from firebolt.db import connect | |
# -----------------------------
# Logging Configuration
# -----------------------------
logger = logging.getLogger("SimulateBenchmarkVaried")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
logger.addHandler(ch)

# -----------------------------
# Firebolt Credentials
# -----------------------------
client_id = os.getenv("FIREBOLT_CLIENT_ID", "")
client_secret = os.getenv("FIREBOLT_CLIENT_SECRET", "")
engine_name = os.getenv("FIREBOLT_ENGINE_NAME", "")
database_name = os.getenv("FIREBOLT_DATABASE", "")
account_name = os.getenv("FIREBOLT_ACCOUNT", "")

# Fail fast: without an account name the Firebolt SDK cannot resolve the engine.
if not account_name:
    logger.error("FIREBOLT_ACCOUNT environment variable is not set!")
    raise Exception("FIREBOLT_ACCOUNT environment variable must be set!")

# Cache of location name -> polygon WKT so each location is geocoded at most once.
location_polygons = {}
# ----------------------------- | |
# Connection & Query Functions | |
# ----------------------------- | |
def connect_to_firebolt():
    """Open and return a new Firebolt connection using the module-level
    credentials and engine/database settings."""
    auth = ClientCredentials(client_id=client_id, client_secret=client_secret)
    return connect(
        engine_name=engine_name,
        database=database_name,
        account_name=account_name,
        auth=auth,
    )
def build_query(location, severity, start_date, end_date, polygon_cache=None):
    """Build a SQL query over the accidentdata table for accidents inside
    the polygon of *location*.

    The location polygon is geocoded via OSMnx on first use and memoized so
    repeated queries for the same location skip the geocoding round-trip.

    Args:
        location: Human-readable place name understood by OSMnx geocoding.
        severity: Numeric severity filter as a string; "" disables the filter.
        start_date: Inclusive lower bound on start_time; "" disables it.
        end_date: Inclusive upper bound on start_time; "" disables it.
        polygon_cache: Optional dict mapping location -> polygon WKT; defaults
            to the module-level ``location_polygons`` cache.

    Returns:
        The SQL query text, terminated with a semicolon.

    Raises:
        Exception: If geocoding fails or the location cannot be found.
    """
    cache = location_polygons if polygon_cache is None else polygon_cache
    polygon_wkt = cache.get(location)
    if not polygon_wkt:
        try:
            gdf = ox.geocode_to_gdf(location)
        except Exception as e:
            logger.error(f"Geocoding error for '{location}': {str(e)}")
            raise Exception(f"Geocoding error: {str(e)}")
        if gdf.empty:
            raise Exception(f"Location '{location}' not found")
        geom = gdf.iloc[0].geometry
        if geom.geom_type == "Point":
            # A bare point has no area; buffer it into a small polygon.
            geom = geom.buffer(0.01)
        polygon_wkt = geom.wkt
        # BUG FIX: the original never stored the geocoding result, so the
        # cache stayed empty and every call re-geocoded the same location.
        cache[location] = polygon_wkt
    # NOTE(security): polygon_wkt and the date strings are interpolated
    # directly into the SQL text. They come from trusted code in this script,
    # but switch to parameterized queries if any of these values can ever be
    # user-supplied.
    query = f"""
    SELECT DISTINCT
        id,
        severity,
        start_time,
        description,
        weather_condition,
        distance_mi,
        ST_AsText(start_location) AS location_wkt
    FROM accidentdata
    WHERE ST_Contains(
        ST_GeogFromText('{polygon_wkt}'),
        start_location
    )
    """
    if severity:
        # severity is a numeric (bigint) column: int() both validates the
        # input and guarantees no quoting/injection issue for this filter.
        query += f"\nAND severity = {int(severity)}"
    if start_date:
        query += f"\nAND to_date(start_time) >= '{start_date}'"
    if end_date:
        query += f"\nAND to_date(start_time) <= '{end_date}'"
    query += ";"
    return query
def run_query(query, connection):
    """Execute *query* on *connection*, fetch all rows, and return the
    elapsed query time in seconds (connection setup time excluded).

    The result rows are discarded; only the timing matters to the benchmark.
    """
    cursor = connection.cursor()
    try:
        query_start = time.time()
        cursor.execute(query)
        _ = cursor.fetchall()
        return time.time() - query_start
    finally:
        # BUG FIX: always release the cursor, even when execute/fetch raises
        # (the original leaked the cursor on failure).
        cursor.close()
# ----------------------------- | |
# Connection Pool Class | |
# ----------------------------- | |
class FireboltConnectionPool:
    """A fixed-size pool of pre-opened Firebolt connections backed by a
    thread-safe queue."""

    def __init__(self, pool_size):
        self.pool = queue.Queue(maxsize=pool_size)
        self.pool_size = pool_size
        self.initialize_pool()

    def initialize_pool(self):
        """Pre-warm the pool by opening ``pool_size`` fresh connections."""
        for _ in range(self.pool_size):
            self.pool.put(connect_to_firebolt())

    def get_connection(self, timeout=None):
        """Borrow a connection; raise if none becomes free within *timeout*."""
        try:
            return self.pool.get(timeout=timeout)
        except queue.Empty:
            raise Exception("No available connection in the pool.")

    def return_connection(self, conn):
        """Hand a connection back; close it if the pool is already full."""
        try:
            self.pool.put(conn, block=False)
        except queue.Full:
            conn.close()

    def close_all(self):
        """Drain the pool, closing every remaining connection."""
        while not self.pool.empty():
            self.pool.get().close()
# ----------------------------- | |
# Simulation Functions | |
# ----------------------------- | |
def simulate_without_pooling(sim_duration, location, severity_list, start_date, end_date):
    """Issue queries for *sim_duration* seconds, opening a brand-new
    connection for every query (the no-pooling baseline).

    Severity filters cycle through *severity_list* so consecutive queries
    are similar but not identical.

    Args:
        sim_duration: How long to keep issuing queries, in seconds.
        location: Place name passed through to build_query.
        severity_list: Severity filter values to cycle through ("" = none).
        start_date: Inclusive lower date bound ("" = none).
        end_date: Inclusive upper date bound ("" = none).

    Returns:
        Tuple of (query_count, total_duration, connection_times, query_times).
    """
    query_count = 0
    connection_times = []
    query_times = []
    sim_start = time.time()
    while time.time() - sim_start < sim_duration:
        # Cycle through severity filters (numeric values, e.g., "", "1", "2", "3")
        current_severity = severity_list[query_count % len(severity_list)]
        query = build_query(location, current_severity, start_date, end_date)
        conn = None
        try:
            conn_start = time.time()
            conn = connect_to_firebolt()
            conn_elapsed = time.time() - conn_start
            qt = run_query(query, conn)
            connection_times.append(conn_elapsed)
            query_times.append(qt)
            query_count += 1
            logger.info(f"[No Pool] Iteration {query_count} (Severity='{current_severity}'): Connection Time = {conn_elapsed:.4f}s, Query Time = {qt:.4f}s")
        except Exception as e:
            logger.error(f"Error in simulate_without_pooling: {e}")
        finally:
            # BUG FIX: the original only closed on success, leaking the
            # connection whenever run_query raised.
            if conn is not None:
                conn.close()
    total_duration = time.time() - sim_start
    return query_count, total_duration, connection_times, query_times
def simulate_with_pooling(sim_duration, location, severity_list, start_date, end_date, pool_size=5):
    """Issue queries for *sim_duration* seconds, borrowing connections from
    a pre-warmed pool instead of opening a new one per query.

    Severity filters cycle through *severity_list* so consecutive queries
    are similar but not identical.

    Args:
        sim_duration: How long to keep issuing queries, in seconds.
        location: Place name passed through to build_query.
        severity_list: Severity filter values to cycle through ("" = none).
        start_date: Inclusive lower date bound ("" = none).
        end_date: Inclusive upper date bound ("" = none).
        pool_size: Number of connections to pre-open.

    Returns:
        Tuple of (query_count, total_duration, connection_times, query_times),
        where connection_times are pool-acquire times.
    """
    query_count = 0
    connection_times = []
    query_times = []
    pool = FireboltConnectionPool(pool_size)
    sim_start = time.time()
    while time.time() - sim_start < sim_duration:
        current_severity = severity_list[query_count % len(severity_list)]
        query = build_query(location, current_severity, start_date, end_date)
        try:
            conn_start = time.time()
            conn = pool.get_connection(timeout=10)
        except Exception as e:
            logger.error(f"Error in simulate_with_pooling: {e}")
            continue
        conn_elapsed = time.time() - conn_start
        try:
            qt = run_query(query, conn)
            connection_times.append(conn_elapsed)
            query_times.append(qt)
            query_count += 1
            logger.info(f"[Pool] Iteration {query_count} (Severity='{current_severity}'): Connection Acquire Time = {conn_elapsed:.4f}s, Query Time = {qt:.4f}s")
        except Exception as e:
            logger.error(f"Error in simulate_with_pooling: {e}")
        finally:
            # BUG FIX: always return the borrowed connection; the original
            # dropped it whenever run_query raised, permanently shrinking
            # the pool with every failure.
            pool.return_connection(conn)
    total_duration = time.time() - sim_start
    pool.close_all()
    return query_count, total_duration, connection_times, query_times
def print_results_table(results):
    """Print an aligned summary table, one row per benchmark scenario.

    Each entry in *results* is a tuple of
    (scenario, count, total_time, avg_conn_time, avg_query_time, throughput).
    """
    header_fmt = "{:<30} {:<15} {:<20} {:<20} {:<20} {:<15}"
    print(header_fmt.format(
        "Scenario",
        "Total Queries",
        "Total Duration (s)",
        "Avg Conn Time (s)",
        "Avg Query Time (s)",
        "Throughput (qps)",
    ))
    row_fmt = "{:<30} {:<15} {:<20.4f} {:<20.4f} {:<20.4f} {:<15.2f}"
    for row in results:
        print(row_fmt.format(*row))
def main():
    """Warm up the Firebolt engine with a dummy query, then benchmark
    querying with and without connection pooling and print a comparison."""
    # Warm-up: a throwaway query so engine spin-up doesn't skew either run.
    logger.info("Running dummy query to warm up the engine...")
    try:
        conn = connect_to_firebolt()
        cursor = conn.cursor()
        cursor.execute("SELECT 1;")
        _ = cursor.fetchall()
        conn.close()
        logger.info("Dummy query completed, engine is warmed up.")
    except Exception as e:
        logger.error(f"Dummy query failed: {e}")
        sys.exit(1)

    sim_duration = 10  # seconds per simulation
    location = "Long Beach"
    # Numeric severity filters; the empty string means "no severity filter".
    severity_list = ["", "1", "2", "3"]
    start_date = ""
    end_date = ""

    def summarize(name, count, total, conn_times, query_times):
        # Guard the averages against runs where no query succeeded.
        avg_conn = statistics.mean(conn_times) if conn_times else 0
        avg_query = statistics.mean(query_times) if query_times else 0
        qps = count / total if total > 0 else 0
        return (name, count, total, avg_conn, avg_query, qps)

    logger.info("Simulating WITHOUT connection pooling...")
    no_pool_row = summarize(
        "Without Connection Pool",
        *simulate_without_pooling(sim_duration, location, severity_list, start_date, end_date))

    logger.info("Simulating WITH connection pooling...")
    pool_row = summarize(
        "With Connection Pool",
        *simulate_with_pooling(sim_duration, location, severity_list, start_date, end_date, pool_size=5))

    print("\nSimulation Results (each run lasted ~10 seconds):")
    print_results_table([no_pool_row, pool_row])


if __name__ == "__main__":
    main()
``` | |
LOGS: | |
2025-02-24 22:09:47,538 - INFO - Running dummy query to warm up the engine... | |
2025-02-24 22:09:57,690 - INFO - Dummy query completed, engine is warmed up. | |
2025-02-24 22:09:57,690 - INFO - Simulating WITHOUT connection pooling... | |
2025-02-24 22:10:01,107 - INFO - [No Pool] Iteration 1 (Severity=''): Connection Time = 0.4323s, Query Time = 2.9575s | |
2025-02-24 22:10:02,245 - INFO - [No Pool] Iteration 2 (Severity='1'): Connection Time = 0.4382s, Query Time = 0.6978s | |
2025-02-24 22:10:04,374 - INFO - [No Pool] Iteration 3 (Severity='2'): Connection Time = 0.4564s, Query Time = 1.6625s | |
2025-02-24 22:10:06,109 - INFO - [No Pool] Iteration 4 (Severity='3'): Connection Time = 0.4054s, Query Time = 1.3266s | |
2025-02-24 22:10:08,493 - INFO - [No Pool] Iteration 5 (Severity=''): Connection Time = 0.4512s, Query Time = 1.9287s | |
2025-02-24 22:10:08,494 - INFO - Simulating WITH connection pooling... | |
2025-02-24 22:10:12,765 - INFO - [Pool] Iteration 1 (Severity=''): Connection Acquire Time = 0.0000s, Query Time = 2.0223s | |
2025-02-24 22:10:13,041 - INFO - [Pool] Iteration 2 (Severity='1'): Connection Acquire Time = 0.0000s, Query Time = 0.2738s | |
2025-02-24 22:10:15,186 - INFO - [Pool] Iteration 3 (Severity='2'): Connection Acquire Time = 0.0000s, Query Time = 2.1386s | |
2025-02-24 22:10:16,656 - INFO - [Pool] Iteration 4 (Severity='3'): Connection Acquire Time = 0.0000s, Query Time = 1.4673s | |
2025-02-24 22:10:18,355 - INFO - [Pool] Iteration 5 (Severity=''): Connection Acquire Time = 0.0000s, Query Time = 1.6968s | |
2025-02-24 22:10:18,641 - INFO - [Pool] Iteration 6 (Severity='1'): Connection Acquire Time = 0.0000s, Query Time = 0.2834s | |
2025-02-24 22:10:20,306 - INFO - [Pool] Iteration 7 (Severity='2'): Connection Acquire Time = 0.0000s, Query Time = 1.6573s | |
2025-02-24 22:10:22,027 - INFO - [Pool] Iteration 8 (Severity='3'): Connection Acquire Time = 0.0000s, Query Time = 1.7193s | |
Simulation Results (each run lasted ~10 seconds): | |
Scenario Total Queries Total Duration (s) Avg Conn Time (s) Avg Query Time (s) Throughput (qps) | |
Without Connection Pool 5 10.8025 0.4367 1.7146 0.46 | |
With Connection Pool 8 11.2897 0.0000 1.4074 0.71 | |
``` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment