Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Created December 18, 2024 09:42
Show Gist options
  • Save maxfischer2781/18b4688cd0157967b6241cff512de2f5 to your computer and use it in GitHub Desktop.
Save maxfischer2781/18b4688cd0157967b6241cff512de2f5 to your computer and use it in GitHub Desktop.
Tool for recording HTCondor job information from an execution point slot
import argparse
import os
import re
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import Iterable, NamedTuple
def _positive_float(value: str) -> float:
    """Parse a command line value as a strictly positive, finite float."""
    try:
        number = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {value!r}") from None
    # the chained comparison also rejects NaN, since every comparison with NaN is False
    if not 0 < number < float("inf"):
        raise argparse.ArgumentTypeError(
            f"must be a positive, finite number, got {value!r}"
        )
    return number


# Command line interface: the slot to observe, the recording interval (in
# seconds, as used by time.sleep in the recording loop) and which records to take.
CLI = argparse.ArgumentParser()
CLI.add_argument("SLOT", help=r"slot identifier as \d+_\d+")
CLI.add_argument(
    "--interval",
    help="time between recording data",
    # zero/negative values would make the recorder spin without pause
    type=_positive_float,
    default=5 * 60,
)
RECORD_CLI = CLI.add_argument_group("records")
RECORD_CLI.add_argument(
    "--ps-tree", help="record the ps tree/forest of the job", action="store_true"
)
RECORD_CLI.add_argument(
    "--cgroup-usage", help="record the cgroup reported CPU usage", action="store_true"
)
RECORD_CLI.add_argument(
    "--classad", help="record the queue ClassAd of the job", action="store_true"
)
def every(interval: float) -> Iterable[None]:
    """Yield immediately and then roughly once per ``interval`` seconds, forever.

    Wakeups are scheduled on a fixed grid (start + k * interval) measured with
    ``time.monotonic``, so time spent by the consumer between items does not
    accumulate as drift. If the consumer overran the schedule, the next item is
    yielded without sleeping instead of passing a negative duration to
    ``time.sleep`` (which raises ``ValueError``).

    :param interval: seconds between consecutive items
    """
    next_wakeup = time.monotonic() + interval
    while True:
        yield
        delay = next_wakeup - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        next_wakeup += interval
def get_pstree(ppid: int, fields: "list[str]") -> Iterable[bytes]:
    """Yield the ``ps --forest`` lines of the process tree rooted at ``ppid``.

    The first item is always the ``ps`` header line. If ``ppid`` is found, its
    line follows, then the lines of all of its descendants. Lines are raw bytes
    including the trailing newline.

    :param ppid: PID of the root of the tree to extract
    :param fields: extra ``ps -o`` columns shown between ``pid`` and ``command``
    """
    # There is no simple way to get just the tree for a given ppid from ps
    # Instead we get all the trees and only extract the one we need
    with subprocess.Popen(
        ["ps", "-A", "--forest", "-o", ",".join(["pid", *fields, "command"])],
        stdout=subprocess.PIPE,
    ) as ps_proc:
        try:
            assert ps_proc.stdout is not None
            ps_header = ps_proc.stdout.readline()
            pid_end, command_start = _extract_ps_columns(ps_header)
            # column of ppid's "\_" tree marker; -1 if its line has none (forest root)
            depth = None
            yield ps_header
            for line in ps_proc.stdout:
                if depth is None:
                    pid = line[:pid_end]
                    if int(pid) == ppid:
                        depth = line.find(rb"\_", command_start)
                        yield line
                else:
                    # the first line not indented deeper than ppid ends its subtree
                    if line.find(rb"\_", command_start) <= depth:
                        break
                    yield line
        finally:
            # ps may still be writing if we stopped early; leaving the `with`
            # afterwards closes the pipe and waits for ps, so no zombie is left
            ps_proc.terminate()
def _extract_ps_columns(header: bytes) -> "tuple[int, int]":
pid, *_, command = re.finditer(rb"\S+", header)
assert pid.group() == b"PID"
assert command.group() == b"COMMAND"
return pid.end(), command.start()
class GlobalJobID:
    """Wrapper around an HTCondor ``GlobalJobID`` string.

    The ``#``-separated fields are exposed as ``schedd`` (first field) and
    ``job_id`` (second field); any further fields are ignored. Strings with
    fewer than two fields are rejected with ``ValueError``.
    """

    def __init__(self, gji: str) -> None:
        schedd, job_id, *_ignored = gji.split("#")
        self._gji = gji
        self.schedd = schedd
        self.job_id = job_id

    def __str__(self) -> str:
        # the original, unparsed identifier
        return self._gji

    def __repr__(self) -> str:
        return "{}({!r})".format(type(self).__name__, self._gji)
def get_classad(job: GlobalJobID) -> Iterable[bytes]:
    """Yield the raw output lines of ``condor_q -long`` for ``job``.

    :param job: job to query; its ``schedd`` is passed to ``condor_q -name``
        and its ``job_id`` selects the job
    """
    with subprocess.Popen(
        ["condor_q", "-name", job.schedd, job.job_id, "-long"],
        stdout=subprocess.PIPE,
    ) as queue_proc:
        assert queue_proc.stdout is not None
        try:
            yield from queue_proc.stdout
        finally:
            # stop condor_q if we were abandoned early; leaving the `with`
            # afterwards closes the pipe and reaps the process (no zombie)
            queue_proc.terminate()
class CGroupV1:
    """Accessor for a single cgroup in the cgroup v1 ``cpu,cpuacct`` hierarchy."""

    def __init__(self, cpuacct_root: Path):
        # cgroup directory containing the ``cpuacct.stat`` and ``tasks`` files
        self.cpuacct_root = cpuacct_root

    def get_cpu_usage(self) -> float:
        """Return the CPU usage of the cgroup in seconds

        Sums every counter line of ``cpuacct.stat``. The kernel reports these
        counters in USER_HZ clock ticks.
        """
        cpu_stat = (self.cpuacct_root / "cpuacct.stat").read_text().strip()
        ticks = sum(int(line.partition(" ")[-1]) for line in cpu_stat.splitlines())
        # USER_HZ is exposed as SC_CLK_TCK; it is 100 on common platforms, but not all
        return ticks / os.sysconf("SC_CLK_TCK")

    def contains_pid(self, pid: int) -> bool:
        """Check whether ``pid`` is listed in the cgroup's ``tasks`` file."""
        with open(self.cpuacct_root / "tasks") as pid_stream:
            return any(int(line) == pid for line in pid_stream)

    def get_leader_pid(self) -> int:
        """Return the first PID listed in the cgroup's ``tasks`` file

        NOTE(review): the kernel does not guarantee that ``tasks`` is sorted,
        so "first listed" is only presumed to be the job's leader process.

        :raises ValueError: if the cgroup contains no processes
        """
        with open(self.cpuacct_root / "tasks") as pid_stream:
            first_line = next(pid_stream, None)
        if first_line is None:
            raise ValueError("no processes in cgroup")
        return int(first_line)
class JobInfo(NamedTuple):
    """Identification of the job running in an HTCondor slot of this machine."""

    # HTCondor GlobalJobID of the job
    job_id: GlobalJobID
    # PID used to check whether the job is alive (first PID of the slot cgroup)
    root_pid: int
    # cgroup the slot's processes run in
    cgroup: CGroupV1
    # JobStart of the job as reported by condor_status (presumably Unix epoch seconds)
    ctime: int

    def is_alive(self) -> bool:
        """Check whether ``root_pid`` is still part of the job's cgroup."""
        try:
            return self.cgroup.contains_pid(self.root_pid)
        except FileNotFoundError:
            # the cgroup itself is gone, so none of its processes can be in it
            return False

    @classmethod
    def from_slot(cls, slot: str) -> "JobInfo":
        """Collect the information of the job running in a slot of this machine

        :param slot: slot identifier; only its leading ``<digits>_<digits>``
            part is used
        :raises ValueError: if ``slot`` is malformed, ``condor_status`` reports
            no job for the slot, or the slot cgroup holds no processes
        :raises NotImplementedError: if no cgroup v1 directory matches the slot
        :raises subprocess.CalledProcessError: if ``condor_status`` fails
        """
        raw_slot_match = re.match(r"\d+_\d+", slot)
        if raw_slot_match is None:
            raise ValueError(rf"could not read SLOT {slot} as '\d+_\d+'")
        raw_slot = raw_slot_match.group(0)
        cgroupv1_path = list(
            Path("/sys/fs/cgroup/cpu,cpuacct/htcondor").glob(
                f"*slot{raw_slot}@*",
            )
        )
        if not cgroupv1_path:
            # only the cgroup v1 layout is implemented; a missing directory is
            # taken to mean the host uses cgroup v2
            raise NotImplementedError("cgroupv2")
        cgroupv1 = CGroupV1(cgroupv1_path[0])
        status = subprocess.check_output(
            [
                "condor_status",
                "-slot",
                f"slot{raw_slot}@{socket.getfqdn()}",
                "-af",
                "GlobalJobID",
                "JobStart",
            ],
            stderr=subprocess.DEVNULL,
            encoding="utf-8",
        )
        fields = status.split()
        # presumably "-af" prints "undefined" for missing attributes, e.g. for
        # an unclaimed slot; report that clearly instead of a parsing error
        if len(fields) != 2 or "undefined" in fields:
            raise ValueError(
                f"no job found in slot{raw_slot}: "
                f"condor_status reported {status.strip()!r}"
            )
        job_id, ctime = fields
        return cls(
            job_id=GlobalJobID(job_id),
            root_pid=cgroupv1.get_leader_pid(),
            cgroup=cgroupv1,
            ctime=int(ctime),
        )
def record(
    interval: float,
    job_info: JobInfo,
    want_ps_tree: bool,
    want_cgroup_usage: bool,
    want_classad: bool,
):
    """Print a record of the job to stdout every ``interval`` seconds

    Each record starts with a ``####``-delimited header (timestamps, job ID,
    root PID, cgroup version), followed by one ``####``-terminated section per
    requested record type. Recording stops after the first record taken once
    the job is no longer alive.

    :param interval: seconds between records
    :param job_info: the job to observe
    :param want_ps_tree: include the ``ps`` forest of the job's root process
    :param want_cgroup_usage: include the cgroup CPU time and average CPU usage
    :param want_classad: include the job's queue ClassAd
    """
    for _ in every(interval):
        print("####")
        print("timestamp", time.time())
        print(time.strftime("%Y-%m-%dT%H:%M:%S"))
        print("GlobalJobID", job_info.job_id)
        print("RootPid", job_info.root_pid)
        print("cgroup", "v1" if isinstance(job_info.cgroup, CGroupV1) else "v2")
        print("####")
        if want_ps_tree:
            for line in get_pstree(
                job_info.root_pid, ["user", "etime", "time", "pcpu"]
            ):
                # command lines are arbitrary bytes; never crash on non-UTF-8
                print(line.decode(errors="backslashreplace"), end="")
            print("####")
        if want_cgroup_usage:
            usage = job_info.cgroup.get_cpu_usage()
            elapsed = time.time() - job_info.ctime
            # avoid dividing by zero (or a negative span if the clocks disagree)
            pcpu = usage / elapsed if elapsed > 0 else float("nan")
            print(
                f"cgroup_usage={usage}",
                f"cgroup_pcpu={pcpu:.2f}",
            )
            print("####")
        if want_classad:
            for line in get_classad(job_info.job_id):
                print(line.decode(errors="backslashreplace"), end="")
            print("####")
        # make each record visible immediately when stdout is a file or pipe
        sys.stdout.flush()
        if not job_info.is_alive():
            return
if __name__ == "__main__":
    # Resolve the job in the requested slot once, then record until it ends.
    args = CLI.parse_args()
    job_info = JobInfo.from_slot(args.SLOT)
    record(
        interval=args.interval,
        job_info=job_info,
        want_ps_tree=args.ps_tree,
        want_cgroup_usage=args.cgroup_usage,
        want_classad=args.classad,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment