Last active
May 31, 2019 05:14
-
-
Save serihiro/6374d1cc32268b87b5840efafde12aa8 to your computer and use it in GitHub Desktop.
python mulitprocessing.Queue test for each Context. See also https://gist.github.com/serihiro/bb9614b816b9c35e95ec4b53d3382f72
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
import queue | |
import argparse | |
import time | |
def task1(out_queue): | |
for i in range(0, 5): | |
out_queue.put(i) | |
def task2(terminate, in_queue, out_queue): | |
while not terminate.is_set(): | |
while True: | |
try: | |
value = in_queue.get(timeout=0.01) | |
except queue.Empty: | |
if terminate.is_set(): | |
return | |
else: | |
break | |
value += 1 | |
while True: | |
try: | |
out_queue.put(value) | |
except queue.Full: | |
if terminate.is_set(): | |
return | |
else: | |
break | |
def task3(terminate, in_queue, out_queue): | |
while not terminate.is_set(): | |
while True: | |
try: | |
value = in_queue.get(timeout=0.01) | |
except queue.Empty: | |
if terminate.is_set(): | |
return | |
else: | |
break | |
value *= 100 | |
while True: | |
try: | |
out_queue.put(value) | |
except queue.Full: | |
if terminate.is_set(): | |
return | |
pass | |
else: | |
break | |
def main(): | |
queue1 = multiprocessing.Queue(100) | |
queue2 = multiprocessing.Queue(100) | |
queue3 = multiprocessing.Queue(100) | |
terminate = multiprocessing.Event() | |
p1 = multiprocessing.Process(target=task1, args=[queue1]) | |
p2 = multiprocessing.Process(target=task2, args=[terminate, queue1, queue2]) | |
p3 = multiprocessing.Process(target=task3, args=[terminate, queue2, queue3]) | |
p1.start() | |
p2.start() | |
p3.start() | |
time.sleep(0.5) | |
terminate.set() | |
p1.join() | |
p2.join() | |
p3.join() | |
for _ in range(0, 5): | |
print(queue3.get(timeout=1)) | |
# 100 | |
# 200 | |
# 300 | |
# 400 | |
# 500 | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser() | |
parser.add_argument("--start_method", type=str, default="fork") | |
args = parser.parse_args() | |
multiprocessing.set_start_method(args.start_method) | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment