-
-
Save FanchenBao/d8577599c46eab1238a81857bb7277c9 to your computer and use it in GitHub Desktop.
from multiprocessing.queues import Queue | |
import multiprocessing | |
# The following implementation of a custom MyQueue, to avoid NotImplementedError
# when calling queue.qsize() on Mac OS X, comes almost entirely from this GitHub
# discussion: https://github.com/keras-team/autokeras/issues/368
# Necessary modifications were made to make the code compatible with Python 3.
class SharedCounter(object):
    """A synchronized counter that can be shared safely between processes.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n: int = 0):
        # 'i' = C signed int. The Value object carries its own process-shared
        # lock, accessible via get_lock().
        self.count = multiprocessing.Value('i', n)

    def increment(self, n: int = 1) -> None:
        """Atomically increment the counter by n (default = 1)."""
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self) -> int:
        """Return the current value of the counter.

        The read is taken under the same lock used by increment() so it cannot
        interleave with a concurrent read-modify-write cycle.
        """
        with self.count.get_lock():
            return self.count.value
class MyQueue(Queue):
    """A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    Note the implementation of __getstate__ and __setstate__ which help to
    serialize MyQueue when it is passed between processes. If these functions
    are not defined, MyQueue cannot be serialized, which will lead to the error
    of "AttributeError: 'MyQueue' object has no attribute 'size'".
    See the answer provided here: https://stackoverflow.com/a/65513291/9723036
    For documentation of using __getstate__ and __setstate__ to serialize objects,
    refer to here: https://docs.python.org/3/library/pickle.html#pickling-class-instances
    """

    def __init__(self, maxsize: int = 0):
        # maxsize=0 means "unbounded", matching multiprocessing.Queue's own
        # default; previously this subclass offered no way to set a bound.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        # Process-safe item count, maintained by put()/get() below.
        self.size = SharedCounter(0)

    def __getstate__(self):
        """Help to make MyQueue instance serializable.

        Note that we record the parent class state, which is the state of the
        actual queue, and the size of the queue, which is the state of MyQueue.
        self.size is a SharedCounter instance. It is itself serializable.
        """
        return {
            'parent_state': super().__getstate__(),
            'size': self.size,
        }

    def __setstate__(self, state):
        super().__setstate__(state['parent_state'])
        self.size = state['size']

    def put(self, *args, **kwargs):
        # Increment only after the parent put() succeeds, so a raised
        # queue.Full (e.g. with block=False on a bounded queue) does not
        # corrupt the count.
        super().put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs):
        # Decrement only after an item was actually retrieved; a raised
        # queue.Empty must not corrupt the count.
        item = super().get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self) -> int:
        """Reliable implementation of multiprocessing.Queue.qsize()."""
        return self.size.value

    def empty(self) -> bool:
        """Reliable implementation of multiprocessing.Queue.empty()."""
        return not self.qsize()
from my_queue import MyQueue | |
from multiprocessing import Process | |
from time import sleep | |
from random import randint | |
# A simple use case of the custom MyQueue that allows .qsize() method | |
# in MacOS X. | |
def foo(q):
    """Producer loop: forever put a progress message on *q*, pausing a random
    0-3 seconds between items. Runs until the process is terminated."""
    counter = 0
    while True:
        q.put(f'current i = {counter}')
        sleep(randint(0, 3))
        counter += 1
if __name__ == '__main__':
    # Demonstrate that qsize()/empty() work on the custom queue while a
    # separate producer process keeps feeding it.
    q: MyQueue = MyQueue()
    p: Process = Process(target=foo, args=(q,))
    p.start()
    # Consume exactly five items, polling the size around each get.
    consumed = 0
    while consumed < 5:
        print(f'current qsize = {q.qsize()}')
        if not q.empty():
            print(f'qsize = {q.qsize()} before get')
            print(f'Item got from queue: {q.get()}')
            print(f'qsize = {q.qsize()} after get')
            consumed += 1
        sleep(randint(0, 3))
    p.terminate()
    p.join()
    print(f'qsize = {q.qsize()} at the end')
This is exactly what I was looking for, thank you for sharing @FanchenBao!
There is one detail that I'm sure whoever wants to use this code will quickly discover, but I'll mention it anyway: multiprocessing.Queue has an optional `maxsize` argument. It should therefore be added to the MyQueue constructor and passed on to `super().__init__` as well, in order to keep the two interfaces the same.
Hello. Your class's docstrings indicate that the SharedCounter approach not only eliminates the error, but also provides more robust method implementations. What does that mean? I want to use the following code:
from multiprocessing import Value, get_context
from multiprocessing.queues import Queue as MultiprocessingQueue
from typing import Any
class SynchronizedCounter:
    """A process-safe integer counter backed by multiprocessing.Value.

    All mutations happen under the Value's built-in lock so that the
    read-modify-write of `+=` cannot interleave between processes.
    """

    def __init__(self) -> None:
        # 'i' = C signed int, starting at zero.
        self.count = Value('i', 0)

    def change_count(self, number: int) -> None:
        """Atomically add *number* (may be negative) to the counter."""
        with self.count.get_lock():
            self.count.value += number

    @property
    def value(self) -> int:
        # Read under the same lock so it cannot interleave with a concurrent
        # change_count() read-modify-write cycle.
        with self.count.get_lock():
            return self.count.value
class Queue(MultiprocessingQueue):
    """multiprocessing.Queue with a reliable, cross-platform size counter.

    A SynchronizedCounter tracks the number of items, which avoids the
    NotImplementedError that qsize() raises on platforms lacking
    sem_getvalue() (e.g. macOS). __getstate__/__setstate__ make the instance
    picklable so it can be passed to spawned processes.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, ctx=get_context(), **kwargs)
        self._counter = SynchronizedCounter()

    def __getstate__(self) -> dict[str, Any]:
        # Serialize the parent queue's state plus our counter (itself
        # picklable), so the attribute survives transfer between processes.
        return {
            'parent_state': super().__getstate__(),
            '_counter': self._counter
        }

    def __setstate__(self, state: dict) -> None:
        super().__setstate__(state['parent_state'])
        self._counter = state['_counter']

    def put(self, *args, **kwargs) -> None:
        # Increment only AFTER put() succeeds; incrementing first would leave
        # the count too high if put() raises queue.Full (e.g. block=False).
        super().put(*args, **kwargs)
        self._counter.change_count(1)

    def get(self, *args, **kwargs) -> Any:
        # Decrement only AFTER an item was actually retrieved; decrementing
        # first would corrupt the count if get() raises queue.Empty on timeout.
        item = super().get(*args, **kwargs)
        self._counter.change_count(-1)
        return item

    @property
    def count(self) -> int:
        """Reliable item count (replacement for qsize())."""
        return self._counter.value

    @property
    def is_empty(self) -> bool:
        """Reliable emptiness check (replacement for empty())."""
        return not self.count
without checking whether sys.platform == 'darwin'.
I mean, I want to use this queue everywhere, because it allows these methods to be accessed as properties.
Will my code look ridiculous?
@MikeMight What I mean by "robust method implementations" is that the vanilla Python Queue class documents Queue.qsize() as approximate (see doc) and Queue.empty() as carrying no guarantee either (see doc). However, the queue implementation here uses a separate mechanism to track the queue size, so the return values of qsize() and empty() should be guaranteed.
I tested your code by adapting it to the use case example, and it worked well. So I think your code is fine.
@FanchenBao Thank you for your reply and for the code you provided.
I want to ask about one more point: instead of using a SharedCounter, one could put the multiprocessing.Value directly into the attributes of the queue itself. I have already tried this and the results were the same, but I would like to know what guided you when you moved the object-counting logic into a separate class. Is there any advantage to this?
There may be mistakes in my answers; I apologize for them, as I do not know English well.
@MikeMight You are very welcome.
Regarding your question, putting multiprocessing.Value directly into the custom Queue class does work. However, by doing so, the custom queue would be too tightly coupled with the shared counter. In other words, we would be mixing two relatively independent concepts (queue and shared counter) together. This has three downsides.
- It complicates the logic for each concept (e.g. we will have to worry about whether the shared counter logic would break if we alter the features in the queue), which in turn makes testing and future code refactoring a giant pain.
- It discourages code re-use (the shared counter concept could be re-used elsewhere).
- It makes documentation more difficult.
To avoid the downsides mentioned above, I would prefer leaving the queue and the shared counter as two separate classes.
@arturzangiev This is an update to our question regarding why the original code doesn't work in Python 3.8+. My question on SO finally got answered. The problem with the original code is that the MyQueue object cannot be properly serialized to pass between spawned processes (i.e. the `size` attribute is not serialized under the default setting). By adding `__getstate__()` and `__setstate__()` functions, we manually dictate how the MyQueue object shall be serialized, and it can then be used in spawned processes. Forked processes apparently do not need to serialize the MyQueue object because forking employs memory sharing between processes. The code in my_queue.py has been updated, and it now works in both Python 3.7 and 3.8+. The hack of forcing multiprocessing.set_start_method('fork') is no longer needed.