@rwarren
Created October 18, 2016 16:21
Basic thread pool example whipped up quickly for #python discussion. Don't judge me.
import threading
import Queue
import time

QUEUE_TIMEOUT_s = 0.1
WORKER_COUNT = 200  # play with this! If switching to multiprocessing, use ncores+1 (or 2); see the Pool sketch below
NUM_TASKS = 1000
SLOW_ADD_TIME_s = 0.1

done_event = threading.Event()
start_event = threading.Event()  # not needed, but somewhat nice to sync them
task_queue = Queue.Queue()
answer_queue = Queue.Queue()

all_args = zip(range(NUM_TASKS), range(NUM_TASKS))

# the function you want to execute in parallel...
def parallel_slow_add(x, y):
    time.sleep(SLOW_ADD_TIME_s)
    return x + y

def worker_task(se, in_q, out_q):
    se.wait()  # all threads wait until we say go before chewing CPU
    while True:
        try:
            task = in_q.get(timeout=QUEUE_TIMEOUT_s)
        except Queue.Empty:
            if done_event.is_set():
                break
        else:
            x, y = task
            ans = parallel_slow_add(x, y)
            out_q.put(ans)

# Make a pool of workers...
worker_args = (start_event, task_queue, answer_queue)
worker_pool = [threading.Thread(target=worker_task, args=worker_args)
               for _ in range(WORKER_COUNT)]

# Start your workers (which will all wait for us)...
for worker in worker_pool:
    worker.setDaemon(True)
    worker.start()

# Fill the task queue (you would stuff your window sizes and data here)...
for args in all_args:
    task_queue.put(args)

# Kick it off!
start_time_s = time.time()
start_event.set()

# Collect responses as they come in...
# - here we'll just add them all as your final rollup
answer_count = 0
total = 0
while answer_count < NUM_TASKS:
    ans = answer_queue.get()  # this blocks forever; you want to be smarter here (see the sketch below)
    total += ans
    answer_count += 1
    if answer_count % 100 == 0:
        print("Got %d answers so far!" % answer_count)
elapsed_time_s = time.time() - start_time_s

# Neatly close our workers...
done_event.set()
for worker in worker_pool:
    worker.join()
print("Done! Moronic parallel slow add result was %d" % total)
cpu_time_s = NUM_TASKS * SLOW_ADD_TIME_s
print("Total cpu time for %d jobs was %.2fs. Clock time was %.2fs" % (NUM_TASKS,
cpu_time_s,
elapsed_time_s))
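
Re the WORKER_COUNT comment: for CPU-bound work you'd switch to processes (threads share the GIL), and multiprocessing.Pool handles the queueing and collection that the script above does by hand. A minimal sketch under that assumption — pool_slow_add is an illustrative name, not part of the gist:

# Sketch only: rough multiprocessing equivalent of the rollup above.
import multiprocessing
import time

NUM_TASKS = 1000
SLOW_ADD_TIME_s = 0.1

def pool_slow_add(args):  # Pool.map passes a single argument, so unpack a tuple
    x, y = args
    time.sleep(SLOW_ADD_TIME_s)
    return x + y

if __name__ == "__main__":
    # ncores+1 workers, per the comment above
    pool = multiprocessing.Pool(multiprocessing.cpu_count() + 1)
    total = sum(pool.map(pool_slow_add, zip(range(NUM_TASKS), range(NUM_TASKS))))
    pool.close()
    pool.join()
    print("Pool result was %d" % total)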
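
And one way to "be smarter" in the collection loop, as the comment flags: bound each get with QUEUE_TIMEOUT_s so a crashed worker can't hang the main thread forever. A sketch reusing the gist's names; the is_alive check is my addition, not in the original:

# Sketch: drop-in replacement for the blocking collection loop above.
while answer_count < NUM_TASKS:
    try:
        ans = answer_queue.get(timeout=QUEUE_TIMEOUT_s)
    except Queue.Empty:
        if not any(w.is_alive() for w in worker_pool):
            raise RuntimeError("workers died with %d answers outstanding"
                               % (NUM_TASKS - answer_count))
        continue  # workers still running; keep waiting
    total += ans
    answer_count += 1
    if answer_count % 100 == 0:
        print("Got %d answers so far!" % answer_count)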