[parallelism] Python scripts on threading and multiprocessing
""" | |
tips from Jie: | |
1) Put worker_proc in a py file. The code may not work properly if you define it directly in your notebook. On the other hand, dispenser_proc can be defined in the notebook. | |
2) Use multiprocessing.Queue, not queue.Queue. | |
3) It's essential to have a job dispenser thread. Multiprocessing queue has a finite capacity. If you send jobs from your main thread, your code could hang. | |
If dispensing jobs in the main thread, it may hang. Reason: workers try to put stuff into the result queue, result queue becomes full, the main thread is not reading from result queue because it still tries to put stuff into the job queue, the workers hang such that they won’t read from the job queue, both workers are main thread hang | |
4) I use None to indicate 'end of queue'. It's important for the worker_proc to put None back into the job_queue once it gets a None, otherwise other workers may hang. | |
""" | |
import time
from multiprocessing import Process, Queue
from threading import Thread

import torch
from tqdm import tqdm
config = {}  # placeholder for your own configuration
job_list = list(range(32))
# Don't use torch.cuda.device_count() here, or it will cause "RuntimeError:
# Cannot re-initialize CUDA in forked subprocess. To use CUDA with
# multiprocessing, you must use the 'spawn' start method" (see the spawn
# sketch after these constants).
NUM_GPUS = 2
PROC_PER_GPU = 2
JOB_BATCH = NUM_GPUS * PROC_PER_GPU
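# A minimal sketch of the 'spawn' alternative mentioned above (an assumed
# adaptation, not part of the original script): with a spawn context the
# children don't inherit a forked CUDA state, so torch.cuda.device_count()
# becomes safe to call in the parent. worker_proc must then live in an
# importable .py file (tip 1).
#
#   import multiprocessing as mp
#   ctx = mp.get_context("spawn")
#   NUM_GPUS = torch.cuda.device_count()
#   job_queue, result_queue, gpu_queue = ctx.Queue(), ctx.Queue(), ctx.Queue()
#   workers = [
#       ctx.Process(target=worker_proc, args=(gpu_queue, job_queue, result_queue))
#       for _ in range(NUM_GPUS * PROC_PER_GPU)
#   ]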
# Function to be run in your worker process
def worker_proc(gpu_queue, job_queue, result_queue):
    # Each worker claims one GPU id for its lifetime.
    gpu = gpu_queue.get()
    while True:
        job = job_queue.get()
        if job is None:
            # Put the sentinel back so the remaining workers can also stop.
            job_queue.put(None)
            break
        # Do the job on the assigned GPU.
        time.sleep(0.5)
        conv = torch.nn.Conv2d(3, 64, 3).cuda(gpu)
        out = conv(torch.randn(4, 3, 224, 224).cuda(gpu)).detach().cpu()
        result = f"job {job} ran on gpu {gpu}, output shape {tuple(out.shape)}"
        # DO NOT USE return; put results into result_queue instead.
        result_queue.put(result)
# A thread for dispensing jobs and GPU ids to the workers
def dispenser_proc(job_list, gpu_queue, job_queue):
    for job in job_list:
        job_queue.put(job)
    for gpu in range(NUM_GPUS):
        for _ in range(PROC_PER_GPU):
            gpu_queue.put(gpu)
    # One sentinel is enough: each worker puts it back before exiting (tip 4).
    job_queue.put(None)
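# The failure mode tip 3 warns about, sketched for contrast (do NOT dispense
# from the main thread like this). Once the queues' finite buffers fill up,
# the main thread blocks on job_queue.put() and never reaches the draining
# loop, while every worker blocks on result_queue.put():
#
#   for job in job_list:
#       job_queue.put(job)                       # main thread can block here...
#   for _ in job_list:
#       result_list.append(result_queue.get())   # ...and never gets here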
if __name__ == "__main__":
    # In your main thread, start the workers and the job dispenser
    job_queue = Queue()
    result_queue = Queue()
    gpu_queue = Queue()
    workers = [
        Process(target=worker_proc, args=(gpu_queue, job_queue, result_queue))
        for _ in range(NUM_GPUS * PROC_PER_GPU)
    ]
    for worker in workers:
        worker.start()
    dispenser = Thread(target=dispenser_proc, args=(job_list, gpu_queue, job_queue))
    dispenser.start()

    # Aggregate results: exactly one result comes back per job
    result_list = []
    for _ in tqdm(job_list, total=len(job_list)):
        result_list.append(result_queue.get())

    # Make sure the workers and the dispenser stop properly
    for worker in workers:
        worker.join()
    dispenser.join()

    print(len(result_list))
    for r in result_list:
        print(r)