Created
October 10, 2024 14:55
-
-
Save mrjk/a3a03bd1775fb3dd5b64f09af2f81426 to your computer and use it in GitHub Desktop.
Stage listener for supervisord
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
Listener for supervisord | |
How does it work? | |
It waits until all listed process names (aka `program:NAME` sections) have reached the same state, then | |
it runs another command. Its main use case is to allow dependencies between supervisord services. | |
Example configuration: | |
``` | |
[eventlistener:event_wait_oauth2] | |
command=/usr/bin/listener.py | |
events=PROCESS_STATE_EXITED | |
environment=PROCESSNAME="wait_oauth2,gen_conf",EXECUTE="supervisorctl start nginx",COUNT="1",DEBUG="true" | |
# Comment this to hide all log messages | |
stderr_logfile=/dev/stderr | |
stderr_logfile_maxbytes = 0 | |
``` | |
""" | |
# see: https://github.com/paramah/slistener/blob/master/listener.py | |
import sys | |
import os | |
import logging | |
import subprocess | |
import time | |
from supervisor.childutils import listener | |
def _parse_event_body(body):
    """Parse an event payload of ``key:value`` tokens into a dict.

    Only the first line of the payload is parsed, as whitespace-separated
    tokens. Tokens without a ``:`` are skipped and a value may itself contain
    ``:`` (only the first colon separates key from value). An empty payload
    yields an empty dict instead of raising.
    """
    first_line = body.split("\n", 1)[0] if body else ""
    fields = {}
    for token in first_line.split():
        key, sep, value = token.partition(":")
        if sep:
            fields[key] = value
    return fields


def main(args):
    """Run the supervisord event listener loop.

    Waits until every process listed in ``PROCESSNAME`` has emitted the
    configured event, then runs the ``EXECUTE`` command. After each run the
    set of awaited processes is reset, so the next run again requires all of
    them.

    Configuration is read from the environment:

    * ``PROCESSNAME`` (required): comma-separated process names. Surrounding
      whitespace, empty entries and duplicates are ignored.
    * ``EXECUTE`` (required): command line to run, split with shell-like
      quoting rules (no shell is invoked).
    * ``EVENT``: event name to match, default ``PROCESS_STATE_EXITED``.
    * ``DELAY``: seconds to sleep before running the command, default ``0``.
    * ``COUNT``: number of runs before exiting; zero or negative means run
      forever. Default ``-1``.
    * ``DEBUG``: enables verbose dumps unless unset, empty, or one of
      ``0``/``false``/``no``/``off``.

    Args:
        args: Command-line arguments; only logged in debug mode.

    Exits (via ``sys.exit``) with status 1 on invalid configuration or when
    the command cannot be launched, and with status 0 after ``COUNT`` runs.
    """
    # Local import: only this function needs it.
    import shlex

    logging.basicConfig(
        stream=sys.stderr,
        level=logging.DEBUG,
        format="%(asctime)s %(levelname)s %(filename)s: %(message)s",
    )
    logger = logging.getLogger("supervisord-eventlistener")

    debug_flag = os.environ.get("DEBUG", "").strip().lower()
    debug_mode = debug_flag not in ("", "0", "false", "no", "off")

    # dict.fromkeys de-duplicates while preserving the configured order.
    process_names = list(
        dict.fromkeys(
            name.strip()
            for name in os.environ.get("PROCESSNAME", "").split(",")
            if name.strip()
        )
    )
    if not process_names:
        logger.critical("Set PROCESSNAME in environment!")
        sys.exit(1)

    try:
        execute_cmd = shlex.split(os.environ.get("EXECUTE", ""))
    except ValueError as err:
        # shlex raises ValueError on e.g. an unterminated quote.
        logger.critical("Invalid EXECUTE in environment: %s", err)
        sys.exit(1)
    if not execute_cmd:
        logger.critical("Set EXECUTE in environment!")
        sys.exit(1)

    event_name = os.environ.get("EVENT", "PROCESS_STATE_EXITED")
    try:
        sleep_time = int(os.environ.get("DELAY", "0"))
        max_run = int(os.environ.get("COUNT", "-1"))  # <= 0 means infinite
    except ValueError as err:
        logger.critical("DELAY and COUNT must be integers: %s", err)
        sys.exit(1)
    max_run_orig = max_run

    # Processes that have not yet emitted the expected event in this round.
    pending = set(process_names)

    logger.info(
        "Start to listen for %s events for processes: %s",
        event_name,
        ",".join(process_names),
    )

    while True:
        headers, body = listener.wait(sys.stdin, sys.stdout)
        body = _parse_event_body(body)
        process_name = str(body.get("processname", ""))
        curr_eventname = headers.get("eventname", "")

        if debug_mode:
            logger.debug("Headers: %r", headers)
            logger.debug("Body: %r", body)
            logger.debug("Args: %r", args)
            logger.debug("ENV: %r", os.environ)

        # Acknowledge and ignore events that are not the ones we wait for.
        if curr_eventname != event_name or process_name not in process_names:
            listener.ok(sys.stdout)
            continue

        logger.info("Process '%s' entered '%s' state...", process_name, curr_eventname)
        pending.discard(process_name)

        # Keep waiting until every configured process has been seen.
        if pending:
            logger.info(
                "Waiting %s/%s processes before running command",
                len(process_names) - len(pending),
                len(process_names),
            )
            listener.ok(sys.stdout)
            continue

        # Optional delay; non-positive values mean "no delay".
        if sleep_time > 0:
            logger.info(
                "Execute '%s' in %s (sec)...", " ".join(execute_cmd), str(sleep_time)
            )
            time.sleep(sleep_time)

        logger.info(
            "All processes %s %s entered in correct state %s, executing command: %s",
            len(process_names),
            ",".join(process_names),
            event_name,
            execute_cmd,
        )
        try:
            # stdout is the channel used to talk to supervisord (see the
            # listener.wait/ok calls), so the child's output goes to stderr.
            ret = subprocess.call(execute_cmd, stdout=sys.stderr)
        except Exception as err:  # top-level boundary: report, fail, exit
            logger.critical("Unexpected Exception: %s", str(err))
            listener.fail(sys.stdout)
            sys.exit(1)
        else:
            if ret != 0:
                logger.warning("Command exited with status %s", ret)
            listener.ok(sys.stdout)

        # Start a new round: all processes must be seen again before the
        # next run.
        pending = set(process_names)

        # Quit once the configured number of runs is reached.
        if max_run > 0:
            max_run -= 1
            if max_run == 0:
                logger.info(
                    "Exit normally after %s/%s run", max_run_orig, max_run_orig
                )
                sys.exit(0)
            logger.info("Wait for next event (remaining %s/%s)", max_run, max_run_orig)
        else:
            logger.info("Wait for next event")
if __name__ == "__main__":
    # Script entry point: forward the command-line arguments (without the
    # program name) to the listener.
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment