Created
June 16, 2026 03:11
-
-
Save cwebber314/a827ee970a537572d9e4ff45fd2dc008 to your computer and use it in GitHub Desktop.
Playing around with a thread pool to run slow, I/O-bound API calls in parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Mock example: parallelizing slow API calls with ThreadPoolExecutor. | |
| The real bottleneck is a slow API library (network I/O), so threads work well: | |
| Python releases the GIL while a thread waits on the network, letting all the | |
| branch fetches happen concurrently. | |
| Each mock call sleeps a random amount (up to MAX_SLEEP) to stand in for a slow | |
| API call. Live logging + an ASCII timeline at the end let you *see* the threads | |
| overlap. | |
| """ | |
| import json | |
| import time | |
| import random | |
| import logging | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
# Default ceiling, in seconds, for the random sleep performed by one mock call.
MAX_SLEEP = 5.0

log = logging.getLogger("branch_fetch")

# Every log line leads with %(relativeCreated)s (milliseconds on a clock shared
# by all threads) followed by the thread name, so START/DONE lines coming from
# different workers can be lined up by eye.
logging.basicConfig(
    format="%(relativeCreated)7.0f ms | %(threadName)-10s | %(message)s",
    level=logging.INFO,
)

# Timeline of finished calls as (thread_name, label, start, end) tuples.
# Worker threads append to the list, so writers take the lock first.
_events_lock = threading.Lock()
_events: list[tuple] = []
def _simulate_io(label: str, max_delay: float = MAX_SLEEP) -> None:
    """Pretend to be one slow API call.

    Draws a random delay between ``min(0.5, max_delay)`` and ``max_delay``
    seconds, logs a START line, sleeps for that long, logs a DONE line, and
    finally appends ``(thread name, label, start, end)`` to the shared
    timeline list.

    Args:
        label: Text identifying this call in the log lines and the timeline.
        max_delay: Upper bound, in seconds, on how long the call sleeps.
    """
    shortest = min(0.5, max_delay)
    delay = random.uniform(shortest, max_delay)

    began = time.perf_counter()
    log.info(f"START {label} (~{delay:.1f}s)")
    time.sleep(delay)
    finished = time.perf_counter()
    log.info(f"DONE {label} ({delay:.1f}s)")

    # Several worker threads append to the same list; hold the lock to write.
    record = (threading.current_thread().name, label, began, finished)
    with _events_lock:
        _events.append(record)
def print_timeline(width: int = 60) -> None:
    """Render the recorded calls as an ASCII Gantt chart on stdout.

    Prints one row per entry of the module-level ``_events`` list, ordered by
    start time. Each row shows the thread name, a bar positioned and sized in
    proportion to the call's share of the overall time span, the call label,
    and the call's duration. Nothing is printed when no events were recorded.

    Args:
        width: Number of character columns between the two ``|`` frame marks.
    """
    if not _events:
        return
    t0 = min(e[2] for e in _events)
    t1 = max(e[3] for e in _events)
    span = t1 - t0
    print(f"\nThread timeline (full width = {span:.1f}s):\n")
    for thread, label, s, e in sorted(_events, key=lambda x: x[2]):
        if span > 0:
            lead = int((s - t0) / span * width)
            bar = int((e - s) / span * width)
        else:
            # Every recorded call starts and ends at the same instant, so
            # there is no span to scale against; avoid dividing by zero.
            lead = bar = 0
        # Keep at least one '#' visible, and keep the bar inside the frame
        # even for a zero-length call sitting at the very end of the span.
        lead = min(lead, width - 1)
        bar = max(1, min(bar, width - lead))
        line = " " * lead + "#" * bar
        print(f" {thread:<10} |{line:<{width}}| {label} ({e - s:.1f}s)")
# --- Mock data --------------------------------------------------------------
# The fixture lives in the module so no external JSON has to be loaded. Every
# branch record carries two terminal uris plus the bus found at each terminal.
_BRANCHES = [
    {
        "branch_uri": "100",
        "branch_name": "Sol - Luna",
        "length": 42,
        "terminal1_uri": "501",
        "terminal2_uri": "502",
        "bus1": {"bus_name": "SOL", "bus_num": "5201", "kv": 138},
        "bus2": {"bus_name": "LUNA", "bus_num": "5202", "kv": 138},
    },
    {
        "branch_uri": "101",
        "branch_name": "Sol - Mars",
        "length": 3.14,
        "terminal1_uri": "501",
        "terminal2_uri": "503",
        "bus1": {"bus_name": "SOL", "bus_num": "5201", "kv": 138},
        "bus2": {"bus_name": "MARS", "bus_num": "5203", "kv": 138},
    },
    {
        "branch_uri": "103",
        "branch_name": "Ceres - Mars",
        "length": 4.20,
        "terminal1_uri": "504",
        "terminal2_uri": "503",
        "bus1": {"bus_name": "CERES", "bus_num": "5204", "kv": 138},
        "bus2": {"bus_name": "MARS", "bus_num": "5203", "kv": 138},
    },
]

# branch uri -> full branch record, so a mock call can grab just its own slice.
_BRANCH_BY_URI = {branch["branch_uri"]: branch for branch in _BRANCHES}

# terminal uri -> bus, so a bus can be resolved without knowing its branch.
# A terminal shared by several branches keeps the entry from the last branch
# listed.
_BUS_BY_TERMINAL = {}
for _b in _BRANCHES:
    _BUS_BY_TERMINAL.update(
        {
            _b["terminal1_uri"]: _b["bus1"],
            _b["terminal2_uri"]: _b["bus2"],
        }
    )
| # --- Mocked API calls ------------------------------------------------------- | |
def get_branch_list(scenario_id: int) -> list[str]:
    """List the uris of the branches touched in a scenario.

    Simulates a single slow call (at most 1.0 s) and then returns the uri of
    every branch in the mock data set, in data-set order. ``scenario_id`` is
    only used to label the simulated call.
    """
    call_label = f"branch_list(scenario={scenario_id})"
    _simulate_io(call_label, max_delay=1.0)
    return [branch["branch_uri"] for branch in _BRANCHES]
def get_branch_props(branch_uri: str) -> dict:
    """Fetch the direct properties of one branch (no related tables).

    Simulates a slow call (at most 2.0 s), then returns a new dict holding the
    branch's name, length and its two terminal uris.

    Raises:
        KeyError: If ``branch_uri`` is not in the mock data set.
    """
    _simulate_io(f"props({branch_uri})", max_delay=2.0)
    record = _BRANCH_BY_URI[branch_uri]
    # Only these fields are exposed; the bus details need a separate lookup.
    exposed = ("branch_name", "length", "terminal1_uri", "terminal2_uri")
    return {field: record[field] for field in exposed}
def get_bus_for_terminal(terminal_uri: str) -> dict:
    """Get the bus associated with a branch terminal.

    Simulates a slow call (at most 3.5 s), then looks the terminal up in the
    mock terminal-to-bus table.

    Returns:
        A fresh copy of the bus record, so callers can modify what they
        receive without changing the shared mock table or the record handed
        to another caller for the same terminal.

    Raises:
        KeyError: If ``terminal_uri`` is not in the mock data set.
    """
    _simulate_io(f"bus@term({terminal_uri})", max_delay=3.5)
    return dict(_BUS_BY_TERMINAL[terminal_uri])
| # --- Orchestration ---------------------------------------------------------- | |
def fetch_all(scenario_id: int) -> list[dict]:
    """Get the branch list, then fetch every branch in parallel.

    A single, flat thread pool runs in two waves (no nested pools):
      wave 1 - get_branch_props for every branch at once
      wave 2 - get_bus_for_terminal for both terminals of every branch at once
    Wave 2 needs the terminal uris produced by wave 1, so the waves run in
    order, but everything *within* a wave runs concurrently.

    Args:
        scenario_id: Scenario whose branches should be fetched.

    Returns:
        One record per branch uri with the keys ``branch_uri``,
        ``branch_name``, ``length``, ``bus1`` and ``bus2``, sorted by
        ``branch_uri``. An empty list when the scenario has no branches.
    """
    branch_uris = get_branch_list(scenario_id)
    if not branch_uris:
        # Nothing to fetch. Also, the pool below would be sized 0, which
        # ThreadPoolExecutor rejects with ValueError.
        return []

    # Pool must be wide enough for the larger wave: 2 bus calls per branch.
    # For long branch lists, cap this (e.g. 16) to avoid hammering the API.
    with ThreadPoolExecutor(
        max_workers=2 * len(branch_uris), thread_name_prefix="worker"
    ) as executor:
        # Wave 1: all branch props concurrently.
        props_futures = {
            executor.submit(get_branch_props, uri): uri
            for uri in branch_uris
        }
        # .result() re-raises any exception the worker raised.
        props_by_uri = {
            props_futures[f]: f.result() for f in as_completed(props_futures)
        }

        # Wave 2: both bus lookups for every branch concurrently. Tag each
        # future with (uri, slot) so we can reassemble the records afterwards.
        bus_futures = {}
        for uri, props in props_by_uri.items():
            bus_futures[executor.submit(get_bus_for_terminal, props["terminal1_uri"])] = (uri, "bus1")
            bus_futures[executor.submit(get_bus_for_terminal, props["terminal2_uri"])] = (uri, "bus2")
        buses = {bus_futures[f]: f.result() for f in as_completed(bus_futures)}

    # Combine the two waves back into one record per branch.
    results = [
        {
            "branch_uri": uri,
            "branch_name": props_by_uri[uri]["branch_name"],
            "length": props_by_uri[uri]["length"],
            "bus1": buses[(uri, "bus1")],
            "bus2": buses[(uri, "bus2")],
        }
        for uri in branch_uris
    ]
    results.sort(key=lambda r: r["branch_uri"])
    return results
def fetch_all_sequential(scenario_id: int) -> list[dict]:
    """Do the same work as fetch_all, strictly one call after another.

    For each branch uri, in list order, the branch properties are fetched
    first, then the bus at terminal 1, then the bus at terminal 2. Serves as
    the baseline the parallel version is compared against.

    Returns:
        One record per branch with the keys ``branch_uri``, ``branch_name``,
        ``length``, ``bus1`` and ``bus2``, sorted by ``branch_uri``.
    """
    records = []
    for branch_uri in get_branch_list(scenario_id):
        props = get_branch_props(branch_uri)
        name = props["branch_name"]
        length = props["length"]
        first_bus = get_bus_for_terminal(props["terminal1_uri"])
        second_bus = get_bus_for_terminal(props["terminal2_uri"])
        records.append(
            {
                "branch_uri": branch_uri,
                "branch_name": name,
                "length": length,
                "bus1": first_bus,
                "bus2": second_bus,
            }
        )
    return sorted(records, key=lambda record: record["branch_uri"])
def _main() -> None:
    """Run the demo from the command line.

    Parses the ``-s/--sequential`` flag, runs the matching fetch for
    scenario 42 while timing it, then prints the thread timeline, the fetched
    records as JSON, and a one-line summary.
    """
    import argparse

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "-s",
        "--sequential",
        action="store_true",
        help="Run every call one after another instead of in parallel",
    )
    args = parser.parse_args()

    if args.sequential:
        fetch, mode = fetch_all_sequential, "sequential"
    else:
        fetch, mode = fetch_all, "parallel"

    began = time.perf_counter()
    data = fetch(scenario_id=42)
    elapsed = time.perf_counter() - began

    print_timeline()
    print("\n" + json.dumps(data, indent=4))
    print(f"\nFetched {len(data)} branches in {elapsed:.1f}s ({mode})")


if __name__ == "__main__":
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment