Last active
October 24, 2023 04:20
-
-
Save FanchenBao/d8577599c46eab1238a81857bb7277c9 to your computer and use it in GitHub Desktop.
A custom MyQueue class that avoids `NotImplementedError` when calling queue.qsize() method.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing.queues import Queue | |
import multiprocessing | |
# The following implementation of custom MyQueue to avoid NotImplementedError | |
# when calling queue.qsize() in MacOS X comes almost entirely from this github | |
# discussion: https://github.com/keras-team/autokeras/issues/368 | |
# Necessary modification is made to make the code compatible with Python3. | |
class SharedCounter(object): | |
""" A synchronized shared counter. | |
The locking done by multiprocessing.Value ensures that only a single | |
process or thread may read or write the in-memory ctypes object. However, | |
in order to do n += 1, Python performs a read followed by a write, so a | |
second process may read the old value before the new one is written by the | |
first process. The solution is to use a multiprocessing.Lock to guarantee | |
the atomicity of the modifications to Value. | |
This class comes almost entirely from Eli Bendersky's blog: | |
http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/ | |
""" | |
def __init__(self, n=0): | |
self.count = multiprocessing.Value('i', n) | |
def increment(self, n=1): | |
""" Increment the counter by n (default = 1) """ | |
with self.count.get_lock(): | |
self.count.value += n | |
@property | |
def value(self): | |
""" Return the value of the counter """ | |
return self.count.value | |
class MyQueue(Queue): | |
""" A portable implementation of multiprocessing.Queue. | |
Because of multithreading / multiprocessing semantics, Queue.qsize() may | |
raise the NotImplementedError exception on Unix platforms like Mac OS X | |
where sem_getvalue() is not implemented. This subclass addresses this | |
problem by using a synchronized shared counter (initialized to zero) and | |
increasing / decreasing its value every time the put() and get() methods | |
are called, respectively. This not only prevents NotImplementedError from | |
being raised, but also allows us to implement a reliable version of both | |
qsize() and empty(). | |
Note the implementation of __getstate__ and __setstate__ which help to | |
serialize MyQueue when it is passed between processes. If these functions | |
are not defined, MyQueue cannot be serialized, which will lead to the error | |
of "AttributeError: 'MyQueue' object has no attribute 'size'". | |
See the answer provided here: https://stackoverflow.com/a/65513291/9723036 | |
For documentation of using __getstate__ and __setstate__ to serialize objects, | |
refer to here: https://docs.python.org/3/library/pickle.html#pickling-class-instances | |
""" | |
def __init__(self): | |
super().__init__(ctx=multiprocessing.get_context()) | |
self.size = SharedCounter(0) | |
def __getstate__(self): | |
"""Help to make MyQueue instance serializable. | |
Note that we record the parent class state, which is the state of the | |
actual queue, and the size of the queue, which is the state of MyQueue. | |
self.size is a SharedCounter instance. It is itself serializable. | |
""" | |
return { | |
'parent_state': super().__getstate__(), | |
'size': self.size, | |
} | |
def __setstate__(self, state): | |
super().__setstate__(state['parent_state']) | |
self.size = state['size'] | |
def put(self, *args, **kwargs): | |
super().put(*args, **kwargs) | |
self.size.increment(1) | |
def get(self, *args, **kwargs): | |
item = super().get(*args, **kwargs) | |
self.size.increment(-1) | |
return item | |
def qsize(self): | |
""" Reliable implementation of multiprocessing.Queue.qsize() """ | |
return self.size.value | |
def empty(self): | |
""" Reliable implementation of multiprocessing.Queue.empty() """ | |
return not self.qsize() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from my_queue import MyQueue | |
from multiprocessing import Process | |
from time import sleep | |
from random import randint | |
# A simple use case of the custom MyQueue that allows .qsize() method | |
# in MacOS X. | |
def foo(q): | |
i = 0 | |
while True: | |
q.put(f'current i = {i}') | |
sleep(randint(0, 3)) | |
i += 1 | |
if __name__ == '__main__': | |
q: MyQueue = MyQueue() | |
p: Process = Process(target=foo, args=(q,)) | |
p.start() | |
times = 0 | |
while times < 5: | |
print(f'current qsize = {q.qsize()}') | |
if not q.empty(): | |
print(f'qsize = {q.qsize()} before get') | |
print(f'Item got from queue: {q.get()}') | |
print(f'qsize = {q.qsize()} after get') | |
times += 1 | |
sleep(randint(0, 3)) | |
p.terminate() | |
p.join() | |
print(f'qsize = {q.qsize()} at the end') |
@MikeMight You are very welcome.
Regarding your question, putting multiprocessing.Value
in the custom Queue
class does work. However, by doing so, the custom queue would have too much coupling with the shared counter. In other words, we are mixing two relatively independent concepts (queue and shared counter) together. This has three downsides.
- It complicates the logic for each concept (e.g. we will have to worry about whether the shared counter logic would break if we alter the features in the queue), which in turn makes testing and future code refactoring a giant pain.
- It discourages code re-use (the shared counter concept could be re-used elsewhere).
- It makes documentation more difficult.
To avoid the downsides mentioned above, I would prefer leaving the queue and the shared counter as two separate classes.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@FanchenBao Thank you for your reply and for the code you provided.
I want to ask about one more point, which is not to use a SharedCounter, but to put multiprocessing.Value in the attributes of the queue itself. I have already tried to do this and the results were the same, but I would like to know what were you guided by when you moved the logic of counting objects into a separate class? Is there any advantage to this?
It is possible that there will be mistakes in my answers, i apologize for them, i do not know engl well.