Sample demonstration of a wrapper around an mp.JoinableQueue that enforces message deduplication via an mp.Manager dict and an mp.Lock.
import multiprocessing as mp
import os
import time
from dataclasses import dataclass
from datetime import datetime


@dataclass
class JobItem:
    x: int
    y: int

    # define hash function to store refs into a manager dict for de-duplication
    def __hash__(self):
        return hash(self.__str__())


# https://stackoverflow.com/a/55563139
def delegate(to, *methods):
    def dec(klass):
        def create_delegator(method):
            def delegator(self, *args, **kwargs):
                obj = getattr(self, to)
                m = getattr(obj, method)
                return m(*args, **kwargs)
            return delegator

        for m in methods:
            setattr(klass, m, create_delegator(m))
        return klass
    return dec
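
# `delegate` adds thin forwarding methods to the decorated class, so e.g.
# JoinableQueueWrapper.empty() simply calls self.queue.empty(); this keeps the
# wrapper usable wherever a JoinableQueue is expected while push_item stays
# the only way to enqueue work.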
@delegate('queue', 'empty', 'get', 'join', 'task_done')
class JoinableQueueWrapper:
    def __init__(self):
        self.queue = mp.JoinableQueue()
        self.cache = mp.Manager().dict()
        self.lock = mp.Lock()
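
    # push_item is an atomic check-and-put: the lock serializes access across
    # processes, the manager dict remembers every item ever enqueued, and its
    # counter records how many times each item was offered (used for the
    # dedup metric at the end of the script)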
    def push_item(self, item):
        with self.lock:
            if item in self.cache.keys():
                # print("[CACHE] Item already processed. {}".format(item))
                self.cache[item] += 1
                return
            self.queue.put(item)
            self.cache[item] = 1


# store processed items to evaluate the output
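# note: this module-level proxy is shared with the workers only under the
# "fork" start method (the default on Linux); with "spawn", each child would
# re-create its own Manager and list when it re-imports the module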
processed_items = mp.Manager().list()
# number of consecutive empty polls a worker tolerates before it stops
WAIT_INTERVAL = 3
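

# each worker drains the queue: it marks an item done as soon as it is taken,
# records it, then derives up to three child jobs (which push_item
# de-duplicates); once the queue stays empty for WAIT_INTERVAL consecutive
# 1-second polls the worker exits on its own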
def worker(queue):
    worker_wait_interval = WAIT_INTERVAL
    pid = os.getpid()
    print("[{} - {}] - Worker started".format(pid, datetime.now()))
    while True:
        if queue.empty():
            if worker_wait_interval < 1:
                break
            print("[{} - {}] - Queue empty. Waiting {} seconds before stopping".format(pid, datetime.now(), worker_wait_interval))
            time.sleep(1)
            worker_wait_interval -= 1
            continue
        worker_wait_interval = WAIT_INTERVAL
        item = queue.get()
        queue.task_done()
        processed_items.append(item.__str__())
        x = int(item.x / 2)
        y = int(item.y / 3)
        if x > 0 or y > 0:
            queue.push_item(JobItem(x, y))
            queue.push_item(JobItem(x, y + 1))
            queue.push_item(JobItem(x - 1, y))
        time.sleep(0.1)
        print("[{} - {}] - Processing item: {}".format(pid, datetime.now(), item))

if __name__ == '__main__':
    start_time = datetime.now()
    my_queue = JoinableQueueWrapper()
    max_workers = mp.cpu_count() * 2
    workers = [mp.Process(target=worker, args=(my_queue,)) for _ in range(max_workers)]

    my_queue.push_item(JobItem(12554530, 2018052))
    my_queue.push_item(JobItem(3210 * 54, 415 * 567))
    my_queue.push_item(JobItem(74335, 5791))
    my_queue.push_item(JobItem(1033, 19795))

    for p in workers:
        p.daemon = True
        p.start()

    # wait for all workers to complete their jobs
    for p in workers:
        p.join()
    # wait for the queue to be empty
    my_queue.join()

    # collect metrics to report data
    total_processed_items = len(processed_items)
    uniq_processed_items = len(set(processed_items))
    total_dedup_items = sum(my_queue.cache.values()) - uniq_processed_items
    print("total time: {}, workers: {}, total_processed_items: {}, uniq_processed_items: {}, total_dedup_items: {}".format(
        datetime.now() - start_time,
        max_workers,
        total_processed_items,
        uniq_processed_items,
        total_dedup_items
    ))
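
Running the file as a plain Python script (on a fork-based platform such as Linux, as noted above) makes each worker log its start, the items it processes, and a countdown once the queue drains; the run ends with a one-line summary of the total time, worker count, and the total, unique, and de-duplicated item counts.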