Skip to content

Instantly share code, notes, and snippets.

@secemp9
Created February 6, 2024 12:28
Show Gist options
  • Save secemp9/7b08c5fe6409c0a9108126b6c3cfd29f to your computer and use it in GitHub Desktop.
Save secemp9/7b08c5fe6409c0a9108126b6c3cfd29f to your computer and use it in GitHub Desktop.
Threading wait POC 3
import threading
import time
class ControlledExecution:
def __init__(self):
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.shutdown = False
def worker(self):
print("Worker started. Press Ctrl+C to stop.")
for i in range(10): # Simulated task
with self.lock:
if self.shutdown:
print("Worker stopping early.")
return # Use return instead of break to ensure clean exit
print(f"Processing item {i}...")
time.sleep(2) # Simulate time-consuming task
print("Worker finished processing all items.")
def main_loop(self):
thread = threading.Thread(target=self.worker)
thread.start()
while not self.shutdown:
try:
time.sleep(1) # Main loop does minimal work, just sleeps and checks for shutdown
except KeyboardInterrupt:
print("Shutdown signal received, shutting down gracefully...")
self.signal_shutdown()
break
thread.join()
def signal_shutdown(self):
with self.lock:
self.shutdown = True
with self.condition:
self.condition.notify_all() # Notify any waiting threads to wake up
if __name__ == "__main__":
controlled_execution = ControlledExecution()
controlled_execution.main_loop()
print("Main program exited gracefully.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment