Skip to content

Instantly share code, notes, and snippets.

@mrjk
Created October 10, 2024 14:55
Show Gist options
  • Save mrjk/a3a03bd1775fb3dd5b64f09af2f81426 to your computer and use it in GitHub Desktop.
Save mrjk/a3a03bd1775fb3dd5b64f09af2f81426 to your computer and use it in GitHub Desktop.
Stage listener for supervisord
#!/usr/bin/python
"""
Listener for supervisord
How does it work?
It waits until all the processes listed in PROCESSNAME (aka `program:NAME` sections) have reached
the same state, then it runs another command. Its main use case is to express dependencies between supervisord services.
Example configuration:
```
[eventlistener:event_wait_oauth2]
command=/usr/bin/listener.py
events=PROCESS_STATE_EXITED
environment=PROCESSNAME="wait_oauth2,gen_conf",EXECUTE="supervisorctl start nginx",COUNT="1",DEBUG="true"
# Comment this to hide all log messages
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes = 0
```
"""
# see: https://github.com/paramah/slistener/blob/master/listener.py
import logging
import os
import shlex
import subprocess
import sys
import time

from supervisor.childutils import listener
def main(args):
    """Run the supervisord event-listener loop.

    Waits for supervisord events on stdin (through ``listener.wait``) and,
    once every watched process has produced the expected event, runs a
    command. After each run the set of seen processes is cleared, so the
    next run again requires an event from every watched process.

    Configuration is read from environment variables:

    * ``PROCESSNAME`` (required): comma-separated process names to watch.
      Surrounding whitespace, empty entries and duplicates are ignored.
    * ``EXECUTE`` (required): command to run. It is split with shell-like
      quoting rules, but no shell is involved.
    * ``EVENT``: event name to match (default ``PROCESS_STATE_EXITED``).
    * ``DELAY``: seconds to sleep before running the command (default 0).
    * ``COUNT``: number of command runs before the listener exits; zero or
      a negative value means run forever (default -1).
    * ``DEBUG``: enables per-event debug logging unless the value is empty,
      ``0``, ``false``, ``no`` or ``off``.

    Args:
        args: Command-line arguments; only echoed in the debug logs.

    Raises:
        SystemExit: with status 1 on invalid configuration or when the
            command cannot be started, with status 0 once ``COUNT`` runs
            have completed.
    """
    logging.basicConfig(
        stream=sys.stderr,
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)s %(filename)s: %(message)s",
    )
    logger = logging.getLogger("supervisord-eventlistener")
    debug_mode = _env_flag("DEBUG")

    # Normalise the watched names: "a, b" and "a,a" must both be satisfiable,
    # otherwise the "all processes seen" check below could never succeed.
    raw_names = os.environ.get("PROCESSNAME", "")
    process_names = list(
        dict.fromkeys(name.strip() for name in raw_names.split(",") if name.strip())
    )
    if not process_names:
        logger.critical("Set PROCESSNAME in environment!")
        sys.exit(1)

    # shlex honours quoting, so an argument may contain spaces.
    try:
        execute_cmd = shlex.split(os.environ.get("EXECUTE", ""))
    except ValueError as err:
        logger.critical("Invalid EXECUTE command: %s", err)
        sys.exit(1)
    if not execute_cmd:
        logger.critical("Set EXECUTE in environment!")
        sys.exit(1)

    event_name = os.environ.get("EVENT", "PROCESS_STATE_EXITED")
    sleep_time = _env_int("DELAY", "0", logger)
    max_run = _env_int("COUNT", "-1", logger)  # Zero or negative: infinite
    max_run_orig = max_run

    # Names that already produced the expected event, in arrival order.
    process_validated = []

    logger.info(
        "Start to listen for %s events for processes: %s",
        event_name,
        ",".join(process_names),
    )

    while True:
        headers, body = listener.wait(sys.stdin, sys.stdout)
        fields = _parse_event_body(body)
        process_name = fields.get("processname", "")
        curr_eventname = headers.get("eventname", "")

        if debug_mode:
            logger.debug("Headers: %r", headers)
            logger.debug("Body: %r", fields)
            logger.debug("Args: %r", args)
            logger.debug("ENV: %r", os.environ)

        # Ignore events for other states or other processes.
        if curr_eventname != event_name or process_name not in process_names:
            listener.ok(sys.stdout)
            continue

        logger.info("Process '%s' entered '%s' state...", process_name, curr_eventname)

        if process_name not in process_validated:
            process_validated.append(process_name)

        # Keep waiting until every watched process has reported.
        if len(process_validated) != len(process_names):
            logger.info(
                "Waiting %s/%s processes before running command",
                len(process_validated),
                len(process_names),
            )
            listener.ok(sys.stdout)
            continue

        # Optional delay; a negative value is treated as "no delay".
        if sleep_time > 0:
            logger.info(
                "Execute '%s' in %s (sec)...", " ".join(execute_cmd), sleep_time
            )
            time.sleep(sleep_time)

        logger.info(
            "All processes %s %s entered in correct state %s, executing command: %s",
            len(process_validated),
            ",".join(process_validated),
            event_name,
            execute_cmd,
        )

        # The child's stdout is redirected to stderr because our own stdout
        # carries the listener protocol back to supervisord.
        try:
            ret = subprocess.call(execute_cmd, stdout=sys.stderr)
        except Exception as err:  # top-level boundary: report failure and stop
            logger.critical("Unexpected Exception: %s", str(err))
            listener.fail(sys.stdout)
            sys.exit(1)

        if ret != 0:
            logger.warning("Command exited with non-zero status %s", ret)
        listener.ok(sys.stdout)

        # Start a fresh round: the next run must wait for all processes again.
        process_validated.clear()

        # Quit on max run
        max_run -= 1
        if max_run == 0:
            logger.info("Exit normally after %s/%s run", max_run_orig, max_run_orig)
            sys.exit(0)
        elif max_run > 0:
            logger.info("Wait for next event (remaining %s/%s)", max_run, max_run_orig)
        else:
            logger.info("Wait for next event")


# Values of a boolean environment variable that mean "disabled".
_DISABLED_FLAG_VALUES = frozenset({"", "0", "false", "no", "off"})


def _env_flag(name):
    """Return True when environment variable *name* is set to an enabling value."""
    value = os.environ.get(name)
    return value is not None and value.strip().lower() not in _DISABLED_FLAG_VALUES


def _env_int(name, default, logger):
    """Return environment variable *name* as an int, exiting with status 1 if invalid."""
    raw = os.environ.get(name, default)
    try:
        return int(raw)
    except ValueError:
        logger.critical("%s must be an integer, got %r", name, raw)
        sys.exit(1)


def _parse_event_body(body):
    """Parse a whitespace-separated ``key:value`` payload into a dict.

    Only the first colon separates key from value, so values may contain
    colons; a token without a colon maps to an empty string.
    """
    fields = {}
    for token in body.split():
        key, _, value = token.partition(":")
        fields[key] = value
    return fields
# Script entry point: pass the command-line arguments (without the program
# name) to main() when the file is executed directly.
if __name__ == "__main__":
    main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment