Last active
January 4, 2018 21:55
-
-
Save huangsam/ccd2ed76d88369df2d8dc88c18d08e6d to your computer and use it in GitHub Desktop.
Threading for "embarrassingly parallel" tasks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import multiprocessing as mp | |
import queue | |
import threading | |
def worker(f, iq, oq):
    """Logic for a single worker thread.

    Repeatedly takes an item from ``iq``, applies ``f`` to it and puts the
    result on ``oq``. The loop terminates when it receives a ``None`` item,
    so ``None`` is reserved as the stop sentinel and must NOT appear among
    the real inputs.

    Only ``None`` results are skipped; falsy values such as ``0``, ``""`` or
    ``[]`` are legitimate results and are forwarded to ``oq``.

    If ``f`` raises, the exception is logged and the item is skipped. The
    worker keeps running and the item is still marked done, so a caller
    blocked in ``iq.join()`` is never left waiting forever.

    Args:
        f: Function to run on each item.
        iq: Input queue (``queue.Queue``-like, supporting ``task_done``).
        oq: Output queue.
    """
    # Function-scope import: this block is the only user of logging here.
    import logging

    while True:
        item = iq.get()
        try:
            if item is None:
                break
            result = f(item)
            # Enqueue before task_done() so that once iq.join() returns,
            # every result is already available on the output queue.
            if result is not None:
                oq.put(result)
        except Exception:
            logging.getLogger(__name__).exception(
                "Unhandled exception while processing %r", item)
        finally:
            # Exactly one task_done() per get(), including the sentinel and
            # failed items, keeps the queue's unfinished-task count accurate.
            iq.task_done()
def run_workers(work, data, num_worker_threads=mp.cpu_count() * 4):
    """Run worker threads on data collection.

    Starts ``num_worker_threads`` threads running ``worker``, feeds every
    item of ``data`` through an input queue, waits for the queue to drain,
    then stops the threads and collects whatever they put on the output
    queue.

    ``None`` is used as the stop sentinel for the workers, so ``data`` must
    not contain ``None`` values.

    Args:
        work: Workhorse function for worker.
        data: An iterable collection of values to feed.
        num_worker_threads: Number of threads to run (at least 1).

    Returns:
        List of results taken from the output queue, in the order the
        workers enqueued them (not necessarily the order of ``data``).

    Raises:
        ValueError: If ``num_worker_threads`` is less than 1. Without any
            worker the input queue would never drain and the call would
            block forever.
    """
    if num_worker_threads < 1:
        raise ValueError("num_worker_threads must be at least 1")

    iq = queue.Queue()
    oq = queue.Queue()
    threads = []
    try:
        for _ in range(num_worker_threads):
            t = threading.Thread(target=worker, args=(work, iq, oq))
            t.start()
            threads.append(t)

        for item in data:
            iq.put(item)

        # Block until all tasks are done
        iq.join()
    finally:
        # Stop workers. Done in ``finally`` so that an error while starting
        # threads or iterating ``data`` does not leave threads blocked on
        # ``iq.get()`` forever.
        for _ in threads:
            iq.put(None)
        for t in threads:
            t.join()

    # Gather results; safe to poll ``empty()`` because all producers exited.
    results = []
    while not oq.empty():
        results.append(oq.get())
    return results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from multiprocessing import cpu_count | |
from multiprocessing.pool import ThreadPool | |
import queue | |
import threading | |
def run_workers(work, data, num_worker_threads=cpu_count()*4):
    """Apply ``work`` to every element of ``data`` using a thread pool.

    Args:
        work: Callable invoked once per element.
        data: Iterable of values handed to ``work``.
        num_worker_threads: Size of the thread pool.

    Returns:
        A list with one result per element of ``data``, as produced by
        ``ThreadPool.map``.
    """
    thread_pool = ThreadPool(num_worker_threads)
    # The context manager tears the pool down once the blocking map returns.
    with thread_pool as active_pool:
        mapped = active_pool.map(work, data)
        return mapped
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from concurrent import futures | |
import logging | |
import multiprocessing as mp | |
LOGGER = logging.getLogger(__name__)


def run_workers(work, data, num_worker_threads=mp.cpu_count() * 4):
    """Run worker threads on data collection.

    Submits ``work(arg)`` for every ``arg`` in ``data`` to a
    ``ThreadPoolExecutor`` and collects the return values as the futures
    complete. A call that raises is logged as a warning and contributes no
    result; it does not abort the remaining work.

    Args:
        work: Workhorse function for worker.
        data: An iterable collection of values to feed.
        num_worker_threads: Number of threads to run.

    Returns:
        List of results from the successful calls, in completion order
        (not necessarily the order of ``data``).
    """
    results = []
    with futures.ThreadPoolExecutor(max_workers=num_worker_threads) as executor:
        pending = [executor.submit(work, arg) for arg in data]
        for future in futures.as_completed(pending):
            try:
                results.append(future.result())
            except Exception as e:
                # logging interpolates with %-style lazily; a '{}' template
                # would fail at emit time. ``warning`` replaces the
                # deprecated ``warn`` alias.
                LOGGER.warning('%s: %s', type(e).__name__, e)
    return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is the same as utilizing Python's undocumented ThreadPool.