Skip to content

Instantly share code, notes, and snippets.

@wpcarro
Last active December 18, 2024 17:02
Show Gist options
  • Save wpcarro/6b3f36348b407243e6bb616b69ec496b to your computer and use it in GitHub Desktop.
Save wpcarro/6b3f36348b407243e6bb616b69ec496b to your computer and use it in GitHub Desktop.
Durable state (signal handling)
import signal
import time
import json
import os
# In-memory state (the integers 1 through 10) that persist_state() serialises
# to disk when a handled signal arrives.
state = list(range(1, 11))
def persist_state():
    """Write the module-level ``state`` to ``/tmp/dump.json`` as JSON.

    The data is first written to a scratch file (``/tmp/buffer.json``),
    flushed and fsync'ed, and only then moved over the destination, so a
    reader of ``/tmp/dump.json`` never observes a partially written file.

    Raises:
        OSError: if the scratch file cannot be written or moved into place.
        TypeError: if ``state`` contains something ``json`` cannot serialise.
    """
    print("Persisting state...")
    tmp = "/tmp/buffer.json"
    dst = "/tmp/dump.json"
    with open(tmp, "w") as f:
        f.write(json.dumps(state))
        # Push Python's userspace buffer to the OS, then ask the OS to push
        # its own cache to the storage device before the file is renamed.
        f.flush()
        os.fsync(f.fileno())
    # os.replace overwrites an existing destination on every platform,
    # whereas os.rename raises on Windows when the destination exists.
    os.replace(tmp, dst)
def handle_signal(signum, _frame):
print(f"Received signal: {signum}")
match signum:
case signal.SIGINT:
persist_state()
# Restore the default behavior and re-signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
raise KeyboardInterrupt
case signal.SIGTERM:
persist_state()
# Restore the default behavior and re-signal
signal.signal(signal.SIGTERM, signal.SIG_DFL)
raise SystemExit
case signal.SIGHUP:
persist_state()
# Restore the default behavior and re-signal
signal.signal(signal.SIGHUP, signal.SIG_DFL)
raise SystemExit
case x:
logger.error(f"Unhandled signal: {x}")
# Route the interrupt, terminate and hang-up signals through handle_signal
# so that state is persisted before the process exits.
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGHUP, handle_signal)

# Idle forever, announcing every five seconds that the process is waiting;
# the loop only ends when handle_signal raises in response to a signal.
while True:
    print("Awaiting signal...")
    time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment