Skip to content

Instantly share code, notes, and snippets.

@cbonesana
Created March 7, 2023 12:41
Show Gist options
  • Save cbonesana/5c28239548ad36554570150a458a1efe to your computer and use it in GitHub Desktop.
Save cbonesana/5c28239548ad36554570150a458a1efe to your computer and use it in GitHub Desktop.
Producer and consumer using Python, with KeyboardInterrupt (Ctrl+C) handling
from multiprocessing import Queue, Process
import random
import time
import signal
stop_sentinel = "STOP"
class Producer(Process):
    """Process that feeds string work items into a slow and a fast queue.

    The workload has two phases:

    1. A mixed phase of ``mixed_items`` items; each item is routed to the
       slow queue with probability ``slow_ratio`` and to the fast queue
       otherwise.
    2. A burst phase of ``bursts`` rounds; each round puts ``burst_size``
       items on the fast queue and then sleeps ``burst_pause`` seconds.

    Args:
        queue_slow: Queue receiving the ``"slow {i}"`` items.
        queue_fast: Queue receiving the ``"fast {i}"`` and
            ``"fast {i} {j}"`` items.
        mixed_items: Number of items produced in the mixed phase.
        slow_ratio: Probability that a mixed-phase item goes to the slow
            queue.
        bursts: Number of rounds in the burst phase.
        burst_size: Number of items put on the fast queue per round.
        burst_pause: Seconds to sleep after each round.
    """

    def __init__(
        self,
        queue_slow: Queue,
        queue_fast: Queue,
        *,
        mixed_items: int = 100,
        slow_ratio: float = 0.3,
        bursts: int = 30,
        burst_size: int = 10,
        burst_pause: float = 3,
    ) -> None:
        super().__init__()
        self.slow = queue_slow
        self.fast = queue_fast
        self.mixed_items = mixed_items
        self.slow_ratio = slow_ratio
        self.bursts = bursts
        self.burst_size = burst_size
        self.burst_pause = burst_pause

    def run(self) -> None:
        """Produce both phases of items; stop quietly on KeyboardInterrupt."""
        try:
            # Phase 1: randomly split items between the two queues.
            for i in range(self.mixed_items):
                if random.random() < self.slow_ratio:
                    print("Producer: slow", i)
                    self.slow.put(f"slow {i}")
                else:
                    print("Producer: fast", i)
                    self.fast.put(f"fast {i}")

            # Phase 2: bursts on the fast queue only.
            # NOTE(review): the pause is taken once per burst (after the
            # inner loop) -- confirm this matches the intended pacing.
            for i in range(self.bursts):
                for j in range(self.burst_size):
                    print("Producer: fast", i, j)
                    self.fast.put(f"fast {i} {j}")
                time.sleep(self.burst_pause)
        except KeyboardInterrupt:
            # Swallow the interrupt so the process ends without a traceback.
            print("Producer: Caught KeyboardInterrupt!")
class Consumer(Process):
    """Process that takes items from a queue until it sees the stop token.

    Args:
        idx: Identifier used in the printed messages.
        queue: Queue to read items from.
        speed: Seconds to sleep after handling each item.
        stop_token: Value that tells the consumer to stop. When ``None``
            (the default) the module-level ``stop_sentinel`` is used.
    """

    def __init__(
        self,
        idx: int,
        queue: Queue,
        speed: float,
        *,
        stop_token=None,
    ) -> None:
        super().__init__()
        self.queue = queue
        self.idx = idx
        self.stop = False
        self.speed = speed
        # Not stored as ``self.sentinel``: ``Process.sentinel`` is a
        # read-only property of the base class.
        self.stop_token = stop_sentinel if stop_token is None else stop_token

    def run(self) -> None:
        """Handle items until the stop token or a KeyboardInterrupt arrives."""
        try:
            while not self.stop:
                # Blocks until an item is available.
                package = self.queue.get()
                if package == self.stop_token:
                    self.stop = True
                    print("stop", self.idx)
                else:
                    print("process", self.idx, "task", package)
                    # NOTE(review): the sleep is placed in the "process"
                    # branch, so no delay follows the stop token -- confirm.
                    time.sleep(self.speed)
        except KeyboardInterrupt:
            print(f"Consumer{self.idx}: Caught KeyboardInterrupt! Setting stop event...")
            self.stop = True
def handler(signalname=None):
    """Build a signal handler that raises ``KeyboardInterrupt``.

    The returned callable has the ``(signum, frame)`` signature expected by
    ``signal.signal`` and raises ``KeyboardInterrupt("<name> received")``.

    Args:
        signalname: Name to put in the exception message. When ``None``,
            the name is looked up from the received signal number via
            ``signal.Signals`` (e.g. ``"SIGINT"``).

    Returns:
        The handler function.

    Note:
        ``signal.strsignal(signalnum)`` (Python 3.8+) returns a
        human-readable description such as "Interrupt", not the symbolic
        name, which is why ``signal.Signals(signalnum).name`` is used for
        the fallback. ``signal.valid_signals()`` is also available from 3.8.
    """

    def f(signal_received, frame):
        name = signalname
        if name is None:
            try:
                name = signal.Signals(signal_received).name
            except ValueError:
                # Signal number without a symbolic name on this platform.
                name = f"signal {signal_received}"
        raise KeyboardInterrupt(f"{name} received")

    return f
def main() -> None:
    """Run the demo: one producer, one slow consumer and two fast consumers.

    The parent starts all workers, lets them run for 20 seconds and then
    puts one stop sentinel per consumer on the queues. The sentinels are
    sent even when the wait is interrupted, so consumers blocked on
    ``get()`` can finish and ``join()`` does not wait on them forever.
    The producer is not signalled; it is joined once it has finished its
    own workload or has been interrupted itself.
    """
    from multiprocessing import set_start_method

    set_start_method("spawn")

    q1 = Queue()
    q2 = Queue()

    # This will be inherited by the child process if it is forked (not spawned)
    signal.signal(signal.SIGINT, handler("SIGINT"))
    signal.signal(signal.SIGTERM, handler("SIGTERM"))

    p = Producer(q1, q2)
    slow1 = Consumer(0, q1, 1)
    fast1 = Consumer(1, q2, 0.1)
    fast2 = Consumer(2, q2, 0.1)
    workers = (p, slow1, fast1, fast2)

    try:
        for worker in workers:
            worker.start()
        time.sleep(20)
    except KeyboardInterrupt:
        print("stop!")
    finally:
        # One sentinel per consumer: one reader on q1, two readers on q2.
        q1.put(stop_sentinel)
        q2.put(stop_sentinel)
        q2.put(stop_sentinel)

    try:
        for worker in workers:
            # ``pid`` stays None until the process has been spawned; joining
            # a process that was never started raises AssertionError.
            if worker.pid is not None:
                worker.join()
    except KeyboardInterrupt:
        print("here!")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment