Skip to content

Instantly share code, notes, and snippets.

@huangsam
Last active January 4, 2018 21:55
Show Gist options
  • Save huangsam/ccd2ed76d88369df2d8dc88c18d08e6d to your computer and use it in GitHub Desktop.
Threading for "embarrassingly parallel" tasks
# -*- coding: utf-8 -*-
import multiprocessing as mp
import queue
import threading
def worker(f, iq, oq):
"""Logic for a single worker thread.
The loop terminates when it's given a None item, so ensure
that the input queue does NOT have empty inputs. The logic assumes
that None results need not be accounted for in the results.
Args:
f: Function to run.
iq: Input queue.
oq: Output queue.
"""
while True:
item = iq.get()
if item is None:
break
result = f(item)
iq.task_done()
if result:
oq.put(result)
def run_workers(work, data, num_worker_threads=mp.cpu_count()*4):
    """Run worker threads on data collection.

    Args:
        work: Workhorse function for worker.
        data: An iterable collection of values to feed.
        num_worker_threads: Number of threads to run.

    Returns:
        Aggregate results from executing f:work on data.
    """
    in_queue = queue.Queue()
    out_queue = queue.Queue()
    pool = [
        threading.Thread(target=worker, args=(work, in_queue, out_queue))
        for _ in range(num_worker_threads)
    ]
    for thread in pool:
        thread.start()
    for value in data:
        in_queue.put(value)
    # Wait until every queued item has been marked done by a worker.
    in_queue.join()
    # One None sentinel per thread tells each worker loop to exit.
    for _ in range(num_worker_threads):
        in_queue.put(None)
    for thread in pool:
        thread.join()
    # All workers have exited, so draining the output queue is safe.
    collected = []
    while not out_queue.empty():
        collected.append(out_queue.get())
    return collected
# -*- coding: utf-8 -*-
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
import queue
import threading
def run_workers(work, data, num_worker_threads=cpu_count()*4):
    """Run worker threads on data collection.

    Args:
        work: Workhorse function for worker.
        data: An iterable collection of values to feed.
        num_worker_threads: Number of threads to run.

    Returns:
        Aggregate results from executing f:work on data.
    """
    pool = ThreadPool(num_worker_threads)
    try:
        # map blocks until every item is processed and returns the
        # results in input order.
        return pool.map(work, data)
    finally:
        # Same teardown the `with` form performs: Pool.__exit__ calls
        # terminate().
        pool.terminate()
# -*- coding: utf-8 -*-
from concurrent import futures
import logging
import multiprocessing as mp
LOGGER = logging.getLogger(__name__)


def run_workers(work, data, num_worker_threads=mp.cpu_count()*4):
    """Run worker threads on data collection.

    Items whose execution raises are logged and skipped, so the result
    list may be shorter than the input. Results are appended in
    completion order, not input order.

    Args:
        work: Workhorse function for worker.
        data: An iterable collection of values to feed.
        num_worker_threads: Number of threads to run.

    Returns:
        Aggregate results from executing f:work on data.
    """
    results = []
    with futures.ThreadPoolExecutor(max_workers=num_worker_threads) as executor:
        future_to_result = {
            executor.submit(work, arg): arg for arg in data}
        for future in futures.as_completed(future_to_result):
            try:
                results.append(future.result())
            except Exception as e:
                # BUG FIX: logging interpolates %-style, so the original
                # '{}: {}' placeholders never rendered (logging's error
                # handler printed a formatting error instead), and
                # Logger.warn is a deprecated alias of warning().
                LOGGER.warning('%s: %s', type(e).__name__, e)
    return results
@huangsam
Copy link
Author

huangsam commented Jan 4, 2018

This is the same as utilizing Python's undocumented ThreadPool.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment