import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool

import numpy as np


def my_multipro(items, func, max_cpus=12):
    """Do an embarrassingly parallel task using multiprocessing.

    Use this for CPU bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item.
    max_cpus : int
        Limit the number of CPUs to use.
    """
    # Don't use more CPUs than you have or more than there are items to process
    cpus = np.min([max_cpus, multiprocessing.cpu_count(), len(items)])
    print(f"Using {cpus} cpus to process {len(items)} chunks.")
    print("".join(["🔳" for i in items]))
    with multiprocessing.Pool(cpus) as p:
        results = p.map(func, items)
        # close/join inside the with block so all work finishes before teardown
        p.close()
        p.join()
    print("Finished!")
    return results


def my_multithread(items, func, max_threads=12):
    """Do an embarrassingly parallel task using multithreading.

    Use this for IO bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item.
    max_threads : int
        Limit the number of threads to use.
    """
    # Don't use more threads than there are items to process
    threads = np.min([max_threads, len(items)])
    print(f"Using {threads} threads to process {len(items)} items.")
    print("".join(["🔳" for i in items]))
    with ThreadPool(threads) as p:
        results = p.map(func, items)
        p.close()
        p.join()
    print("Finished!")
    return results


def do_this(i):
    """This function will be applied to every item given."""
    r = np.mean(i)
    print("✅", end="")
    return r


items = [
    [1, 2, 3, 4],
    [2, 3, 4, 5],
    [3, 4, 5, 6],
    [4, 5, 6, 7],
    [5, 6, 7, 8],
    [6, 7, 8, 9],
]

a = my_multipro(items, do_this)
b = my_multithread(items, do_this)
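One caveat worth adding (my note, not from the original gist): on platforms where multiprocessing starts workers with the spawn method (Windows, and macOS since Python 3.8), module-level code like the usage above is re-executed in every worker. Guarding the entry point avoids that:

# Sketch of the same usage wrapped in an entry-point guard, so spawn-based
# platforms don't re-run the pool-creating code inside each worker process.
if __name__ == "__main__":
    items = [[1, 2, 3, 4], [2, 3, 4, 5]]
    a = my_multipro(items, do_this)
    b = my_multithread(items, do_this)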
Multipro Helper
This function sends a list of jobs to either basic multiprocessing, multithreading, or a sequential list comprehension.
import inspect
from datetime import datetime
from multiprocessing import Pool, cpu_count  # Multiprocessing
from multiprocessing.dummy import Pool as ThreadPool  # Multithreading

import matplotlib.pyplot as plt
import numpy as np


def multipro_helper(func, inputs, cpus=6, threads=None, verbose=True):
    """
    Multiprocessing and multithreading helper.

    Parameters
    ----------
    func : function
        A function you want to apply to each item in ``inputs``.
        If your function has many inputs, it's useful to call a helper
        function that unpacks the arguments for each input.
    inputs : list
        A list of inputs for the function being called.
    cpus : int or None
        Number of CPUs to use. Will not exceed the maximum number available
        and will not exceed the length of ``inputs``.
        If None, will try to use multithreading.
    threads : int or None
        Number of threads to use. Will not exceed 50 and will not exceed
        the length of ``inputs``.
        If None, will do each task sequentially as a list comprehension.
    verbose : bool
        If True, print progress and timing information.
    """
    assert callable(func), f"👻 {func} must be a callable function."
    assert isinstance(inputs, list), "👻 inputs must be a list."
    timer = datetime.now()

    if threads is not None:
        # Accept plain Python ints as well as NumPy integers.
        assert isinstance(threads, (int, np.integer)), f"👻 threads must be an int. You gave {type(threads)}"
        threads = np.minimum(threads, 50)  # Don't allow more than 50 threads.
        threads = np.minimum(threads, len(inputs))
        if verbose:
            print(f"🧵 Multithreading {func} with [{threads:,}] threads for [{len(inputs):,}] items.", end=" ")
        with ThreadPool(threads) as p:
            results = p.map(func, inputs)
            p.close()
            p.join()
    elif cpus is not None:
        assert isinstance(cpus, (int, np.integer)), f"👻 cpus must be an int. You gave {type(cpus)}"
        cpus = np.minimum(cpus, cpu_count())
        cpus = np.minimum(cpus, len(inputs))
        if verbose:
            print(f"🤹🏻♂️ Multiprocessing {func} with [{cpus:,}] CPUs for [{len(inputs):,}] items.", end=" ")
        with Pool(cpus) as p:
            results = p.map(func, inputs)
            p.close()
            p.join()
    else:
        if verbose:
            print(f"📏 Sequentially do {func} for [{len(inputs):,}] items.", end=" ")
        results = [func(i) for i in inputs]

    if verbose:
        print(f"Timer={datetime.now() - timer}")
    return results
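A quick usage sketch (my addition; the worker function and inputs here are hypothetical stand-ins):

# Hypothetical worker and inputs to show the three execution modes.
def do_this(i):
    """Average the numbers in one item."""
    return sum(i) / len(i)

inputs = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

a = multipro_helper(do_this, inputs, cpus=2)      # multiprocessing
b = multipro_helper(do_this, inputs, threads=2)   # multithreading
c = multipro_helper(do_this, inputs, cpus=None)   # sequential list comprehension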
And this will make a plot of performance for a number of different Pool sizes. It can help you see where you get diminishing returns from a larger Pool, and whether your problem is a CPU-bound or IO-bound process. Use multiprocessing for CPU-bound processes and multithreading for IO-bound processes.
def plot_multipro_effeciency(func, *args, pools=range(1, 15), **kwargs):
    """
    Display a figure showing the multiprocessing/multithreading
    efficiency for a range of Pool sizes.

    Parameters
    ----------
    func : function
        A function that has keyword arguments for `cpus` and `threads`.
    *args, **kwargs :
        Arguments and keyword arguments for the function.
    pools : list of int
        List of Pool sizes to try for multiprocessing/multithreading.
    """
    plt.rcParams["hatch.linewidth"] = 8

    # Unique, sorted pool sizes; a Pool of 0 workers is not valid.
    pools = sorted(set(pools) - {0})

    assert "cpus" in inspect.getfullargspec(func).args, f"👺 The function {func.__name__} does not have a `cpus` argument."
    assert "threads" in inspect.getfullargspec(func).args, f"👺 The function {func.__name__} does not have a `threads` argument."

    # `cpus` and `threads` are set by this function; drop any the caller gave.
    _ = kwargs.pop("cpus", None)
    _ = kwargs.pop("threads", None)

    multipro = []
    for i in pools:
        timer = datetime.now()
        _ = func(*args, **kwargs, cpus=i)
        timer = datetime.now() - timer
        multipro.append(timer)

    multithread = []
    for i in pools:
        timer = datetime.now()
        _ = func(*args, **kwargs, threads=i)
        timer = datetime.now() - timer
        multithread.append(timer)

    timer = datetime.now()
    _ = func(*args, **kwargs, cpus=None, threads=None)
    timer = datetime.now() - timer
    sequential = timer

    # Plot completion time in seconds for each Pool size
    # ===================================================
    plt.bar(pools, [i.total_seconds() for i in multipro],
            label="Multiprocessing", color=".1")
    plt.bar(pools, [i.total_seconds() for i in multithread],
            label="Multithreading", hatch="/", edgecolor="tab:blue",
            alpha=.33, color="tab:blue")
    plt.bar(pools, [i.total_seconds() for i in multithread],
            edgecolor="w", color="none")
    plt.axhline(sequential.total_seconds(), ls="--", color="k",
                label="Sequential")

    # Cosmetics
    plt.legend()
    plt.ylabel("Seconds")
    plt.xlabel("Number in Pool")
    plt.title(f"{func.__module__}.{func.__name__}")
    plt.xticks(pools)

    return multipro, multithread, sequential
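A hedged usage sketch (my addition): the plotting function inspects func for `cpus` and `threads` keywords, so wrap multipro_helper in a small function that exposes both. The wrapper, worker, and inputs here are hypothetical.

# Hypothetical wrapper exposing `cpus` and `threads`, as required by the
# inspect.getfullargspec checks above. Reuses do_this from the earlier sketch.
def crunch_many(inputs, cpus=None, threads=None):
    return multipro_helper(do_this, inputs, cpus=cpus, threads=threads, verbose=False)

inputs = [list(range(100_000)) for _ in range(24)]
multipro, multithread, sequential = plot_multipro_effeciency(
    crunch_many, inputs, pools=range(1, 9)
)
plt.show()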
For example, reading a file and then processing its data is a CPU-bound process and should use multiprocessing to speed it up, but you don't get much speedup if you use more than 5 or 6 CPUs.
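A sketch of what such a CPU-bound worker might look like (my addition; the file format and math are hypothetical):

# Hypothetical CPU-bound worker: the parsing and number crunching dominate
# the file read, so multiprocessing is the right tool here.
def process_file(path):
    with open(path) as f:
        values = [float(line) for line in f if line.strip()]
    return sum(v ** 0.5 for v in values) / len(values)

# e.g. multipro_helper(process_file, list_of_paths, cpus=6)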
Is there a way to get the maximum number of threads a "system" can support for multithreading, like cpu_count for multiprocessing?
from multiprocessing.dummy import Pool as ThreadPool

threads = 4
with ThreadPool(threads) as p:
    results = p.map(print, list(range(10)))
    p.close()
    p.join()
So instead of 4, I can let the script determine it. I do not know how the underlying system works here, so the question might be naive.
@urwa I would look at this for that answer: https://stackoverflow.com/questions/48970857/maximum-limit-on-number-of-threads-in-python
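For what it's worth, the standard library has its own heuristic: since Python 3.8, concurrent.futures.ThreadPoolExecutor defaults max_workers to min(32, os.cpu_count() + 4). A minimal sketch applying the same cap to ThreadPool:

import os
from multiprocessing.dummy import Pool as ThreadPool

# Same cap ThreadPoolExecutor uses by default in Python 3.8+.
threads = min(32, (os.cpu_count() or 1) + 4)
with ThreadPool(threads) as p:
    results = p.map(print, list(range(10)))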
Whether to use multiprocessing or multithreading depends on the task to be performed.
Multiprocessing
Use this for CPU bound tasks.
Multithreading
Use this for IO bound tasks. The API is very similar to the multiprocessing API. Read more about it at these two sources:
Alternative method adapted from Mike Wessler
close() prevents future jobs from being submitted to the pool; join() waits for all workers in the pool to complete and exit before allowing the script to move forward. Once it exits the with block, the garbage collector cleans up the processes. chunksize is the number of iterables submitted to each process; chunksize=1 ensures the process is killed and a new one started before doing the next job, but be warned this increases i/o overhead.
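A minimal sketch of the chunksize parameter on Pool.map (my addition; the worker is hypothetical):

from multiprocessing import Pool

def work(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as p:
        # chunksize=1 hands items to workers one at a time; larger chunks
        # reduce scheduling overhead at the cost of coarser load balancing.
        results = p.map(work, range(100), chunksize=1)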