Skip to content

Instantly share code, notes, and snippets.

@vgmoose
Created June 24, 2022 22:06
Show Gist options
  • Save vgmoose/3a77cb1baa9c9f2604f72d34c0024f55 to your computer and use it in GitHub Desktop.
Save vgmoose/3a77cb1baa9c9f2604f72d34c0024f55 to your computer and use it in GitHub Desktop.
Multiprocessing pool equivalent for Python threading (use like: ThreadRunner(5).parallelize(func, [data]), similar to Pool(5).map(func, [data]))
# for multiple cores stuffs
import threading
from collections import deque
# Default worker count; override per-instance via ThreadRunner(num_cores=...).
NUM_CORES = 7
# NUM_CORES = 62


class ThreadRunner:
    """Thread-based equivalent of multiprocessing.Pool for I/O-bound work.

    Usage: ThreadRunner(5).parallelize(func, data), similar to
    multiprocessing's Pool(5).map(func, data).
    """

    def __init__(self, num_cores=NUM_CORES):
        # number of worker threads spawned per parallelize() call
        self.num_cores = num_cores
        # deque gives O(1) popleft, and its append/popleft are atomic in
        # CPython, so the workers can share these without an explicit lock
        self.input_data = deque()
        self.output_data = deque()

    def wrapper(self):
        """Worker loop: pull (idx, element) items until the input queue is empty.

        Runs in each worker thread. We cannot pre-check "is the queue
        non-empty?" because another worker may drain it between the check and
        the pop; instead we rely on popleft() raising IndexError when empty.
        """
        while True:
            # Keep the try narrow: only the popleft() is guarded, so an
            # IndexError raised *inside* self.target_func propagates instead
            # of being silently mistaken for queue exhaustion (which would
            # abort this worker and drop its remaining work without a trace).
            try:
                idx, element = self.input_data.popleft()
            except IndexError:
                # input queue is drained — this worker is done
                break
            # process the element using the target_func set up by parallelize
            result = self.target_func(element)
            # record the result alongside its original position
            self.output_data.append((idx, result))

    def parallelize(self, func, data):
        """Apply func to every element of data across worker threads.

        Returns the list of results in the same order as data (like Pool.map).
        Blocks until all workers have finished.
        """
        # tag each element with its index so ordering can be restored at the end
        self.input_data = deque(enumerate(data))
        self.output_data = deque()
        self.target_func = func
        all_threads = []
        for _ in range(self.num_cores):
            t = threading.Thread(target=self.wrapper)
            all_threads.append(t)
            t.start()
        # wait synchronously for all threads to finish
        for t in all_threads:
            t.join()
        # restore the original ordering, then strip the indices
        sorted_data = sorted(self.output_data, key=lambda pair: pair[0])
        return [result for _, result in sorted_data]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment