Created
November 14, 2025 13:45
-
-
Save mpickering/f9fb2e8e157cf3815b3910cbdd1e1de8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env nix-shell | |
| #! nix-shell -i python3 -p python3 | |
| import argparse | |
| import datetime | |
| import os | |
| import signal | |
| import stat | |
| import subprocess | |
| import sys | |
| import time | |
| import threading | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Dict, Set, List | |
# --------------------------- Utility Functions ---------------------------
def _is_socket_entry(entry: os.DirEntry) -> bool:
    """Tell whether a directory entry is itself a Unix domain socket.

    Symlinks are not followed. An entry that disappears between being
    listed and being stat-ed counts as "not a socket".
    """
    try:
        mode = entry.stat(follow_symlinks=False).st_mode
    except FileNotFoundError:
        return False
    return stat.S_ISSOCK(mode)


def list_sockets(socket_dir: Path) -> Set[Path]:
    """Scan *socket_dir* (non-recursively) and return the sockets found in it.

    A directory that does not exist yields an empty set.
    """
    found: Set[Path] = set()
    try:
        with os.scandir(socket_dir) as entries:
            for entry in entries:
                if _is_socket_entry(entry):
                    found.add(Path(entry.path))
    except FileNotFoundError:
        # Nothing to watch until the directory exists.
        pass
    return found
| def _stream_reader(socket_path: Path, stream, stream_name: str) -> None: | |
| """Read lines from a stream and prefix each line with the socket path.""" | |
| try: | |
| for line in iter(stream.readline, ""): | |
| if not line: | |
| break | |
| timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] | |
| prefix = f"[{timestamp} socket={socket_path.name} {stream_name}]" | |
| print(f"{prefix} {line}", end="", flush=True) | |
| finally: | |
| try: | |
| stream.close() | |
| except Exception: | |
| pass | |
@dataclass()
class MonitoredProcess:
    """A running child process together with the threads forwarding its output.

    Fields are, in order, ``proc`` and then ``threads``.
    """

    # Handle for the child process.
    proc: subprocess.Popen
    # Threads that drain the child's output pipes.
    threads: List[threading.Thread]
# --------------------------- Cabal Setup ---------------------------
def cabal_build_and_get_binary(target: str = "eventlog-live-otelcol") -> Path:
    """Build a cabal executable target and return the path to its binary.

    Runs ``cabal build <target>`` and then ``cabal list-bin <target>`` in the
    current working directory.

    Args:
        target: Cabal executable component to build and locate.

    Returns:
        The path printed by ``cabal list-bin``.

    Raises:
        subprocess.CalledProcessError: If either cabal command exits non-zero.
        FileNotFoundError: If ``cabal`` cannot be executed (e.g. not on PATH).
        RuntimeError: If ``cabal list-bin`` prints no path.
    """
    print(f"[monitor] Building {target} …", flush=True)
    subprocess.run(["cabal", "build", target], check=True)

    print("[monitor] Querying executable path using cabal list-bin …", flush=True)
    result = subprocess.run(
        ["cabal", "list-bin", target],
        check=True,
        text=True,
        capture_output=True,
    )
    listed = result.stdout.strip()
    if not listed:
        # Path("") would become "." and fail much later, and confusingly,
        # when the monitor tries to exec it.
        raise RuntimeError(f"cabal list-bin {target} printed no executable path")

    exe_path = Path(listed)
    print(f"[monitor] Using executable: {exe_path}", flush=True)
    return exe_path
| # --------------------------- Process Management --------------------------- | |
| def start_eventlog_process( | |
| exe_path: Path, | |
| socket_path: Path, | |
| otelcol_host: str, | |
| extra_args: List[str] | None = None, | |
| ) -> MonitoredProcess: | |
| if extra_args is None: | |
| extra_args = [] | |
| cmd = [ | |
| str(exe_path), | |
| f"--eventlog-socket={socket_path}", | |
| f"--otelcol-host={otelcol_host}", | |
| f"--service-name={socket_path.name.replace('.sock', '')}", | |
| "--config=hls-config.yaml", | |
| "--eventlog-flush-interval=60", | |
| "-v3", | |
| *extra_args, | |
| ] | |
| print(f"[monitor] Starting process for {socket_path}: {' '.join(cmd)}", flush=True) | |
| proc = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| bufsize=1, | |
| ) | |
| threads: List[threading.Thread] = [] | |
| if proc.stdout: | |
| t_out = threading.Thread( | |
| target=_stream_reader, | |
| args=(socket_path, proc.stdout, "STDOUT"), | |
| daemon=True, | |
| ) | |
| t_out.start() | |
| threads.append(t_out) | |
| if proc.stderr: | |
| t_err = threading.Thread( | |
| target=_stream_reader, | |
| args=(socket_path, proc.stderr, "STDERR"), | |
| daemon=True, | |
| ) | |
| t_err.start() | |
| threads.append(t_err) | |
| return MonitoredProcess(proc=proc, threads=threads) | |
def stop_eventlog_process(
    socket_path: Path,
    mp: "MonitoredProcess",
    term_timeout: float = 10.0,
    join_timeout: float = 1.0,
) -> None:
    """Stop a monitored child, escalating from terminate() to kill().

    The child is asked to exit with ``terminate()`` (SIGTERM on POSIX). If it
    is still running after *term_timeout* seconds, it is killed with
    ``kill()`` (SIGKILL on POSIX) and then reaped. Afterwards each reader
    thread gets up to *join_timeout* seconds to finish.

    Args:
        socket_path: Socket the process was started for; used in log lines.
        mp: Object exposing the child as ``proc`` and its reader threads as
            ``threads``.
        term_timeout: Seconds to wait after ``terminate()`` before ``kill()``.
        join_timeout: Seconds to wait for each still-running reader thread.
    """
    print(f"[monitor] Stopping process for {socket_path}", flush=True)
    proc = mp.proc
    proc.terminate()
    try:
        proc.wait(timeout=term_timeout)
    except subprocess.TimeoutExpired:
        print(f"[monitor] SIGKILL → {socket_path}", flush=True)
        proc.kill()
        # Reap the killed child. Without this the process stays a zombie and
        # proc.returncode remains None.
        proc.wait()
    # Reader threads end when the child's pipes reach EOF. Bound the wait in
    # case something else (e.g. a grandchild) still holds a pipe open.
    for reader in mp.threads:
        if reader.is_alive():
            reader.join(timeout=join_timeout)
| # --------------------------- Main Loop --------------------------- | |
| def main() -> None: | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--socket-dir", default="/home/matt/.ghc-eventlog-sockets") | |
| parser.add_argument("--otelcol-host", default="localhost") | |
| parser.add_argument("--poll-interval", type=float, default=1.0) | |
| parser.add_argument("--extra-arg", action="append", default=[]) | |
| args = parser.parse_args() | |
| socket_dir = Path(args.socket_dir) | |
| extra_args: List[str] = args.extra_arg | |
| # Build and get executable before anything else | |
| exe_path = cabal_build_and_get_binary() | |
| print(f"[monitor] Watching directory: {socket_dir}", flush=True) | |
| processes: Dict[Path, MonitoredProcess] = {} | |
| def shutdown(signum=None, frame=None): | |
| print("[monitor] Shutting down …", flush=True) | |
| for sock, mp in list(processes.items()): | |
| stop_eventlog_process(sock, mp) | |
| sys.exit(0) | |
| signal.signal(signal.SIGINT, shutdown) | |
| signal.signal(signal.SIGTERM, shutdown) | |
| try: | |
| while True: | |
| current_sockets = list_sockets(socket_dir) | |
| # Start processes for new sockets | |
| for sock in current_sockets: | |
| if sock not in processes: | |
| mp = start_eventlog_process( | |
| exe_path, sock, args.otelcol_host, extra_args | |
| ) | |
| processes[sock] = mp | |
| # Stop for removed sockets | |
| for sock in list(processes.keys()): | |
| if sock not in current_sockets: | |
| mp = processes.pop(sock) | |
| stop_eventlog_process(sock, mp) | |
| # Restart crashed processes | |
| for sock, mp in list(processes.items()): | |
| if mp.proc.poll() is not None: | |
| print( | |
| f"[monitor] Process for {sock} exited. Restarting if present.", | |
| flush=True, | |
| ) | |
| processes.pop(sock, None) | |
| time.sleep(args.poll_interval) | |
| except KeyboardInterrupt: | |
| shutdown() | |
| if __name__ == "__main__": | |
| main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment