Last active
May 4, 2020 00:21
-
-
Save RHDZMOTA/edca3c24b7971a834de06be8aec54522 to your computer and use it in GitHub Desktop.
Multiprocessing when having a blocking process (matplotlib and disk writes) - a quick hack
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "ASYNC-PROCESS": 2.6283581256866455, | |
| "ASYNC-THREAD": 11.003680944442749, | |
| "SYNC": 10.771448135375977 | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import datetime as dt | |
| import json | |
| import time | |
| import os | |
| from multiprocessing.pool import ThreadPool, Pool | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
# Size of each worker pool; override with the SQUARE_CONCURRENCY env var.
SQUARE_CONCURRENCY = int(os.getenv("SQUARE_CONCURRENCY", "10"))

# Number of plots produced per benchmark; override with SQUARE_JOBS.
SQUARE_JOBS = int(os.getenv("SQUARE_JOBS", "100"))

# Output directory for each benchmark variant, keyed by variant name.
SQUARE_OUTPUT_FOLDER = {
    "sync": "out-sync",
    "async-process": "out-async-process",
    "async-thread": "out-async-thread",
}
def get_mutiprocessing_pools():
    """Create the worker pools used by the benchmarks.

    Returns:
        A dict holding a ``ThreadPool`` under ``"POOL_THREAD"`` and a
        process ``Pool`` under ``"POOL_PROCESS"``, each created with
        ``SQUARE_CONCURRENCY`` workers.
    """
    thread_pool = ThreadPool(SQUARE_CONCURRENCY)
    process_pool = Pool(SQUARE_CONCURRENCY)
    return {"POOL_THREAD": thread_pool, "POOL_PROCESS": process_pool}
def square(num: int, out_path: str):
    """Plot y = x * x for every x in ``range(num)`` and save it as a PNG.

    Args:
        num: Number of points to plot; also used as the file stem.
        out_path: Directory that receives ``{num}.png``. It is not created
            here, so the caller must make sure it exists.

    Returns:
        The path of the image that was written.
    """
    xs = list(range(num))
    frame = pd.DataFrame({"x": xs, "y": [x * x for x in xs]})
    fig = frame.plot.scatter(x="x", y="y").get_figure()
    fig_name = os.path.join(out_path, f"{num}.png")
    try:
        fig.savefig(fig_name)
    finally:
        # Close exactly this figure. A bare plt.close() closes whatever
        # pyplot regards as the "current" figure, which may belong to
        # another worker when several threads are plotting at once. The
        # finally clause also releases the figure if saving fails.
        plt.close(fig)
    return fig_name
# A module-level function (rather than a lambda/closure) is used here to
# avoid serialization problems with the process pool.
def square_process(num: int):
    """Run ``square`` for ``num``, writing into the async-process folder."""
    target_dir = SQUARE_OUTPUT_FOLDER["async-process"]
    return square(num=num, out_path=target_dir)
def timeit(fun, *args, **kwargs):
    """Call ``fun(*args, **kwargs)`` and return the elapsed time in seconds.

    The return value of ``fun`` is discarded. Any exception it raises
    propagates to the caller.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt a duration.
    start = time.perf_counter()
    fun(*args, **kwargs)
    return time.perf_counter() - start
def test_async_thread(samples: int, pool: ThreadPool):
    """Generate ``samples`` plots through the thread pool.

    Makes sure the async-thread output folder exists, then maps ``square``
    over ``range(samples)`` using ``pool``.

    Returns:
        The list of values returned by ``square``, as produced by
        ``pool.map``.
    """
    output = SQUARE_OUTPUT_FOLDER["async-thread"]
    os.makedirs(output, exist_ok=True)

    def plot_one(z):
        return square(num=z, out_path=output)

    jobs = range(samples)
    return pool.map(plot_one, jobs)
def test_async_process(samples: int, pool: Pool):
    """Generate ``samples`` plots through the given pool.

    Makes sure the async-process output folder exists, then maps
    ``square_process`` over ``range(samples)`` using ``pool``.

    Returns:
        The list of values returned by ``square_process``, as produced by
        ``pool.map``.
    """
    target_dir = SQUARE_OUTPUT_FOLDER["async-process"]
    os.makedirs(target_dir, exist_ok=True)
    jobs = range(samples)
    return pool.map(square_process, jobs)
def test_sync(samples: int):
    """Generate ``samples`` plots one after another in the calling process.

    Makes sure the sync output folder exists first.

    Returns:
        The list of values returned by ``square``, in input order.
    """
    target_dir = SQUARE_OUTPUT_FOLDER["sync"]
    os.makedirs(target_dir, exist_ok=True)
    written = []
    for s in range(samples):
        written.append(square(num=s, out_path=target_dir))
    return written
def main():
    """Time the process-pool, thread-pool and sequential variants.

    Each variant generates ``SQUARE_JOBS`` plots. The elapsed times, as
    returned by ``timeit``, are written as indented JSON to a
    UTC-timestamped ``YYYYmmdd-HHMMSS.json`` file in the current working
    directory.

    Returns:
        The JSON string that was written to disk.
    """
    pools = get_mutiprocessing_pools()
    try:
        res = {
            "ASYNC-PROCESS": timeit(
                test_async_process,
                samples=SQUARE_JOBS,
                pool=pools["POOL_PROCESS"],
            ),
            # The thread benchmark must go through test_async_thread.
            # Calling test_async_process with the thread pool (as before)
            # wrote into the async-process folder and never exercised the
            # thread-specific code path.
            "ASYNC-THREAD": timeit(
                test_async_thread,
                samples=SQUARE_JOBS,
                pool=pools["POOL_THREAD"],
            ),
            "SYNC": timeit(test_sync, samples=SQUARE_JOBS),
        }
    finally:
        # Release worker threads/processes even if a benchmark fails.
        for pool in pools.values():
            pool.close()
            pool.join()
    res_json = json.dumps(res, indent=4)
    filename = dt.datetime.utcnow().strftime("%Y%m%d-%H%M%S.json")
    with open(filename, "w") as file:
        file.write(res_json)
    return res_json
if __name__ == "__main__":
    # Switch pyplot to the Agg backend before any benchmark creates a figure.
    plt.switch_backend('Agg')
    report = main()
    print(report)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import os | |
| from multiprocessing.pool import ThreadPool, Pool | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
def square(num):
    """Plot y = x * x for every x in ``range(num)`` and save it as a PNG.

    The image is written to ``./out/{num}.png``. The ``out`` directory is
    not created here, so it must already exist.

    Returns:
        The path of the image that was written.
    """
    # Uncomment to simulate a slower blocking job:
    # time.sleep(1)
    xs = list(range(num))
    frame = pd.DataFrame({"x": xs, "y": [x * x for x in xs]})
    fig = frame.plot.scatter(x="x", y="y").get_figure()
    fig_name = f"./out/{num}.png"
    try:
        fig.savefig(fig_name)
    finally:
        # Close exactly this figure instead of pyplot's "current" one, and
        # release it even if saving fails.
        plt.close(fig)
    return fig_name
def timeit(fun, *args, **kwargs):
    """Call ``fun(*args, **kwargs)`` and time it.

    Returns:
        A ``(seconds, result)`` tuple: the elapsed time and whatever
        ``fun`` returned. Any exception raised by ``fun`` propagates.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt a duration.
    start = time.perf_counter()
    res = fun(*args, **kwargs)
    return time.perf_counter() - start, res
def test_async(samples, thread_pool):
    """Map ``square`` over ``range(samples)`` using the given pool.

    Despite the parameter name, any object with a ``map(func, iterable)``
    method works; the script below passes a process ``Pool``.
    """
    jobs = range(samples)
    return thread_pool.map(square, jobs)
def test_sync(samples):
    """Call ``square`` for each value in ``range(samples)``, one at a time.

    Returns:
        The list of values returned by ``square``, in input order.
    """
    written = []
    for s in range(samples):
        written.append(square(s))
    return written
if __name__ == "__main__":
    # Switch pyplot to the Agg backend before any figure is created.
    plt.switch_backend('Agg')
    CONCURRENCY = 100
    SAMPLES = 100
    os.makedirs("out", exist_ok=True)
    pool = Pool(CONCURRENCY)
    try:
        duration, _ = timeit(test_async, SAMPLES, pool)
    finally:
        # Always release the worker processes, even if the run fails, and
        # wait for them to exit before the script finishes.
        pool.close()
        pool.join()
    print(f"Took: {round(duration, 4)} seconds.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment