Created
June 24, 2022 22:06
-
-
Save vgmoose/3a77cb1baa9c9f2604f72d34c0024f55 to your computer and use it in GitHub Desktop.
Multiprocessing pool equivalent for Python threading (use like: ThreadRunner(5).parallelize(func, [data]), similar to Pool(5).map(func, [data]))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# for multiple cores stuffs | |
import threading | |
from collections import deque | |
# Default number of worker threads. These are threads, not processes: under
# CPython's GIL they only run truly in parallel while the work releases the
# GIL (blocking I/O, many C extensions). An alternative value used before: 62.
NUM_CORES = 7


class ThreadRunner:
    """Thread-based stand-in for ``multiprocessing.Pool.map``.

    Usage: ``ThreadRunner(5).parallelize(func, data)`` returns
    ``[func(x) for x in data]`` in input order, computed by up to
    ``num_cores`` threads that pull work from a shared queue.

    Per-call state lives on ``self`` (``input_data``, ``output_data``,
    ``target_func``), so one instance must not run two ``parallelize``
    calls at the same time.
    """

    def __init__(self, num_cores=NUM_CORES):
        """Create a runner that uses at most *num_cores* worker threads.

        Raises:
            ValueError: if *num_cores* is less than 1 (no thread would ever
                run the work, so results would silently be empty).
        """
        if num_cores < 1:
            raise ValueError("num_cores must be at least 1")
        self.num_cores = num_cores
        # Pending work as (index, element) pairs, consumed by worker threads.
        self.input_data = deque()
        # Finished work as (index, result) pairs, in completion order.
        self.output_data = deque()
        # Failed work as (index, exception) pairs, in completion order.
        self._errors = deque()
        self.target_func = None

    def wrapper(self):
        """Worker loop: process queued elements until the queue is empty.

        Each element is passed to ``self.target_func`` and the result is
        appended to ``output_data`` together with the element's index. If a
        call raises, the exception is recorded (``parallelize`` re-raises
        it) and the remaining queue is discarded so that other workers stop
        picking up new work.
        """
        while True:
            # deque.popleft is thread-safe. Checking emptiness first would be
            # racy (another worker may drain the queue in between), so just
            # try to pop and treat IndexError as "no work left".
            try:
                idx, element = self.input_data.popleft()
            except IndexError:
                return
            # Kept separate from the pop above so that an IndexError raised
            # by target_func is reported, not mistaken for an empty queue.
            try:
                result = self.target_func(element)
            except BaseException as exc:  # re-raised in the caller's thread
                self._errors.append((idx, exc))
                self.input_data.clear()
                return
            self.output_data.append((idx, result))

    def parallelize(self, func, data):
        """Return ``[func(x) for x in data]``, computed on worker threads.

        Args:
            func: callable applied to each element. It is called from
                several threads concurrently, so it must tolerate that.
            data: iterable of elements; it is consumed fully up front.

        Returns:
            A list of results in the same order as *data*.

        Raises:
            Whatever *func* raised. If any call fails, no further elements
            are started, and once all worker threads have finished the
            exception from the lowest-indexed failing element is re-raised.
        """
        # Tag each element with its position so results can be reordered.
        self.input_data = deque(enumerate(data))
        self.output_data = deque()
        self._errors = deque()
        self.target_func = func

        # No point starting more threads than there are elements.
        num_threads = min(self.num_cores, len(self.input_data))
        workers = [
            threading.Thread(target=self.wrapper) for _ in range(num_threads)
        ]
        for worker in workers:
            worker.start()
        # Block until every worker has drained the queue (or stopped early).
        for worker in workers:
            worker.join()

        if self._errors:
            _, exc = min(self._errors, key=lambda pair: pair[0])
            raise exc

        # Workers finish in arbitrary order; restore the input order.
        ordered = sorted(self.output_data, key=lambda pair: pair[0])
        return [result for _, result in ordered]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment