Script to recreate missing HTCondor-CE APEL blah/batch records from a condor history.d directory
""" | |
Script to recreate missing APEL blah/batch records from a condor history.d directory | |
""" | |
from typing import TextIO, TypeVar, List | |
import argparse | |
import pathlib | |
import subprocess | |
import os | |
import threading | |
import tempfile | |
import functools | |
from concurrent.futures import ThreadPoolExecutor | |
T = TypeVar("T") | |
os.environ["TZ"] = "GMT" | |
CLI = argparse.ArgumentParser() | |
CLI.add_argument( | |
"--min-job", | |
type=int, | |
help="Minimum LRMS Cluster ID to check", | |
default=1, | |
) | |
CLI.add_argument( | |
"MAX_JOB", | |
type=int, | |
help="Maximum LRMS Cluster ID to check", | |
) | |
CLI.add_argument( | |
"--blah-files", | |
nargs='+', | |
help="blah records to compare", | |
type=pathlib.Path, | |
) | |
CLI.add_argument( | |
"--batch-files", | |
nargs='+', | |
help="batch records to compare", | |
type=pathlib.Path, | |
) | |
CLI.add_argument( | |
"--query-threads", | |
type=int, | |
default=10, | |
help="Maximum threads for creating queries" | |
) | |
pool = None # type: ThreadPoolExecutor | |
write_lock = threading.Lock() | |
def read_blah_ids(record: pathlib.Path) -> set: | |
return { | |
int(line.partition("lrmsID=")[-1].partition(".")[0]) | |
for line in record.open() | |
} | |
def read_batch_ids(record: pathlib.Path) -> set: | |
return {int(line.partition(".")[0]) for line in record.open()} | |
def get_history(job_ids: set): | |
history_d = pathlib.Path(*read_config_vals("PER_JOB_HISTORY_DIR", ce=False)) | |
for cluster_id in sorted(job_ids): | |
job_history = history_d / f"history.{cluster_id}.0" | |
if job_history.exists(): | |
yield job_history | |
def read_config_vals(*keys: str, ce=True) -> list: | |
return [ | |
subprocess.check_output( | |
["condor_ce_config_val" if ce else "condor_config_val", str(key)], | |
universal_newlines=True | |
).strip() for key in keys | |
] | |
def fmt_history(fmt: List[str], history: pathlib.Path, out_stream: TextIO): | |
subprocess.check_call( | |
["condor_q", "-jobads", str(history), *fmt], | |
stdout=out_stream, | |
) | |
def fmt_histories(histories: List[pathlib.Path], fmt: List[str], out_stream: TextIO): | |
with tempfile.TemporaryFile(mode="w+") as tmp_stream: # type: TextIO | |
for history in histories: | |
fmt_history(fmt, history, tmp_stream) | |
tmp_stream.seek(0) | |
with write_lock: | |
out_stream.write(tmp_stream.read()) | |
def create_blah(histories: List[pathlib.Path], out_stream: TextIO): | |
ce_host, batch_host, ce_id = read_config_vals("APEL_CE_HOST", "APEL_BATCH_HOST", "APEL_CE_ID") | |
q_args = [ | |
'-constraint', 'RemoteWallClockTime != 0 && RoutedFromJobId =!= undefined', | |
'-format', '"timestamp=%s" ', 'formatTime(EnteredCurrentStatus, "%Y-%m-%d %H:%M:%S")', | |
'-format', '"userDN=%s" ', 'x509userproxysubject', | |
'-format', '"userFQAN=%s" ', 'x509UserProxyFirstFQAN', | |
'-format', f'"ceID={ce_id}" ', 'EMPTY', | |
'-format', f'"jobID=%v_{ce_host}" ', 'RoutedFromJobId', | |
'-format', '"lrmsID=%v', 'clusterId', | |
'-format', f'.%v_{batch_host}" ', 'ProcId', | |
'-format', '"localUser=%s"\n', 'Owner', | |
] | |
for chunk, _ in enumerate(pool.map( | |
functools.partial(fmt_histories, fmt=q_args, out_stream=out_stream), | |
[histories[i:i+100] for i in range(0, len(histories), 100)] | |
)): | |
print(chunk * 100, end="\r") | |
def create_batch(histories: List[pathlib.Path], out_stream: TextIO): | |
scaling_attr, batch_host = read_config_vals("APEL_SCALING_ATTR", "APEL_BATCH_HOST") | |
q_args = [ | |
'-constraint', 'RemoteWallClockTime != 0 && RoutedFromJobId =!= undefined', | |
'-format', "%s", 'clusterId', | |
'-format', f".%s_{batch_host}|", 'ProcId', | |
'-format', "%s|", 'Owner', | |
'-format', "%d|", 'RemoteWallClockTime', | |
'-format', "%d|", 'RemoteUserCpu', | |
'-format', "%d|", 'RemoteSysCpu', | |
'-format', "%d|", 'JobStartDate', | |
'-format', "%d|", 'EnteredCurrentStatus', | |
'-format', "%d|", 'ResidentSetSize_RAW', | |
'-format', "%d|", 'ImageSize_RAW', | |
'-format', "%d|", 'RequestCpus', | |
'-format', "%v|\n", f"{scaling_attr}", | |
] | |
for chunk, _ in enumerate(pool.map( | |
functools.partial(fmt_histories, fmt=q_args, out_stream=out_stream), | |
[histories[i:i+100] for i in range(0, len(histories), 100)], | |
)): | |
print(chunk * 100, end="\r") | |
def main(): | |
global pool | |
ce_host, = read_config_vals("APEL_CE_HOST") | |
options = CLI.parse_args() | |
all_jobs = {*range(options.min_job, options.MAX_JOB + 1)} | |
missing_blah_ids = all_jobs.difference( | |
*(read_blah_ids(blah_file) for blah_file in options.blah_files) | |
) | |
missing_batch_ids = all_jobs.difference( | |
*(read_batch_ids(batch_file) for batch_file in options.batch_files) | |
) | |
print("Missing", len(missing_blah_ids), "blah ids") | |
print("Missing", len(missing_batch_ids), "batch ids") | |
backup_blah_histories = list(get_history(missing_blah_ids)) | |
backup_batch_histories = list(get_history(missing_batch_ids)) | |
print("Recorded missing", len(backup_blah_histories), "blah ids") | |
print("Recorded missing", len(backup_batch_histories), "batch ids") | |
with ThreadPoolExecutor(options.query_threads) as pool: | |
description = f"fixup-{options.min_job}-{options.MAX_JOB}-{ce_host}" | |
with open(f"blah-{description}", "w") as out_stream: | |
create_blah(backup_blah_histories, out_stream) | |
print() | |
with open(f"batch-{description}", "w") as out_stream: | |
            create_batch(backup_batch_histories, out_stream)
        print()


if __name__ == "__main__":
    main()
Usage example:
python3 apel_fixup.py 3500000 --min-job 3000001 --blah-files /var/lib/condor-ce/apel/blah-202* --batch-files /var/lib/condor-ce/apel/batch-202*
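
The script writes its output as blah-fixup-<min>-<max>-<CE host> and batch-fixup-<min>-<max>-<CE host> files in the current working directory. As a rough illustration of the record layouts assembled by the -format arguments above (the hostnames, DN, FQAN and all numbers below are made-up placeholders, not real output):

"timestamp=2021-02-16 17:08:00" "userDN=/DC=org/DC=example/CN=Jane Doe" "userFQAN=/example/Role=NULL/Capability=NULL" "ceID=ce.example.org:9619/ce.example.org-condor" "jobID=4567_ce.example.org" "lrmsID=3000123.0_batch.example.org" "localUser=example001"

3000123.0_batch.example.org|example001|3600|3400|120|1613470000|1613473600|2048000|4096000|8|11.2|

The first line is a blah record; the second is the corresponding pipe-separated batch record, listing in order the LRMS job ID, local user, wall clock time, user CPU, system CPU, start and end timestamps, RSS, image size, requested CPUs, and the scaling factor taken from APEL_SCALING_ATTR.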