@RasmusFonseca
Last active September 30, 2024 07:17
Python multiple producer / single consumer example
from multiprocessing import Process, Queue


def square_producer(inputqueue, resultqueue):
    """
    A producer that pops numbers off the inputqueue, squares them and puts the result on resultqueue
    """
    while True:
        num = inputqueue.get()
        if num is None:  # Sentinel: no more input
            return
        resultqueue.put(num*num)
        print("Produced", num*num)


def consumer(resultqueue):
    """
    A consumer that pops results off the resultqueue and prints them to screen
    """
    while True:
        numsq = resultqueue.get()
        if numsq is None:  # Sentinel: all producers are done
            return
        print("Consumed", numsq)


if __name__ == '__main__':  # Required when processes are spawned (Windows, macOS)
    num_producers = 3

    # Generate input
    inputqueue = Queue()
    for i in range(100):
        inputqueue.put(i % 10)
    for _ in range(num_producers):
        inputqueue.put(None)  # Ensures that producers terminate

    resultqueue = Queue()  # For transfer of data from producer to consumer

    # Set up and start producer processes
    producers = [Process(target=square_producer, args=(inputqueue, resultqueue)) for _ in range(num_producers)]
    for p in producers:
        p.start()

    # Set up and start consumer process (named so it does not shadow the consumer function)
    consumer_process = Process(target=consumer, args=(resultqueue,))
    consumer_process.start()

    # Wait for producers to finish
    for p in producers:
        p.join()

    # Signal the consumer to stop, then wait for it to finish
    resultqueue.put(None)
    consumer_process.join()

    print("All done")
@theothermattm

This was a very helpful snippet for me, thanks!

@RasmusFonseca
Author

No problem. Thanks for leaving a comment!

@LiorA1

LiorA1 commented Nov 12, 2021

It's great, but a couple of modifications could improve it:

  1. Use queues to signal to the consumer that the producer has finished. (like this: https://github.com/voidrealms/python3/blob/main/python3-53/python3-53.py)
  2. Use classes for better OOP integration. (https://stackoverflow.com/a/54499606/3790620)
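
A minimal sketch of how the two suggestions above might be combined, not taken from either linked example. It assumes each producer is a Process subclass that puts one None sentinel on the result queue when it finishes, and that the consumer stops after counting one sentinel per producer. The names SquareProducer and consume are hypothetical.

```python
from multiprocessing import Process, Queue


class SquareProducer(Process):
    """Hypothetical producer class: squares inputs, then signals completion."""

    def __init__(self, inputqueue, resultqueue):
        super().__init__()
        self.inputqueue = inputqueue
        self.resultqueue = resultqueue

    def run(self):
        # iter(get, None) blocks on get() and stops at the None sentinel
        for num in iter(self.inputqueue.get, None):
            self.resultqueue.put(num * num)
        # Tell the consumer that this producer is finished
        self.resultqueue.put(None)


def consume(resultqueue, num_producers):
    """Collect results until every producer has sent its None sentinel."""
    results = []
    finished = 0
    while finished < num_producers:
        item = resultqueue.get()
        if item is None:
            finished += 1
        else:
            results.append(item)
    return results


if __name__ == "__main__":
    num_producers = 3
    inputqueue, resultqueue = Queue(), Queue()
    for i in range(100):
        inputqueue.put(i % 10)
    for _ in range(num_producers):
        inputqueue.put(None)  # One stop signal per producer

    producers = [SquareProducer(inputqueue, resultqueue) for _ in range(num_producers)]
    for p in producers:
        p.start()

    results = consume(resultqueue, num_producers)  # Main process acts as the consumer
    for p in producers:
        p.join()
    print("Consumed", len(results), "results")
```

Because every producer announces its own completion, the main process does not need to join the producers before telling the consumer to stop.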

@sananand007

This no longer works in 2024. It needs some modifications.

@3zhang

3zhang commented Aug 22, 2024

> It's great, but a couple of modifications could improve it:
>
>   1. Use queues to signal to the consumer that the producer has finished. (like this: https://github.com/voidrealms/python3/blob/main/python3-53/python3-53.py)
>   2. Use classes for better OOP integration. (https://stackoverflow.com/a/54499606/3790620)

The first example you linked is not a multiple-producer, single-consumer setup, and using a while loop to continuously check the queue's status is quite awkward.
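
As an illustration of avoiding status polling, here is a small sketch that relies on a blocking get() plus a None sentinel, using the two-argument form of iter(). The helper name drain is hypothetical, and the queue is filled in the same process only to keep the sketch short.

```python
from multiprocessing import Queue


def drain(resultqueue):
    """Yield items from resultqueue until a None sentinel arrives."""
    # get() blocks until an item is available, so no polling is needed
    yield from iter(resultqueue.get, None)


q = Queue()
for value in (1, 4, 9, None):  # None marks the end of the stream
    q.put(value)

consumed = list(drain(q))
print(consumed)  # [1, 4, 9]
```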

@3zhang

3zhang commented Aug 22, 2024

A modified version:

  1. Use get_nowait and catch the Empty exception to detect that inputqueue is exhausted, instead of inserting None sentinels.
  2. In a single-consumer scenario, it's more convenient to run the consumer in the main process and use a Thread to monitor all producers.
  3. Add time.sleep to the producer and consumer to make the example more realistic.
from multiprocessing import Process, Queue
from queue import Empty
from threading import Thread
import time
import random


def square_producer(inputqueue, resultqueue):
    """
    A producer that pops numbers off the inputqueue, squares them and puts the result on resultqueue
    """
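    while True:
        try:
            # Non-blocking read; Empty can be raised slightly early, because items
            # put on a multiprocessing Queue become visible after a short delay
            num = inputqueue.get_nowait()
        except Empty:
            return
        time.sleep(random.random())  # Simulate work
        resultqueue.put(num*num)
        print("Produced", num*num)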
        
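def wait_for_producers(producers, resultqueue):
    """
    Runs in a monitor thread: waits for all producers, then tells the consumer to stop
    """
    for p in producers:
        p.join()

    resultqueue.put(None)  # Sentinel for the consumer loop in the main process
    print("All produced")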

if __name__ == '__main__':

    num_producers = 3

    inputqueue = Queue()
    for i in range(100):
        inputqueue.put(i % 10)
    
    resultqueue = Queue()
    
    producers = [Process(target=square_producer, args=(inputqueue, resultqueue)) for _ in range(num_producers)]
    for p in producers:
        p.start()
    
    monitor = Thread(target=wait_for_producers, args=(producers, resultqueue))
    monitor.start()
    
    while True:
        numsq = resultqueue.get()
        if numsq is None:
            break
        time.sleep(0.2)
        print("Consumed", numsq)
        
    print("All consumed")
    monitor.join()
    
    print("All done")
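
One caveat with get_nowait in the version above: the multiprocessing documentation notes that a freshly put item may not be visible immediately, so Empty can be raised while data is still in flight. A more forgiving variant, sketched here under the assumption that a short idle timeout is acceptable, uses get(timeout=...) instead. The function name square_until_idle and the 0.5 second default are hypothetical choices.

```python
from multiprocessing import Queue
from queue import Empty


def square_until_idle(inputqueue, resultqueue, timeout=0.5):
    """Square items until inputqueue stays empty for `timeout` seconds.

    Returns the number of items processed.
    """
    count = 0
    while True:
        try:
            # Waits up to `timeout` seconds before concluding the queue is empty
            num = inputqueue.get(timeout=timeout)
        except Empty:
            return count
        resultqueue.put(num * num)
        count += 1
```

It could replace square_producer as the Process target without other changes, at the cost of each producer lingering for one timeout period after the input runs out.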
