Created
December 18, 2024 09:42
-
-
Save maxfischer2781/18b4688cd0157967b6241cff512de2f5 to your computer and use it in GitHub Desktop.
Tool for recording HTCondor job information from an execution point slot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Iterable, NamedTuple | |
import argparse | |
from pathlib import Path | |
import re | |
import socket | |
import subprocess | |
import time | |
# Command line interface: one positional slot identifier, the sampling
# interval, and a group of opt-in switches selecting what gets recorded.
CLI = argparse.ArgumentParser()
CLI.add_argument("SLOT", help=r"slot identifier as \d+_\d+")
CLI.add_argument(
    "--interval",
    type=float,
    default=5 * 60,  # seconds, as the value ends up in time.sleep
    help="time between recording data",
)

# Every kind of record is off unless its flag is given.
RECORD_CLI = CLI.add_argument_group("records")
RECORD_CLI.add_argument(
    "--ps-tree",
    action="store_true",
    help="record the ps tree/forest of the job",
)
RECORD_CLI.add_argument(
    "--cgroup-usage",
    action="store_true",
    help="record the cgroup reported CPU usage",
)
RECORD_CLI.add_argument(
    "--classad",
    action="store_true",
    help="record the queue ClassAd of the job",
)
def every(interval: float) -> Iterable[None]:
    """Yield forever, pacing iterations to one per ``interval`` seconds.

    The first value is yielded immediately. Wake-up times lie on a fixed
    grid derived from the monotonic clock, so the time the consumer spends
    between iterations does not accumulate as drift.

    If the consumer takes longer than ``interval``, the next iteration
    starts right away instead of sleeping.
    """
    next_wakeup = time.monotonic() + interval
    while True:
        yield
        # A late consumer leaves a negative remainder, which time.sleep
        # rejects with ValueError; clamp it so we simply do not wait.
        time.sleep(max(0.0, next_wakeup - time.monotonic()))
        next_wakeup += interval
def get_pstree(ppid: int, fields: "list[str]") -> Iterable[bytes]:
    """Yield the ``ps`` header and the forest lines of the tree rooted at ``ppid``.

    :param ppid: PID of the process whose subtree should be reported
    :param fields: extra ``ps -o`` columns placed between ``pid`` and ``command``
    :return: raw output lines of ``ps``; the header first, then the line of
        ``ppid`` and every line nested below it

    Only the header is yielded if no line with PID ``ppid`` is found.
    """
    # There is no simple way to get just the tree for a given ppid from ps
    # Instead we get all the trees and only extract the one we need
    #
    # The context manager closes the pipe and waits for the child on exit, so
    # stopping early leaves neither a zombie process nor an open descriptor.
    with subprocess.Popen(
        ["ps", "-A", "--forest", "-o", ",".join(["pid", *fields, "command"])],
        stdout=subprocess.PIPE,
    ) as ps_proc:
        try:
            assert ps_proc.stdout is not None
            ps_header = ps_proc.stdout.readline()
            pid_end, command_start = _extract_ps_columns(ps_header)
            # Offset of the `\_` forest marker on the root's line; None until
            # the root has been seen (-1 if the root's line has no marker).
            depth = None
            yield ps_header
            for line in ps_proc.stdout:
                if depth is None:
                    pid = line[:pid_end]
                    if int(pid) == ppid:
                        depth = line.find(rb"\_", command_start)
                        yield line
                else:
                    # A marker at or left of the root's marker means the line
                    # no longer belongs to the root's subtree.
                    if line.find(rb"\_", command_start) <= depth:
                        break
                    yield line
        finally:
            # Do not let ps keep producing output nobody is going to read.
            ps_proc.terminate()
def _extract_ps_columns(header: bytes) -> "tuple[int, int]": | |
pid, *_, command = re.finditer(rb"\S+", header) | |
assert pid.group() == b"PID" | |
assert command.group() == b"COMMAND" | |
return pid.end(), command.start() | |
class GlobalJobID:
    """A GlobalJobID string together with its first two ``#``-separated fields."""

    def __init__(self, gji: str) -> None:
        self._gji = gji
        # Fields beyond the second are not needed and are dropped.
        schedd, job_id, *_extra = gji.split("#")
        self.schedd = schedd
        self.job_id = job_id

    def __str__(self) -> str:
        """Return the identifier exactly as it was passed in."""
        return self._gji

    def __repr__(self) -> str:
        return "{}({!r})".format(self.__class__.__name__, self._gji)
def get_classad(job: GlobalJobID) -> Iterable[bytes]:
    """Yield the raw output lines of ``condor_q -long`` for ``job``.

    The query is sent to the schedd named in the job's identifier.
    """
    # The context manager closes the pipe and waits for the child on exit, so
    # an abandoned iteration leaves neither a zombie nor an open descriptor.
    with subprocess.Popen(
        ["condor_q", "-name", job.schedd, job.job_id, "-long"],
        stdout=subprocess.PIPE,
    ) as queue_proc:
        assert queue_proc.stdout is not None
        try:
            yield from queue_proc.stdout
        finally:
            # Stop the query if the consumer quits before the output ends.
            queue_proc.terminate()
class CGroupV1:
    """Reader for the ``cpuacct.stat`` and ``tasks`` files of one cgroup v1 directory."""

    def __init__(self, cpuacct_root: Path):
        self.cpuacct_root = cpuacct_root

    def get_cpu_usage(self) -> float:
        """Return the CPU usage of the cgroup in seconds"""
        cpu_stat = (self.cpuacct_root / "cpuacct.stat").read_text().strip()
        # Each line is "<name> <count>" and all counts are summed.
        # NOTE(review): the division assumes 100 ticks per second - confirm
        # this matches USER_HZ on the target hosts.
        ticks = sum(int(line.partition(" ")[-1]) for line in cpu_stat.splitlines())
        return ticks / 100

    def contains_pid(self, pid: int) -> bool:
        """Tell whether ``pid`` is listed in the cgroup's ``tasks`` file.

        A cgroup whose ``tasks`` file no longer exists contains no processes,
        so this returns ``False`` instead of raising.
        """
        try:
            with open(self.cpuacct_root / "tasks") as pid_stream:
                return any(int(line) == pid for line in pid_stream)
        except FileNotFoundError:
            return False

    def get_leader_pid(self) -> int:
        """Return the first PID listed in the cgroup's ``tasks`` file.

        :raises ValueError: if the ``tasks`` file lists no process
        """
        with open(self.cpuacct_root / "tasks") as pid_stream:
            # Looping instead of calling next() lets an empty file fall
            # through to the ValueError rather than leaking StopIteration.
            for line in pid_stream:
                return int(line)
        raise ValueError("no processes in cgroup")
class JobInfo(NamedTuple):
    """Identity of the job occupying a slot, plus handles for inspecting it."""

    job_id: GlobalJobID
    root_pid: int
    cgroup: CGroupV1
    ctime: int

    def is_alive(self) -> bool:
        """Tell whether the root process is still listed in the job's cgroup."""
        return self.cgroup.contains_pid(self.root_pid)

    @staticmethod
    def from_slot(slot: str) -> "JobInfo":
        """Look up the job currently running in ``slot``.

        ``slot`` must start with ``<digits>_<digits>``; only that prefix is
        used. The cgroup is found by globbing the HTCondor cpuacct hierarchy
        and the job identifier and start time are queried via ``condor_status``.

        :raises ValueError: if ``slot`` does not start with the expected pattern
        :raises NotImplementedError: if no cgroup v1 directory matches the slot
        """
        slot_match = re.match(r"\d+_\d+", slot)
        if slot_match is None:
            raise ValueError(rf"could not read SLOT {slot} as '\d+_\d+'")
        slot_id = slot_match.group(0)

        htcondor_cgroups = Path("/sys/fs/cgroup/cpu,cpuacct/htcondor")
        cgroup_dirs = [*htcondor_cgroups.glob(f"*slot{slot_id}@*")]
        if len(cgroup_dirs) == 0:
            raise NotImplementedError("cgroupv2")
        # The first match is used if several directories fit the slot.
        cgroup = CGroupV1(cgroup_dirs[0])

        status_command = [
            "condor_status",
            "-slot",
            f"slot{slot_id}@{socket.getfqdn()}",
            "-af",
            "GlobalJobID",
            "JobStart",
        ]
        status_output = subprocess.check_output(
            status_command,
            stderr=subprocess.DEVNULL,
            encoding="utf-8",
        )
        # Exactly two whitespace-separated values are expected.
        global_job_id, job_start = status_output.split()
        return JobInfo(
            job_id=GlobalJobID(global_job_id),
            root_pid=cgroup.get_leader_pid(),
            cgroup=cgroup,
            ctime=int(job_start),
        )
def record(
    interval: float,
    job_info: JobInfo,
    want_ps_tree: bool,
    want_cgroup_usage: bool,
    want_classad: bool,
) -> None:
    """Print a record of the job to stdout every ``interval`` seconds.

    Each record starts with a header (timestamps, job id, root PID, cgroup
    version) followed by the requested sections, all delimited by ``####``
    lines. Recording stops after the first record for which the job's root
    process is no longer alive.

    :param interval: seconds between the start of consecutive records
    :param job_info: the job to record
    :param want_ps_tree: include the ``ps`` process tree of the job
    :param want_cgroup_usage: include the CPU usage reported by the cgroup
    :param want_classad: include the job's ClassAd from the queue
    """
    for _ in every(interval):
        print("####")
        print("timestamp", time.time())
        print(time.strftime("%Y-%m-%dT%H:%M:%S"))
        print("GlobalJobID", job_info.job_id)
        print("RootPid", job_info.root_pid)
        print("cgroup", "v1" if isinstance(job_info.cgroup, CGroupV1) else "v2")
        print("####")
        if want_ps_tree:
            for line in get_pstree(
                job_info.root_pid, ["user", "etime", "time", "pcpu"]
            ):
                # Command lines may hold arbitrary bytes; substitute what is
                # not valid UTF-8 rather than aborting the whole recording.
                print(line.decode(errors="replace"), end="")
            print("####")
        if want_cgroup_usage:
            usage = job_info.cgroup.get_cpu_usage()
            # cgroup_pcpu is CPU seconds used per wall-clock second elapsed
            # since the job's recorded start time.
            print(
                f"cgroup_usage={usage}",
                f"cgroup_pcpu={usage / (time.time() - job_info.ctime):.2f}",
            )
            print("####")
        if want_classad:
            for line in get_classad(job_info.job_id):
                # Same leniency as for the ps output above.
                print(line.decode(errors="replace"), end="")
            print("####")
        # Checked last so that one final record is written for an ended job.
        if not job_info.is_alive():
            return
if __name__ == "__main__":
    # Resolve the slot named on the command line to its job, then keep
    # recording until record() returns.
    args = CLI.parse_args()
    job_info = JobInfo.from_slot(args.SLOT)
    record(
        interval=args.interval,
        job_info=job_info,
        want_ps_tree=args.ps_tree,
        want_cgroup_usage=args.cgroup_usage,
        want_classad=args.classad,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment