Last active
April 22, 2018 23:42
-
-
Save fntneves/f0d0405ba77ce50b50bf9eea808cf5af to your computer and use it in GitHub Desktop.
Producer-[Consumer/Producer]-Consumer example with graceful shutdown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Event, JoinableQueue, Process, current_process, active_children | |
import Queue | |
import time | |
import signal | |
import os | |
def ignore(signun, frame): | |
pass | |
def producer(output_queue): | |
pid = current_process().pid | |
exit = [False] | |
def start_shutdown(sigum, frame): | |
print 'Producer {} is interrupted...'.format(str(pid)) | |
exit[0] = True | |
signal.signal(signal.SIGINT, start_shutdown) | |
i = 0 | |
while not exit[0]: | |
output_queue.put((1, 'a', False)) | |
i = i + 1 | |
print 'Exiting producer... {}. Produced {} events.'.format(str(pid), str(i)) | |
def logger(input_queue): | |
pid = current_process().pid | |
exit = False | |
signal.signal(signal.SIGINT, ignore) | |
i = 0 | |
while not exit: | |
try: | |
# Read the remaining events and exit. | |
event = None | |
while event is None: | |
event = input_queue.get() | |
if event == 'exit': | |
exit = True | |
input_queue.task_done() | |
i = i + 1 | |
except Queue.Empty: | |
break | |
print 'Exiting logger... {}. Logged {} events.'.format(str(pid), str(i)) | |
def consumer(input_queue, output_queue): | |
pid = current_process().pid | |
exit = False | |
signal.signal(signal.SIGINT, ignore) | |
i = 0 | |
while not exit: | |
try: | |
# Before exiting, read the remaining events and exit. | |
event = None | |
while event is None: | |
event = input_queue.get() | |
# Prevent interruption side-effects. | |
if event == 'exit': | |
exit = True | |
else: | |
output_queue.put(event) | |
input_queue.task_done() | |
i = i + 1 | |
except Queue.Empty: | |
break | |
print 'Exiting worker... {}. Handled {} events.'.format(str(pid), str(i)) | |
amount = 4 | |
workers = [] | |
event_queue = JoinableQueue() | |
log_queue = JoinableQueue() | |
signal.signal(signal.SIGINT, ignore) | |
logger = Process(target=logger, args=(log_queue,)) | |
logger.daemon = True | |
logger.start() | |
workers.append(logger) | |
for _ in xrange(amount): | |
worker = Process(target=consumer, args=(event_queue, log_queue,)) | |
worker.daemon = True | |
worker.start() | |
workers.append(worker) | |
producer = Process(target=producer, args=(event_queue,)) | |
producer.daemon = True | |
producer.start() | |
# Wait for producer to end | |
producer.join() | |
print 'Attempting a graceful shutdown...' | |
# Wait for events to be processed. | |
event_queue.join() | |
for _ in xrange(amount): | |
# Send 'exit' events to all workers and to the logger | |
event_queue.put('exit') | |
# Wait for events to be logged. | |
log_queue.join() | |
log_queue.put('exit') | |
for worker in workers: | |
worker.join(timeout=5) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment