Skip to content

Instantly share code, notes, and snippets.

@cwebber314
Created June 16, 2026 03:11
Show Gist options
  • Select an option

  • Save cwebber314/a827ee970a537572d9e4ff45fd2dc008 to your computer and use it in GitHub Desktop.

Select an option

Save cwebber314/a827ee970a537572d9e4ff45fd2dc008 to your computer and use it in GitHub Desktop.
Playing around with a thread pool to make slow, I/O-bound API calls run in parallel
"""Mock example: parallelizing slow API calls with ThreadPoolExecutor.
The real bottleneck is a slow API library (network I/O), so threads work well:
Python releases the GIL while a thread waits on the network, letting all the
branch fetches happen concurrently.
Each mock call sleeps a random amount (up to MAX_SLEEP) to stand in for a slow
API call. Live logging + an ASCII timeline at the end let you *see* the threads
overlap.
"""
import json
import time
import random
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# Default upper bound, in seconds, on how long a mock call sleeps. Individual
# calls may pass a smaller max_delay to _simulate_io.
MAX_SLEEP = 5.0

# %(relativeCreated)s is the number of milliseconds since the logging module
# was loaded, i.e. one clock shared by every thread, which makes concurrent
# START/DONE lines easy to line up by eye.
logging.basicConfig(
    format="%(relativeCreated)7.0f ms | %(threadName)-10s | %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("branch_fetch")

# One (thread_name, label, start, end) entry per finished mock call. Worker
# threads append to the list, so every append happens under _events_lock.
_events_lock = threading.Lock()
_events: list[tuple[str, str, float, float]] = []
def _simulate_io(label: str, max_delay: float = MAX_SLEEP) -> None:
    """Pretend to be one slow API call.

    Draws a random delay between min(0.5, max_delay) and max_delay seconds,
    logs a START line, sleeps for that long, logs a DONE line and finally
    appends a (thread_name, label, start, end) entry to _events.
    Use max_delay to model how expensive a particular call is.
    """
    shortest = min(0.5, max_delay)
    pause = random.uniform(shortest, max_delay)

    began = time.perf_counter()
    log.info(f"START {label} (~{pause:.1f}s)")
    time.sleep(pause)
    finished = time.perf_counter()
    log.info(f"DONE {label} ({pause:.1f}s)")

    # Build the entry first so the lock is held only for the append itself.
    entry = (threading.current_thread().name, label, began, finished)
    with _events_lock:
        _events.append(entry)
def print_timeline(width: int = 60) -> None:
    """Render the recorded calls as an ASCII Gantt chart.

    Prints one row per entry in _events, ordered by start time. Each row
    shows the thread name, a bar placed and sized in proportion to the call's
    share of the overall time span, the call label and its duration.
    Nothing is printed when no calls have been recorded.

    width is the number of character cells between the two "|" rails.
    """
    if not _events:
        return
    t0 = min(e[2] for e in _events)
    t1 = max(e[3] for e in _events)
    span = t1 - t0

    def to_cells(seconds: float) -> int:
        # When every call starts and ends on the same tick the span is 0;
        # map everything to column 0 instead of dividing by zero.
        return int(seconds / span * width) if span > 0 else 0

    print(f"\nThread timeline (full width = {span:.1f}s):\n")
    for thread, label, s, e in sorted(_events, key=lambda x: x[2]):
        # A call that starts exactly at t1 would land on column `width`;
        # keep one cell free so its minimum-size bar stays inside the rails.
        lead = min(to_cells(s - t0), max(width - 1, 0))
        bar = max(1, to_cells(e - s))
        line = " " * lead + "#" * bar
        print(f" {thread:<10} |{line:<{width}}| {label} ({e - s:.1f}s)")
# --- Mock data --------------------------------------------------------------
# Kept inline so the example needs no external JSON file. The records follow
# the shape used by the mocked API calls below: a branch has two terminals,
# and each terminal maps to one bus.
_BRANCHES = [
    {
        "branch_uri": "100",
        "branch_name": "Sol - Luna",
        "length": 42,
        "terminal1_uri": "501",
        "terminal2_uri": "502",
        "bus1": {"bus_name": "SOL", "bus_num": "5201", "kv": 138},
        "bus2": {"bus_name": "LUNA", "bus_num": "5202", "kv": 138},
    },
    {
        "branch_uri": "101",
        "branch_name": "Sol - Mars",
        "length": 3.14,
        "terminal1_uri": "501",
        "terminal2_uri": "503",
        "bus1": {"bus_name": "SOL", "bus_num": "5201", "kv": 138},
        "bus2": {"bus_name": "MARS", "bus_num": "5203", "kv": 138},
    },
    {
        "branch_uri": "103",
        "branch_name": "Ceres - Mars",
        "length": 4.20,
        "terminal1_uri": "504",
        "terminal2_uri": "503",
        "bus1": {"bus_name": "CERES", "bus_num": "5204", "kv": 138},
        "bus2": {"bus_name": "MARS", "bus_num": "5203", "kv": 138},
    },
]

# Branch uri -> full branch record, so a mock "API call" can pull out just
# the slice it is responsible for.
_BRANCH_BY_URI = {branch["branch_uri"]: branch for branch in _BRANCHES}

# Terminal uri -> bus record, so get_bus_for_terminal() can resolve a bus
# without knowing which branch the terminal belongs to. A terminal shared by
# several branches keeps the bus record of the last branch that lists it.
_BUS_BY_TERMINAL = {
    branch[f"terminal{side}_uri"]: branch[f"bus{side}"]
    for branch in _BRANCHES
    for side in (1, 2)
}
# --- Mocked API calls -------------------------------------------------------
def get_branch_list(scenario_id: int) -> list[str]:
    """List the URIs of the branches touched in a scenario.

    Made once, up front. The mock simulates a call of at most one second and
    returns the uri of every entry in _BRANCHES, whatever scenario_id is.
    """
    call_label = f"branch_list(scenario={scenario_id})"
    _simulate_io(call_label, max_delay=1.0)
    return [record["branch_uri"] for record in _BRANCHES]
def get_branch_props(branch_uri: str) -> dict:
    """Fetch a branch's own properties (nothing from related tables).

    Simulates a call of at most two seconds, then returns the branch name,
    length and both terminal uris. Raises KeyError for an unknown branch_uri.
    """
    _simulate_io(f"props({branch_uri})", max_delay=2.0)
    record = _BRANCH_BY_URI[branch_uri]
    direct_fields = ("branch_name", "length", "terminal1_uri", "terminal2_uri")
    return {field: record[field] for field in direct_fields}
def get_bus_for_terminal(terminal_uri: str) -> dict:
    """Look up the bus that a branch terminal is attached to.

    Simulates a call of at most 3.5 seconds, then returns the bus record
    stored in _BUS_BY_TERMINAL. Raises KeyError for an unknown terminal_uri.
    """
    call_label = f"bus@term({terminal_uri})"
    _simulate_io(call_label, max_delay=3.5)
    bus = _BUS_BY_TERMINAL[terminal_uri]
    return bus
# --- Orchestration ----------------------------------------------------------
def fetch_all(scenario_id: int, *, max_workers: int = 16) -> list[dict]:
    """Get the branch list, then fetch every branch in parallel.

    A single, flat thread pool runs in two waves (no nested pools):
      wave 1 - get_branch_props for every branch
      wave 2 - get_bus_for_terminal for both terminals of every branch
    Wave 2 needs the terminal uris produced by wave 1, so the waves run in
    order; within a wave, up to max_workers calls run concurrently.

    max_workers caps the pool size so that a long branch list does not turn
    into an unbounded number of simultaneous API calls.

    Returns one record per branch (branch_uri, branch_name, length, bus1,
    bus2), sorted by branch_uri. An empty branch list yields an empty list.
    """
    branch_uris = get_branch_list(scenario_id)
    if not branch_uris:
        # Nothing to fetch, and ThreadPoolExecutor rejects max_workers=0
        # with a ValueError, so bail out before building the pool.
        return []

    # The widest wave is wave 2 (two bus calls per branch); there is no point
    # in starting more threads than that, nor more than the caller's cap.
    pool_size = min(max_workers, 2 * len(branch_uris))
    with ThreadPoolExecutor(
        max_workers=pool_size, thread_name_prefix="worker"
    ) as executor:
        # Wave 1: all branch props concurrently.
        props_futures = {
            executor.submit(get_branch_props, uri): uri
            for uri in branch_uris
        }
        props_by_uri = {
            props_futures[f]: f.result() for f in as_completed(props_futures)
        }

        # Wave 2: both bus lookups for every branch concurrently. Tag each
        # future with (uri, slot) so we can reassemble the records afterwards.
        bus_futures = {}
        for uri, props in props_by_uri.items():
            for slot, terminal_key in (("bus1", "terminal1_uri"), ("bus2", "terminal2_uri")):
                future = executor.submit(get_bus_for_terminal, props[terminal_key])
                bus_futures[future] = (uri, slot)
        buses = {bus_futures[f]: f.result() for f in as_completed(bus_futures)}

    # Combine the two waves back into one record per branch.
    results = [
        {
            "branch_uri": uri,
            "branch_name": props_by_uri[uri]["branch_name"],
            "length": props_by_uri[uri]["length"],
            "bus1": buses[(uri, "bus1")],
            "bus2": buses[(uri, "bus2")],
        }
        for uri in branch_uris
    ]
    results.sort(key=lambda r: r["branch_uri"])
    return results
def fetch_all_sequential(scenario_id: int) -> list[dict]:
    """Do the same work as fetch_all, one call after another.

    Serves as the baseline to compare the parallel version against. For each
    branch the props are fetched first, then the bus of terminal 1, then the
    bus of terminal 2. Returns the records sorted by branch_uri.
    """
    records = []
    for branch_uri in get_branch_list(scenario_id):
        props = get_branch_props(branch_uri)
        first_bus = get_bus_for_terminal(props["terminal1_uri"])
        second_bus = get_bus_for_terminal(props["terminal2_uri"])
        records.append(
            {
                "branch_uri": branch_uri,
                "branch_name": props["branch_name"],
                "length": props["length"],
                "bus1": first_bus,
                "bus2": second_bus,
            }
        )
    return sorted(records, key=lambda record: record["branch_uri"])
if __name__ == "__main__":
    import argparse

    # Command line: -s/--sequential switches from the thread-pool version to
    # the one-call-at-a-time baseline.
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "-s",
        "--sequential",
        action="store_true",
        help="Run every call one after another instead of in parallel",
    )
    options = cli.parse_args()

    if options.sequential:
        fetch, mode = fetch_all_sequential, "sequential"
    else:
        fetch, mode = fetch_all, "parallel"

    # Time only the fetch itself, not the reporting that follows.
    began = time.perf_counter()
    data = fetch(scenario_id=42)
    elapsed = time.perf_counter() - began

    print_timeline()
    print("\n" + json.dumps(data, indent=4))
    print(f"\nFetched {len(data)} branches in {elapsed:.1f}s ({mode})")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment