Created
August 21, 2025 21:43
-
-
Save booty/f192ae5a17efb10b209d471c0c5a48e0 to your computer and use it in GitHub Desktop.
Queue simulation (heterogenuous job sizes)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import random | |
from dataclasses import dataclass | |
from typing import List, Tuple, Dict, Optional | |
import numpy as np | |
from collections import deque | |
import math | |
@dataclass | |
class SimResult: | |
worker_sums: List[int] | |
average_sum_per_worker: float | |
max_sum_single_worker: int | |
total_jobs: int | |
total_work: int | |
def generate_jobs( | |
num_items: int, | |
mean: float, | |
stddev: float, | |
low: int, | |
high: int, | |
whale_percentage: float, | |
seed: int, | |
) -> List[int]: | |
""" | |
Generate `num_items` integer job durations following a truncated normal, | |
clipped to [low, high], then rounded to ints. | |
""" | |
rng = np.random.default_rng(seed) | |
py_rng = random.Random(seed) | |
# Draw from normal, then clip to [low, high], then round to nearest int and coerce to Python int | |
arr = rng.normal(loc=mean, scale=stddev, size=num_items) | |
arr = np.clip(arr, low, high) | |
jobs = np.rint(arr).astype(np.int64).tolist() | |
# Guarantee all are at least 1 | |
clipped_jobs = [max(low, min(high, int(x))) for x in jobs] | |
num_whales = int(num_items * whale_percentage) | |
whale_indexes = py_rng.sample(range(0, num_items), num_whales) | |
for idx in whale_indexes: | |
clipped_jobs[idx] = high # Assign a whale job duration | |
# generate | |
return clipped_jobs | |
def simulate_round_robin( | |
jobs: List[int], | |
num_workers: int, | |
) -> SimResult: | |
""" | |
Simulate round-robin assignment with cooldown measured in GLOBAL iterations. | |
Mechanics: | |
- Pointer advances one worker per iteration: i = iter_idx % num_workers | |
- A worker can accept a job iff iter_idx >= next_available_iter[i] | |
- On assignment of a job with duration d, set next_available_iter[i] = iter_idx + d | |
- We stop when the job queue is empty (no need to burn down remaining cooldowns) | |
Time complexity is O(I) where I is the number of global iterations until | |
the queue is emptied. Memory is O(num_workers). | |
""" | |
if num_workers <= 0: | |
raise ValueError("num_workers must be >= 1") | |
q: deque[int] = deque(jobs) | |
worker_sums: List[int] = [0] * num_workers | |
# Next global iteration at which each worker is allowed to accept a job | |
next_avail: List[int] = [0] * num_workers | |
iter_idx: int = 0 | |
# Iterate until all jobs are assigned | |
while q: | |
w: int = iter_idx % num_workers | |
if iter_idx >= next_avail[w]: | |
# Assign if available | |
d: int = q.popleft() | |
worker_sums[w] += d | |
next_avail[w] = iter_idx + d | |
# Advance to the next global iteration regardless of assignment | |
iter_idx += 1 | |
avg_sum: float = float(sum(worker_sums)) / float(num_workers) if num_workers else math.nan | |
max_sum: int = max(worker_sums) if worker_sums else 0 | |
return SimResult( | |
worker_sums=worker_sums, | |
average_sum_per_worker=avg_sum, | |
max_sum_single_worker=max_sum, | |
total_jobs=len(jobs), | |
total_work=sum(jobs), | |
) | |
def print_results(result: SimResult, whale_percentage: float) -> None: | |
print(f"Total jobs: {result.total_jobs} (whale percentage: {whale_percentage*100:.2f}%)") | |
print(f"Total work: {result.total_work:,} minutes") | |
print(f"Workers: {len(result.worker_sums)}") | |
print(f"Ideal time per worker (ideal wall time): {result.average_sum_per_worker:,.2f} minutes") | |
print(f"Slowest worker (actual wall time): {result.max_sum_single_worker:,} minutes") | |
print(f"Idealness (ideal vs actual): {(result.average_sum_per_worker / result.max_sum_single_worker) * 100:,.2f}%") | |
print(f"Worker times:", result.worker_sums) | |
if __name__ == "__main__": | |
# Tweak these or make them CLI args if you prefer | |
NUM_WORKERS = 10 | |
WHALE_PERCENTAGE = 0.1 | |
for num_queued_items in (10, 50, 100, 1000, 5000): | |
jobs: List[int] = generate_jobs( | |
num_items=num_queued_items, | |
mean=10.0, | |
stddev=5.0, # reasonable spread; adjust as needed | |
low=1, | |
high=100, | |
seed=1337, | |
whale_percentage=WHALE_PERCENTAGE, | |
) | |
for sort in (False, True): | |
print(f"\n\n-------- {NUM_WORKERS} workers, {num_queued_items} jobs, sorted: {sort} --------") | |
if sort: | |
result = simulate_round_robin(sorted(jobs), NUM_WORKERS) | |
else: | |
result = simulate_round_robin(jobs, NUM_WORKERS) | |
print_results(result, WHALE_PERCENTAGE) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment