Skip to content

Instantly share code, notes, and snippets.

@mpickering
Created November 14, 2025 13:45
Show Gist options
  • Select an option

  • Save mpickering/f9fb2e8e157cf3815b3910cbdd1e1de8 to your computer and use it in GitHub Desktop.

Select an option

Save mpickering/f9fb2e8e157cf3815b3910cbdd1e1de8 to your computer and use it in GitHub Desktop.
#! /usr/bin/env nix-shell
#! nix-shell -i python3 -p python3
import argparse
import datetime
import os
import signal
import stat
import subprocess
import sys
import time
import threading
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Set, List
# --------------------------- Utility Functions ---------------------------
def list_sockets(socket_dir: Path) -> Set[Path]:
    """Collect every Unix domain socket found directly inside *socket_dir*.

    Entries are inspected without following symlinks, so a symlink that points
    at a socket is not reported. A directory that does not exist, or an entry
    that disappears between listing and ``stat``, counts as "no socket" rather
    than an error.
    """

    def _is_socket(entry: os.DirEntry) -> bool:
        # Races with processes removing their socket are expected; treat a
        # vanished entry as "not a socket".
        try:
            return stat.S_ISSOCK(entry.stat(follow_symlinks=False).st_mode)
        except FileNotFoundError:
            return False

    found: Set[Path] = set()
    try:
        with os.scandir(socket_dir) as entries:
            found.update(Path(e.path) for e in entries if _is_socket(e))
    except FileNotFoundError:
        # The watched directory does not exist (yet); report nothing.
        pass
    return found
def _stream_reader(socket_path: Path, stream, stream_name: str) -> None:
    """Echo every line of *stream* to stdout, tagged with a timestamp and source.

    Each output line has the form
    ``[YYYY-MM-DD HH:MM:SS.mmm socket=<socket file name> <stream_name>] <text>``
    and always ends with a newline. The stream is closed once it reaches EOF
    (or if reading it raises).

    Args:
        socket_path: Socket the producing process is attached to; only its
            file name appears in the prefix.
        stream: Text-mode file object to drain line by line (for example a
            ``Popen.stdout`` pipe).
        stream_name: Label shown in the prefix, e.g. ``"STDOUT"``.
    """
    try:
        # readline() returns "" only at EOF, so the sentinel ends the loop;
        # no separate empty-line check is needed.
        for line in iter(stream.readline, ""):
            timestamp = datetime.datetime.now().isoformat(
                sep=" ", timespec="milliseconds"
            )
            prefix = f"[{timestamp} socket={socket_path.name} {stream_name}]"
            # A final line without "\n" would otherwise be glued to the next
            # prefixed line (possibly one printed by a sibling reader thread).
            if not line.endswith("\n"):
                line += "\n"
            print(f"{prefix} {line}", end="", flush=True)
    finally:
        try:
            stream.close()
        except Exception:
            # Best-effort cleanup: a failing close() must not mask an
            # exception raised by the read loop above.
            pass
@dataclass()
class MonitoredProcess:
    """Bookkeeping for one spawned child process and its output forwarders.

    Keeps the process handle together with the threads reading its pipes so
    both can be shut down as a unit.
    """

    # Handle of the spawned child process.
    proc: subprocess.Popen
    # Background threads that read the child's output pipes.
    threads: List[threading.Thread]
# --------------------------- Cabal Setup ---------------------------
def cabal_build_and_get_binary(target: str = "eventlog-live-otelcol") -> Path:
    """Build a cabal executable component and return the path to its binary.

    Runs ``cabal build <target>`` (its output goes straight to the terminal),
    then ``cabal list-bin <target>`` to locate the built executable.

    Args:
        target: Cabal executable component to build. Defaults to
            ``eventlog-live-otelcol``.

    Returns:
        The path printed by ``cabal list-bin``.

    Raises:
        subprocess.CalledProcessError: if either cabal command exits non-zero.
        RuntimeError: if ``cabal list-bin`` prints no path at all.
    """
    print(f"[monitor] Building {target} …", flush=True)
    subprocess.run(["cabal", "build", target], check=True)

    print("[monitor] Querying executable path using cabal list-bin …", flush=True)
    result = subprocess.run(
        ["cabal", "list-bin", target],
        check=True,
        text=True,
        capture_output=True,
    )
    # Take the last non-empty stdout line as the path so that any extra
    # informational lines printed before it do not corrupt the result.
    lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
    if not lines:
        # Path("") would silently become Path("."), which only fails much later
        # with a confusing error when we try to execute it.
        raise RuntimeError(
            f"`cabal list-bin {target}` did not print an executable path"
        )
    exe_path = Path(lines[-1])
    print(f"[monitor] Using executable: {exe_path}", flush=True)
    return exe_path
# --------------------------- Process Management ---------------------------
def start_eventlog_process(
    exe_path: Path,
    socket_path: Path,
    otelcol_host: str,
    extra_args: List[str] | None = None,
) -> MonitoredProcess:
    """Launch eventlog-live-otelcol for one socket and start forwarding its output.

    The child's stdout and stderr are piped and drained by two daemon threads
    running ``_stream_reader``. Each line is echoed with a prefix naming the
    socket.

    Args:
        exe_path: The eventlog-live-otelcol executable to run.
        socket_path: Socket passed as ``--eventlog-socket``. Its file name,
            minus a trailing ``.sock``, is used as ``--service-name``.
        otelcol_host: Value passed as ``--otelcol-host``.
        extra_args: Additional arguments appended verbatim after the fixed
            ones; ``None`` means none.

    Returns:
        A MonitoredProcess holding the child and its reader threads.
    """
    import shlex  # only needed to render the logged command line

    if extra_args is None:
        extra_args = []
    # removesuffix strips only a trailing ".sock"; str.replace would also strip
    # ".sock" from the middle of names such as "my.socket.sock".
    service_name = socket_path.name.removesuffix(".sock")
    cmd = [
        str(exe_path),
        f"--eventlog-socket={socket_path}",
        f"--otelcol-host={otelcol_host}",
        f"--service-name={service_name}",
        "--config=hls-config.yaml",
        "--eventlog-flush-interval=60",
        "-v3",
        *extra_args,
    ]
    # shlex.join quotes arguments so the logged command can be copy-pasted.
    print(f"[monitor] Starting process for {socket_path}: {shlex.join(cmd)}", flush=True)
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # Undecodable output must not kill a reader thread. A dead reader stops
        # draining its pipe, and the child then blocks once the pipe is full.
        errors="replace",
        bufsize=1,
    )
    threads: List[threading.Thread] = []
    for stream, stream_name in ((proc.stdout, "STDOUT"), (proc.stderr, "STDERR")):
        if not stream:
            continue
        reader = threading.Thread(
            target=_stream_reader,
            args=(socket_path, stream, stream_name),
            daemon=True,
        )
        reader.start()
        threads.append(reader)
    return MonitoredProcess(proc=proc, threads=threads)
def stop_eventlog_process(
    socket_path: Path, mp: MonitoredProcess, timeout: float = 10.0
) -> None:
    """Stop a monitored child, escalating to a hard kill if it does not exit.

    Calls ``terminate()`` (SIGTERM on POSIX) and waits up to *timeout* seconds.
    If the child is still running, it calls ``kill()`` (SIGKILL on POSIX) and
    reaps it. Finally, each output-reader thread gets up to one second to
    finish.

    Args:
        socket_path: Socket the child was started for; used only in log output.
        mp: The child process and its reader threads.
        timeout: Seconds to wait after ``terminate()`` before killing.
    """
    print(f"[monitor] Stopping process for {socket_path}", flush=True)
    proc = mp.proc
    proc.terminate()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        print(f"[monitor] SIGKILL → {socket_path}", flush=True)
        proc.kill()
        # Reap the killed child so it does not linger as a zombie and so that
        # proc.returncode is populated.
        proc.wait()
    # Give the reader threads a moment to drain what is left in the pipes.
    for t in mp.threads:
        if t.is_alive():
            t.join(timeout=1)
# --------------------------- Main Loop ---------------------------
def main() -> None:
    """Watch a socket directory and keep one eventlog-live-otelcol per socket.

    Builds eventlog-live-otelcol, then polls ``--socket-dir`` every
    ``--poll-interval`` seconds:

    * a newly appeared socket gets a child process;
    * a socket that disappeared has its child stopped;
    * a child that exited on its own is forgotten, so it is started again on
      the next poll if its socket still exists.

    All children are stopped when the monitor exits, whether because of
    SIGINT/SIGTERM or an unexpected exception.
    """
    parser = argparse.ArgumentParser(
        description="Run eventlog-live-otelcol for every socket in a directory."
    )
    parser.add_argument(
        "--socket-dir",
        # Derived from the current user's home instead of a hard-coded one.
        default=str(Path.home() / ".ghc-eventlog-sockets"),
        help="Directory to watch for sockets (default: %(default)s)",
    )
    parser.add_argument(
        "--otelcol-host",
        default="localhost",
        help="Value passed as --otelcol-host (default: %(default)s)",
    )
    parser.add_argument(
        "--poll-interval",
        type=float,
        default=1.0,
        help="Seconds between directory scans (default: %(default)s)",
    )
    parser.add_argument(
        "--extra-arg",
        action="append",
        default=[],
        help="Extra argument for every child process; repeatable. "
        "Use --extra-arg=VALUE for values that start with '-'.",
    )
    args = parser.parse_args()
    socket_dir = Path(args.socket_dir)
    extra_args: List[str] = args.extra_arg

    # Build and get executable before anything else
    exe_path = cabal_build_and_get_binary()
    print(f"[monitor] Watching directory: {socket_dir}", flush=True)

    processes: Dict[Path, MonitoredProcess] = {}

    def shutdown(signum=None, frame=None):
        # Only raise SystemExit here; the `finally` below performs the cleanup
        # exactly once, whatever caused the loop to end.
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)
    try:
        while True:
            current_sockets = list_sockets(socket_dir)

            # Start processes for new sockets.
            for sock in current_sockets.difference(processes):
                processes[sock] = start_eventlog_process(
                    exe_path, sock, args.otelcol_host, extra_args
                )

            # Stop processes whose socket has gone away.
            for sock in processes.keys() - current_sockets:
                stop_eventlog_process(sock, processes.pop(sock))

            # Forget exited processes; the next poll restarts them if their
            # socket is still present.
            for sock, mp in list(processes.items()):
                returncode = mp.proc.poll()
                if returncode is not None:
                    print(
                        f"[monitor] Process for {sock} exited with code "
                        f"{returncode}. Restarting if present.",
                        flush=True,
                    )
                    del processes[sock]

            time.sleep(args.poll_interval)
    except KeyboardInterrupt:
        # Only reachable if SIGINT is delivered as KeyboardInterrupt
        # (e.g. the handler above was replaced); exit quietly like a signal.
        pass
    finally:
        # Ignore further signals so a repeated Ctrl-C cannot abort the cleanup
        # half-way and orphan the remaining children.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        print("[monitor] Shutting down …", flush=True)
        while processes:
            sock, mp = processes.popitem()
            stop_eventlog_process(sock, mp)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment