Skip to content

Instantly share code, notes, and snippets.

@cbonesana
Created August 5, 2022 09:37
Show Gist options
  • Save cbonesana/2be1c50fff2eeaa96d91b89441092d57 to your computer and use it in GitHub Desktop.
Save cbonesana/2be1c50fff2eeaa96d91b89441092d57 to your computer and use it in GitHub Desktop.
Experimenting using multiprocessign and signals in python.
import time
import multiprocessing
import signal
from multiprocessing import Event
class W(multiprocessing.Process):
def __init__(self, i: int, event: Event) -> None:
super().__init__()
self.i = i
self.event = event
def run(self) -> None:
while not self.event.is_set():
print(self.i, 'working...')
time.sleep(1)
print(self.i, 'completed')
if __name__ == '__main__':
event = Event()
def main_signal_handler(signum, frame):
if not event.is_set():
print('setting event')
event.set()
signal.signal(signal.SIGINT, main_signal_handler)
signal.signal(signal.SIGTERM, main_signal_handler)
workers = [W(w, event) for w in range(3)]
for w in workers:
w.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment