Created
August 20, 2025 07:41
-
-
Save arnetheduck/14253aa83bd797cf984d4bcc8f067ea8 to your computer and use it in GitHub Desktop.
Threaded stop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Process state helper using a global to eventually shut down | |
import std/atomics, chronos, chronos/threadsync, chronicles | |
type | |
ProcessState* {.pure.} = enum | |
Starting | |
Running | |
Stopping | |
var processState: Atomic[ProcessState] | |
import system/ansi_c | |
when defined posix: | |
import posix | |
proc ignoreSignalsInThread*() = | |
# Block all signals in this thread, so we don't interfere with regular signal | |
# handling elsewhere. | |
var signalMask, oldSignalMask: Sigset | |
# sigprocmask() doesn't work on macOS, for multithreaded programs | |
if sigfillset(signalMask) != 0: | |
quit(QuitFailure) | |
if pthread_sigmask(SIG_BLOCK, signalMask, oldSignalMask) != 0: | |
echo osErrorMsg(osLastError()) | |
quit(QuitFailure) | |
proc scheduleStop*(_: type ProcessState) = | |
processState.store(Stopping, moRelaxed) | |
discard kill(getpid(), SIGTERM) | |
else: | |
proc ignoreSignalsInThread*() = | |
discard | |
proc scheduleStop*(_: type ProcessState) = | |
processState.store(Stopping, moRelaxed) | |
discard c_raise(SIGINT) | |
proc notifyRunning*(_: type ProcessState) = | |
processState.store(Running, moRelaxed) | |
proc setupSignalHandlers*(_: type ProcessState) = | |
# The CTRL-C handling provided by `signal` does not wake up the async polling | |
# loop and can therefore get stuck if no events are happening - this function | |
# should be called early on from the main thread to avoid the default Nim | |
# signal handlers from being used! | |
proc controlCHandler(a: cint) {.noconv.} = | |
c_printf("dying\n") | |
processState.store(Stopping, moRelaxed) | |
# Avoid using `setControlCHook` since it has an | |
c_signal(ansi_c.SIGINT, controlCHandler) | |
# equivalent SIGTERM handler | |
when declared(ansi_c.SIGTERM): | |
c_signal(ansi_c.SIGTERM, controlCHandler) | |
proc setupAsyncSignalHandlers*(_: type ProcessState) {.async: (raises: []).} = | |
## async signal handlers should be set up only in the main thread - threads | |
## should wait using a `ThreadSignalPtr` activated from the main thread! | |
let | |
sigint = waitSignal(chronos.SIGINT) | |
sigterm = waitSignal(chronos.SIGTERM) | |
info "Waiting for signal" | |
discard await noCancel race(sigint, sigterm) | |
processState.store(Stopping, moRelaxed) | |
# One of these will be finished, which is fine! | |
await noCancel sigint.cancelAndWait() | |
await noCancel sigterm.cancelAndWait() | |
proc pollUntilStopped*(_: type ProcessState) = | |
while processState.load(moRelaxed) != Stopping: | |
poll() | |
notice "Shutting down", chroniclesThreadIds = true | |
when isMainModule: | |
proc threadWork() {.async.} = | |
var todo = 5 | |
while todo > 0: | |
echo "Terminating in ", todo | |
await sleepAsync(1.seconds) | |
todo -= 1 | |
ProcessState.scheduleStop() | |
while true: | |
echo "Waiting for the end... " | |
await sleepAsync(1.seconds) | |
proc worker(p: ThreadSignalPtr) {.thread.} = | |
ignoreSignalsInThread() | |
let | |
stop = p.wait() | |
work = threadWork() | |
discard waitFor race(stop, work) | |
waitFor noCancel stop.cancelAndWait() | |
waitFor noCancel work.cancelAndWait() | |
proc main() = | |
let stopper = ThreadSignalPtr.new().expect("working thread signal") | |
var workerThread: Thread[ThreadSignalPtr] | |
createThread(workerThread, worker, stopper) | |
ProcessState.setupSignalHandlers() | |
let stop = ProcessState.setupAsyncSignalHandlers() | |
ProcessState.pollUntilStopped() | |
waitFor stopper.fire() | |
workerThread.joinThread() | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment