Skip to content

Instantly share code, notes, and snippets.

@RHDZMOTA
Last active May 4, 2020 00:21
Show Gist options
  • Select an option

  • Save RHDZMOTA/edca3c24b7971a834de06be8aec54522 to your computer and use it in GitHub Desktop.

Select an option

Save RHDZMOTA/edca3c24b7971a834de06be8aec54522 to your computer and use it in GitHub Desktop.
Multiprocessing when having a blocking process (matplotlib and disk writes) - a quick hack
{
"ASYNC-PROCESS": 2.6283581256866455,
"ASYNC-THREAD": 11.003680944442749,
"SYNC": 10.771448135375977
}
import datetime as dt
import json
import time
import os
from multiprocessing.pool import ThreadPool, Pool
import pandas as pd
import matplotlib.pyplot as plt
# Number of workers given to each pool; overridable through the
# SQUARE_CONCURRENCY environment variable.
SQUARE_CONCURRENCY = int(os.getenv("SQUARE_CONCURRENCY", "10"))

# Number of plots rendered per benchmark run; overridable through the
# SQUARE_JOBS environment variable.
SQUARE_JOBS = int(os.getenv("SQUARE_JOBS", "100"))

# Output directory used by each execution strategy.
SQUARE_OUTPUT_FOLDER = {
    "sync": "out-sync",
    "async-process": "out-async-process",
    "async-thread": "out-async-thread",
}
def get_mutiprocessing_pools():
    """Create the worker pools used by the benchmark.

    Returns:
        A dict holding a ``ThreadPool`` under ``"POOL_THREAD"`` and a
        process ``Pool`` under ``"POOL_PROCESS"``, both sized by
        ``SQUARE_CONCURRENCY``.
    """
    thread_pool = ThreadPool(SQUARE_CONCURRENCY)
    process_pool = Pool(SQUARE_CONCURRENCY)
    return {"POOL_THREAD": thread_pool, "POOL_PROCESS": process_pool}
def square(num: int, out_path: str):
    """Plot ``y = x * x`` for ``x`` in ``range(num)`` and save it as a PNG.

    Args:
        num: Number of points to plot; also used as the file stem.
        out_path: Directory the image is written into.

    Returns:
        The path of the written file, ``<out_path>/<num>.png``.
    """
    xs = list(range(num))
    frame = pd.DataFrame({"x": xs, "y": [x * x for x in xs]})
    fig = frame.plot.scatter(x="x", y="y").get_figure()
    fig_name = os.path.join(out_path, f"{num}.png")
    try:
        fig.savefig(fig_name)
    finally:
        # Close exactly the figure created above. A bare plt.close() closes
        # pyplot's global "current figure", which is not guaranteed to be
        # this one when several calls run concurrently in a thread pool.
        plt.close(fig)
    return fig_name
# Kept as a plain module-level function to avoid serialization problems
# with the process pool.
def square_process(num: int):
    """Run ``square`` for ``num``, writing into the async-process folder."""
    target_dir = SQUARE_OUTPUT_FOLDER["async-process"]
    return square(num, target_dir)
def timeit(fun, *args, **kwargs):
    """Call ``fun(*args, **kwargs)`` and return the elapsed time in seconds.

    The callable's return value is discarded.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() follows the wall clock and can be adjusted
    # while the measurement is running.
    start = time.perf_counter()
    fun(*args, **kwargs)
    return time.perf_counter() - start
def test_async_thread(samples: int, pool: ThreadPool):
    """Render ``samples`` plots through the thread pool.

    Creates the async-thread output folder if needed and returns the list
    of file names produced by ``square`` for ``range(samples)``.
    """
    target_dir = SQUARE_OUTPUT_FOLDER["async-thread"]
    os.makedirs(target_dir, exist_ok=True)

    def render(z):
        return square(num=z, out_path=target_dir)

    return pool.map(render, range(samples))
def test_async_process(samples: int, pool: Pool):
    """Render ``samples`` plots through the given pool via ``square_process``.

    Creates the async-process output folder if needed and returns the list
    of file names produced for ``range(samples)``.
    """
    target_dir = SQUARE_OUTPUT_FOLDER["async-process"]
    os.makedirs(target_dir, exist_ok=True)
    jobs = range(samples)
    return pool.map(square_process, jobs)
def test_sync(samples: int):
    """Render ``samples`` plots one after another in the calling thread.

    Creates the sync output folder if needed and returns the list of file
    names produced by ``square`` for ``range(samples)``.
    """
    target_dir = SQUARE_OUTPUT_FOLDER["sync"]
    os.makedirs(target_dir, exist_ok=True)
    written = []
    for s in range(samples):
        written.append(square(num=s, out_path=target_dir))
    return written
def main():
    """Time the three execution strategies and persist the timings as JSON.

    Runs the process-pool, thread-pool and synchronous variants in that
    order, each over ``SQUARE_JOBS`` plots, and writes the timings to a
    file named after the current UTC timestamp.

    Returns:
        The JSON document (as a string) that was written to disk.
    """
    pools = get_mutiprocessing_pools()
    try:
        res = {
            "ASYNC-PROCESS": timeit(
                test_async_process,
                samples=SQUARE_JOBS,
                pool=pools["POOL_PROCESS"],
            ),
            # The thread benchmark must go through test_async_thread so it
            # writes to its own folder instead of reusing the process path.
            "ASYNC-THREAD": timeit(
                test_async_thread,
                samples=SQUARE_JOBS,
                pool=pools["POOL_THREAD"],
            ),
            "SYNC": timeit(test_sync, samples=SQUARE_JOBS),
        }
    finally:
        # Release the workers whether or not the benchmark succeeded.
        for pool in pools.values():
            pool.close()
            pool.join()
    res_json = json.dumps(res, indent=4)
    filename = dt.datetime.utcnow().strftime("%Y%m%d-%H%M%S.json")
    with open(filename, "w") as file:
        file.write(res_json)
    return res_json
if __name__ == "__main__":
    # Switch pyplot to the Agg backend before any figure is drawn.
    plt.switch_backend("Agg")
    report = main()
    print(report)
import time
import os
from multiprocessing.pool import ThreadPool, Pool
import pandas as pd
import matplotlib.pyplot as plt
def square(num):
    """Plot ``y = x * x`` for ``x`` in ``range(num)`` and save it as a PNG.

    Args:
        num: Number of points to plot; also used as the file stem.

    Returns:
        The path of the written file, ``./out/<num>.png``.
    """
    xs = list(range(num))
    frame = pd.DataFrame({"x": xs, "y": [x * x for x in xs]})
    fig = frame.plot.scatter(x="x", y="y").get_figure()
    fig_name = f"./out/{num}.png"
    try:
        fig.savefig(fig_name)
    finally:
        # Close exactly the figure created above rather than whatever
        # pyplot currently considers the "current figure".
        plt.close(fig)
    return fig_name
def timeit(fun, *args, **kwargs):
    """Call ``fun(*args, **kwargs)`` and time it.

    Returns:
        A ``(seconds, result)`` tuple: the elapsed time and whatever
        ``fun`` returned.
    """
    # perf_counter is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() follows the wall clock and can be adjusted
    # while the measurement is running.
    start = time.perf_counter()
    res = fun(*args, **kwargs)
    return time.perf_counter() - start, res
def test_async(samples, thread_pool):
    """Map ``square`` over ``range(samples)`` with the given pool.

    Returns whatever ``thread_pool.map`` returns for those inputs.
    """
    inputs = range(samples)
    return thread_pool.map(square, inputs)
def test_sync(samples):
    """Call ``square`` for each value in ``range(samples)``, one at a time.

    Returns the list of results in input order.
    """
    written = []
    for s in range(samples):
        written.append(square(s))
    return written
if __name__ == "__main__":
    # Switch pyplot to the Agg backend before any figure is drawn.
    plt.switch_backend('Agg')
    CONCURRENCY = 100
    SAMPLES = 100
    os.makedirs("out", exist_ok=True)
    pool = Pool(CONCURRENCY)
    try:
        duration, _ = timeit(test_async, SAMPLES, pool)
    finally:
        # Always release the workers, even if the benchmark raised, and
        # wait for them to exit before reporting.
        pool.close()
        pool.join()
    print(f"Took: {round(duration, 4)} seconds.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment