Created
June 18, 2022 00:45
-
-
Save alexrudy/fd9d2d65d7fd75e9718499e28ec5600a to your computer and use it in GitHub Desktop.
Parallelism Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Python parallelism demo

Run this file with `python parallel-demo.py` to see how different
parallelism strategies fare with different kinds of work
(IO-bound requests, sleeping, and CPU-bound computation).

This requires aiohttp (pip install aiohttp) and requests (pip install requests).

Two environment variables control behavior:

    INTERACT=1 python parallel-demo.py

will stop before running each test, and wait for you to press Enter to continue.

    DEBUG=1 python parallel-demo.py

will turn on debug logging.
""" | |
import multiprocessing | |
import concurrent.futures | |
import time | |
import requests | |
import logging | |
import functools | |
import asyncio | |
import os | |
import random | |
import io | |
import threading | |
import aiohttp | |
import abc | |
import typing as t | |
# Runtime switches read from the environment. Each one is on whenever the
# corresponding variable is set to any non-empty string.
DEBUG = os.environ.get("DEBUG", "") != ""
INTERACT = os.environ.get("INTERACT", "") != ""

# Debug logging is only configured when explicitly requested.
if DEBUG:
    logging.basicConfig(level=logging.DEBUG)
def fib(i: int) -> int:
    """Recursively compute the i-th Fibonacci number.

    This implementation is deliberately naive (no memoization) so that it is
    slow and can be used to simulate CPU-bound work.

    Args:
        i: Index of the Fibonacci number to compute; must be >= 0.

    Returns:
        The i-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Raises:
        ValueError: If i is negative. Without this check the recursion would
            never reach a base case and would end in a RecursionError.
    """
    if i < 2:
        # Recursive calls made from i >= 2 only ever reach 0 or 1, so a
        # negative value can only come from the outermost caller.
        if i < 0:
            raise ValueError(f"fib() is undefined for negative input: {i}")
        return i
    return fib(i - 1) + fib(i - 2)
# Constant added to the accumulator on every pass of the spin loop in busyloop().
N = 31


def busyloop() -> None:
    """Burn CPU for at least one second.

    Computes fib(32) twice and then keeps doing integer arithmetic until one
    second has elapsed since the function started. The work is pure Python,
    so it holds the GIL for most of that time.
    """
    began = time.monotonic()
    acc = fib(32) * fib(32)
    # Spin until a full second has passed since `began`.
    while True:
        if time.monotonic() - began >= 1.0:
            break
        acc += acc + N
    elapsed = time.monotonic() - began
    logging.debug(f"Finished in {elapsed:.1f}")
def msg(msg: str, i: int) -> None:
    """Print `msg` on its own line, prefixed with the worker number in brackets."""
    line = "[{}] {}".format(i, msg)
    print(line)
def do_some_work(i: int, busy: bool = False, timeout: float = 30.0) -> int:
    """Do one unit of blocking work.

    Runs an HTTP request (IO bound) and then either a one-second sleep
    (does not hold the GIL) or a Fibonacci busy loop (holds the GIL and
    uses CPU).

    Args:
        i: Worker number; used in the request URL and as the message prefix.
        busy: When true, run the CPU-bound busy loop instead of sleeping.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The worker number `i`, so callers can tell which tasks completed.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            times out.
    """
    url = f"http://httpbin.org/get?i={i}"
    msg(f"Starting request to {url}", i)
    # requests never times out by default; without an explicit timeout an
    # unresponsive server would hang this worker (and the demo) forever.
    requests.get(url, timeout=timeout)
    msg("Finished request. Starting work", i)
    if busy:
        busyloop()
    else:
        time.sleep(1)
    msg("Completed work", i)
    return i
async def do_some_async_work(i: int, busy: bool=False) -> int:
    """Do one unit of work as a coroutine (cooperative multitasking).

    Awaits an HTTP request (IO bound) and then either awaits a one-second
    sleep or runs the Fibonacci busy loop. The busy loop is a plain
    synchronous call, so it holds the GIL and uses CPU without yielding.

    Returns the worker number `i`.
    """
    url = f"http://httpbin.org/get?i={i}"
    msg(f"Starting request to {url}", i)
    # Open the session and issue the request in a single `async with`.
    async with aiohttp.ClientSession() as session, session.get(url) as response:
        # The body is read to completion but its content is not used.
        _body = await response.text()
    msg("Finished request. Starting work", i)
    if not busy:
        await asyncio.sleep(1)
    else:
        busyloop()
    msg("Completed work", i)
    return i
class Context(metaclass=abc.ABCMeta):
    """Abstract base providing a simple, no-op context manager.

    Subclasses get the same `with ... as pool: pool.map(...)` surface as
    classes like multiprocessing.Pool, so different parallelism strategies
    can be swapped in for one another.
    """

    def __enter__(self) -> "Context":
        """Enter the context; there is nothing to set up, so return self."""
        return self

    def __exit__(self, *args) -> None:
        """Leave the context; returning None lets any exception propagate."""
        return None

    @abc.abstractmethod
    def map(self, func: t.Callable[[int], int], iterable: t.Iterable[int]) -> t.List[int]:
        """Apply `func` to every item of `iterable` and return the results.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        # NotImplemented is a comparison sentinel, not an exception, and
        # calling it raises TypeError; NotImplementedError is the exception.
        raise NotImplementedError("Subclasses must implement .map")
class Serial(Context):
    """Baseline strategy: run the tasks one after another with no parallelism."""

    def map(self, func: t.Callable[[int], int], iterable: t.Iterable[int]) -> t.List[int]:
        """Call `func` on each item in order and return the results as a list."""
        results = []
        for item in iterable:
            results.append(func(item))
        return results
class AsyncIOPool(Context):
    """Run a sequence of tasks on an asyncio event loop."""

    async def worker(self, func, item):
        """Call `func(item)`, awaiting it when `func` is a coroutine function."""
        if not asyncio.iscoroutinefunction(func):
            # A plain function runs synchronously inside the event loop.
            return func(item)
        return await func(item)

    async def driver(self, func, iterable):
        """Spawn one task per item and wait for all of them to finish."""
        pending = []
        for item in iterable:
            pending.append(asyncio.create_task(self.worker(func, item)))
        return await asyncio.gather(*pending)

    def map(self, func: t.Callable[[int], int], iterable: t.Iterable[int]) -> t.List[int]:
        """Run all tasks to completion on a fresh event loop and return their results."""
        coro = self.driver(func, iterable)
        return asyncio.run(coro)
class AsyncIODanglingPool(AsyncIOPool):
    """Run tasks using asyncio, but spawn them without ever awaiting them.

    This is not a good way to use asyncio. It demonstrates a major pitfall:
    it is easy to forget to await a task, and a task nobody waits for may
    never get the chance to finish before the event loop shuts down.
    """

    async def worker(self, func, item):
        """Run one task via the parent worker and record its result as completed."""
        result = await super().worker(func, item)
        self.completed.add(result)
        return result

    async def builder(self, func, item):
        """Spawn the worker for `item` as a task and return without awaiting it."""
        # The task reference is dropped on purpose: leaving it dangling is the
        # pitfall this class demonstrates.
        asyncio.create_task(self.worker(func, item))

    async def driver(self, func, iterable):
        """Await only the builders, then return whatever has completed so far.

        Returns:
            The set of results recorded by workers that finished before the
            builders were all done.
        """
        # asyncio.wait() rejects bare coroutines on Python 3.11+ and raises
        # ValueError for an empty collection; gather() accepts both.
        await asyncio.gather(*(self.builder(func, item) for item in iterable))
        return self.completed

    def map(self, func, iterable):
        """Reset the completed set, run the driver, and return the completed results."""
        self.completed = set()
        return super().map(func, iterable)
def driver(pool_cls, n=5, busy=False, func=do_some_work):
    """Run `n` tasks through a pool of type `pool_cls` and return their results.

    `func` is called once for each worker number in range(n), with `busy`
    bound as a keyword argument.
    """
    task = functools.partial(func, busy=busy)
    with pool_cls() as pool:
        # Materialize the results while the pool is still open.
        results = pool.map(task, range(n))
        return list(results)
def trial(func, *args, name):
    """Run a single example and print its timing and completed tasks.

    Returns a tuple of (elapsed seconds, number of completed tasks).
    """
    banner = f" {name} ".center(70, "-")
    print(banner)
    if INTERACT:
        # Pause until the user confirms.
        input("Start?")
    t0 = time.monotonic()
    finished = func(*args)
    elapsed = time.monotonic() - t0
    print(f"Took {elapsed:.1f}s to run 5 {name}")
    listing = ", ".join(map(str, finished))
    print(f"Completed {len(finished)} tasks: {listing}")
    return (elapsed, len(finished))
def main():
    """Run every example, then print the summaries (in run order and by duration)."""
    n_tasks = 5
    timings = {}

    def suffix(busy):
        # Label fragment appended to the names of CPU-bound runs.
        return " busy" if busy else ""

    # Every strategy driven with the blocking worker function.
    blocking_pools = [
        (Serial, "serial"),
        (multiprocessing.Pool, "multiprocessing"),
        (concurrent.futures.ThreadPoolExecutor, "concurrent.futures threads"),
        (concurrent.futures.ProcessPoolExecutor, "concurrent.futures process"),
        (AsyncIOPool, "asyncio"),
        (AsyncIODanglingPool, "asyncio dangling"),
    ]
    for pool, pool_name in blocking_pools:
        for busy in (True, False):
            name = f"{pool_name}{suffix(busy)}"
            timings[name] = trial(driver, pool, n_tasks, busy, name=name)

    # The asyncio strategies again, this time with the coroutine worker.
    cooperative_pools = [
        (AsyncIOPool, "asyncio with cooperation"),
        (AsyncIODanglingPool, "asyncio dangling with cooperation"),
    ]
    for busy in (True, False):
        for pool, pool_name in cooperative_pools:
            name = f"{pool_name}{suffix(busy)}"
            timings[name] = trial(driver, pool, n_tasks, busy, do_some_async_work, name=name)

    print("")
    print(" summary ".center(70, "="))
    width = max(map(len, timings))
    for name, (duration, n_finished) in timings.items():
        summary_line(name, duration, n_finished, width, n_tasks)

    print(" sorted summary ".center(70, "="))
    # Fastest first; the sort is stable, so ties keep their run order.
    by_duration = sorted(timings.items(), key=lambda entry: entry[1][0])
    for name, (duration, n_finished) in by_duration:
        summary_line(name, duration, n_finished, width, n_tasks)
def summary_line(name, duration, n_finished, width, n_expected):
    """Print one row of the summary table.

    Args:
        name: Label of the run; padded or truncated to `width` characters.
        duration: Elapsed time in seconds.
        n_finished: Number of tasks that completed.
        width: Column width used for `name`.
        n_expected: Number of tasks that were supposed to complete.

    The row is flagged with "!!!" when `n_finished` differs from `n_expected`.
    """
    is_err = "!!!" if n_finished != n_expected else ""
    # Show the real expected count rather than a hard-coded 5, so the row
    # stays correct if the number of tasks changes.
    print(f"{name:{width}.{width}s} | {duration: 5.1f}s | {n_finished: 2d}/{n_expected} finished {is_err}")
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment