Created January 10, 2023 16:30
Multiprocessing Pool Using Process Subclass with Custom Attributes
import multiprocessing as mp
from multiprocessing.pool import Pool

# GOAL IN CONTEXT:
# Simulate using a multiprocessing pool to download a list of files synchronously
# from a set of servers where each worker in the pool targets a specific
# download server.
# Our Worker subclasses Process so the target server can be added as an attribute.
# A CustPool subclasses Pool so our Worker subclass is used instead of Process.


# A worker (Process subclass) with a custom attribute (the server URL it targets).
class Worker(mp.Process):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Assign any attributes to the worker. The pool constructs workers in the
        # parent process, where server_urls (set in the __main__ block) is defined.
        self.server_url = server_urls.pop()
        print(f"Custom worker constructed for server {self.server_url}.")


# A custom pool that uses our Worker subclass.
class CustPool(Pool):  # NOT mp.Pool, which is a method (not a class)
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        super().__init__(processes, initializer, initargs,
                         maxtasksperchild, context)

    # Override the Process function to return our custom Worker.
    # The ctx argument is unused because Worker subclasses mp.Process directly.
    @staticmethod
    def Process(ctx, *args, **kwds):
        return Worker(*args, **kwds)


def download_task(file_name):
    p = mp.current_process()
    print(f"{p.name} for {p.server_url} is downloading file {file_name}", flush=True)
    # Simulate a bit of work so the tasks are spread across all the workers.
    # Otherwise, all the tasks may get done by one worker.
    import time
    time.sleep(0.1)


if __name__ == '__main__':
    # Simulate downloading some files from a set of servers where
    # each server has a dedicated worker.
    nServers = 3
    nFiles = 7
    files_to_download = [f"File{f}" for f in range(1, 1 + nFiles)]
    server_urls = [f"https://myfileserver{i}" for i in range(1, 1 + nServers)]
    with CustPool(processes=nServers) as pool:
        pool.map(download_task, files_to_download)

""" STDOUT:
Custom worker constructed for server https://myfileserver3.
Custom worker constructed for server https://myfileserver2.
Custom worker constructed for server https://myfileserver1.
Worker-1 for https://myfileserver3 is downloading file File1
Worker-2 for https://myfileserver2 is downloading file File2
Worker-3 for https://myfileserver1 is downloading file File3
Worker-1 for https://myfileserver3 is downloading file File4
Worker-2 for https://myfileserver2 is downloading file File5
Worker-3 for https://myfileserver1 is downloading file File6
Worker-1 for https://myfileserver3 is downloading file File7
"""
Thanks for the great work!
Just curious about one thing. In the Worker class, if we implement (override) the run method, the program runs into an infinite __init__() and run() loop.
Update: Yeah, it should be fine. I forgot to call super().run(). The code below works well.
Works fine for me if I override Worker.run like this:
def run(self):
    print("Hello from subclassed run()")
    return super(Worker, self).run()
Wild guess, but make sure you are launching CustPool under an if __name__ == '__main__': block.
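
A likely explanation for the loop described above: the pool passes its task-handling loop to each worker as the Process target, and Process.run() is what calls that target. If an overridden run() returns without calling super().run(), the worker exits right away and the pool presumably keeps replacing it, which would show up as repeated __init__() and run() calls. The sketch below illustrates the first part only, in-process, by calling run() directly. ForgetfulWorker, GoodWorker, and fake_worker_loop are hypothetical names, and no child process is started.

```python
import multiprocessing as mp

calls = []


def fake_worker_loop():
    # Stand-in for the loop the pool hands to each worker as its target.
    calls.append("loop ran")


class ForgetfulWorker(mp.Process):
    def run(self):
        calls.append("forgetful run")
        # No super().run() here, so the target is never invoked.


class GoodWorker(mp.Process):
    def run(self):
        calls.append("good run")
        # Process.run() invokes the target passed to the constructor.
        return super().run()


# Calling run() directly executes it in the current process (start() is what
# would launch a child), which is enough to see which version reaches the target.
ForgetfulWorker(target=fake_worker_loop).run()
GoodWorker(target=fake_worker_loop).run()
print(calls)
```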
Inspired by https://gist.github.com/DomHudson/15a91d0ef38e2a88244c8dcc4521b0a8