Created
October 18, 2016 16:21
-
-
Save rwarren/8bc7337646cb3bbb1ecb6c37a0b625fa to your computer and use it in GitHub Desktop.
Basic thread pool example whipped up quickly for #python discussion. Don't judge me.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading
import time

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to lowercase
# --- Tunables -------------------------------------------------------------
# How long a worker waits on an empty task queue before re-checking whether
# it has been told to shut down.
QUEUE_TIMEOUT_s = 0.1
# play with this! If switching to multiprocessing, use ncores+1 (or 2)
WORKER_COUNT = 200
NUM_TASKS = 1000
# Simulated duration of a single task.
SLOW_ADD_TIME_s = 0.1

# --- Shared coordination objects -------------------------------------------
# start_event is not strictly needed, but it is nice to release all workers
# at the same instant; done_event tells idle workers they may exit.
start_event = threading.Event()
done_event = threading.Event()

task_queue = Queue.Queue()
answer_queue = Queue.Queue()

# One (x, y) argument pair per task: (0, 0), (1, 1), ...
all_args = zip(range(NUM_TASKS), range(NUM_TASKS))
# The function you want to execute in parallel...
def parallel_slow_add(x, y):
    """Return ``x + y`` after sleeping for ``SLOW_ADD_TIME_s`` seconds.

    The sleep stands in for a slow unit of work so the effect of running
    many calls concurrently is visible in the timings.
    """
    time.sleep(SLOW_ADD_TIME_s)
    result = x + y
    return result
def worker_task(se, in_q, out_q):
    """Worker loop: pull ``(x, y)`` tasks from a queue and push back results.

    The worker blocks until ``se`` is set, then repeatedly takes a task from
    ``in_q``, runs ``parallel_slow_add`` on it and puts the answer on
    ``out_q``. It exits once ``in_q`` has stayed empty for
    ``QUEUE_TIMEOUT_s`` seconds *and* the module-level ``done_event`` is set.

    Args:
        se: Start event; the worker calls ``se.wait()`` before doing any work.
        in_q: Queue of ``(x, y)`` tuples to process.
        out_q: Queue that receives one result per processed task.
    """
    # Use the objects we were handed rather than reaching for module globals,
    # so the worker honours whatever event/queues the caller passes in.
    se.wait()  # all threads wait until we say go before chewing CPU
    while True:
        try:
            task = in_q.get(timeout=QUEUE_TIMEOUT_s)
        except Queue.Empty:
            # Nothing to do right now; only quit if we have been told that
            # no more work is coming, otherwise poll again.
            if done_event.is_set():
                break
        else:
            x, y = task
            ans = parallel_slow_add(x, y)
            out_q.put(ans)
# Make a pool of workers...
# Every worker receives the same start event and the same pair of queues.
worker_args = (start_event, task_queue, answer_queue)
worker_pool = [
    threading.Thread(target=worker_task, args=worker_args)
    for _ in range(WORKER_COUNT)
]

# Start your workers (which will all wait for us)...
for worker in worker_pool:
    # Daemon threads do not keep the interpreter alive if the main thread
    # exits early. The `daemon` attribute replaces the deprecated
    # setDaemon() call.
    worker.daemon = True
    worker.start()
# Load every argument tuple onto the task queue before releasing the workers
# (you would stuff your window sizes and data here)...
for args in all_args:
    task_queue.put(args)

# Kick it off! Take the timestamp first so the measured wall-clock time
# covers everything from the moment the workers are released.
start_time_s = time.time()
start_event.set()
# Collect responses as they come in...
#  - the final rollup here is simply the sum of every answer
answer_count = 0
total = 0
for answer_count in range(1, NUM_TASKS + 1):
    # get() has no timeout, so this blocks forever if an answer never
    # arrives; you want to be smarter here in real code.
    ans = answer_queue.get()
    total += ans
    if not answer_count % 100:
        print("Got %d answers so far!" % answer_count)

# Wall-clock time from releasing the workers to receiving the last answer.
elapsed_time_s = time.time() - start_time_s
# Neatly close our workers: signal that no more work is coming, then wait
# for each thread to notice and exit.
done_event.set()
for worker in worker_pool:
    worker.join()

print("Done! Moronic parallel slow add result was %d" % total)

# Time the tasks would have taken if run back to back, for comparison with
# the measured wall-clock time.
cpu_time_s = NUM_TASKS * SLOW_ADD_TIME_s
timing_report = "Total cpu time for %d jobs was %.2fs. Clock time was %.2fs"
print(timing_report % (NUM_TASKS, cpu_time_s, elapsed_time_s))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment