@maxfischer2781
Created February 16, 2021 17:08
Script to recreate missing HTCondor-CE APEL blah/batch records from a condor history.d directory
"""
Script to recreate missing APEL blah/batch records from a condor history.d directory
"""
from typing import TextIO, List
import argparse
import pathlib
import subprocess
import os
import threading
import tempfile
import functools
from concurrent.futures import ThreadPoolExecutor

# APEL records use GMT timestamps -- force it so condor_q's formatTime agrees
os.environ["TZ"] = "GMT"
CLI = argparse.ArgumentParser()
CLI.add_argument(
    "--min-job",
    type=int,
    help="Minimum LRMS Cluster ID to check",
    default=1,
)
CLI.add_argument(
    "MAX_JOB",
    type=int,
    help="Maximum LRMS Cluster ID to check",
)
CLI.add_argument(
    "--blah-files",
    nargs="+",
    help="blah records to compare",
    type=pathlib.Path,
)
CLI.add_argument(
    "--batch-files",
    nargs="+",
    help="batch records to compare",
    type=pathlib.Path,
)
CLI.add_argument(
    "--query-threads",
    type=int,
    default=10,
    help="Maximum threads for creating queries",
)
pool = None  # type: ThreadPoolExecutor  # set in main()
write_lock = threading.Lock()

def read_blah_ids(record: pathlib.Path) -> set:
    """Extract the LRMS cluster ids recorded in a blah record file"""
    return {
        int(line.partition("lrmsID=")[-1].partition(".")[0])
        for line in record.open()
    }

def read_batch_ids(record: pathlib.Path) -> set:
    """Extract the LRMS cluster ids recorded in a batch record file"""
    return {int(line.partition(".")[0]) for line in record.open()}
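
# The lines parsed here have the shapes that create_blah/create_batch emit
# below. For illustration only -- hostnames and values are made up:
#   blah:  "timestamp=2021-02-16 17:08:00" "userDN=/DC=org/..." "userFQAN=/vo/..."
#          "ceID=..." "jobID=42_ce.example.org"
#          "lrmsID=3000001.0_batch.example.org" "localUser=plt001"
#   batch: 3000001.0_batch.example.org|plt001|3600|3300|120|...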

def get_history(job_ids: set):
    """Yield the per-job history files that still exist for ``job_ids``"""
    history_d = pathlib.Path(*read_config_vals("PER_JOB_HISTORY_DIR", ce=False))
    for cluster_id in sorted(job_ids):
        job_history = history_d / f"history.{cluster_id}.0"
        if job_history.exists():
            yield job_history

def read_config_vals(*keys: str, ce=True) -> list:
    """Look up ``keys`` in the HTCondor-CE (or, with ``ce=False``, HTCondor) config"""
    return [
        subprocess.check_output(
            ["condor_ce_config_val" if ce else "condor_config_val", str(key)],
            universal_newlines=True,
        ).strip()
        for key in keys
    ]
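
# For illustration, with made-up values -- the keys are the ones used in this script:
#   read_config_vals("APEL_CE_HOST")                  -> ["ce.example.org"]
#   read_config_vals("PER_JOB_HISTORY_DIR", ce=False) -> ["/var/lib/condor/history.d"]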

def fmt_history(fmt: List[str], history: pathlib.Path, out_stream: TextIO):
    """Format a single per-job history file via ``condor_q -jobads``"""
    subprocess.check_call(
        ["condor_q", "-jobads", str(history), *fmt],
        stdout=out_stream,
    )

def fmt_histories(histories: List[pathlib.Path], fmt: List[str], out_stream: TextIO):
    """Format a chunk of history files, then append the result to ``out_stream``"""
    with tempfile.TemporaryFile(mode="w+") as tmp_stream:  # type: TextIO
        for history in histories:
            fmt_history(fmt, history, tmp_stream)
        tmp_stream.seek(0)
        with write_lock:
            out_stream.write(tmp_stream.read())
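
# Design note: each worker renders its chunk into a private temporary file and
# only copies the finished chunk into the shared output while holding
# write_lock, so records from concurrent chunks never interleave mid-stream.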

def create_blah(histories: List[pathlib.Path], out_stream: TextIO):
    """Write APEL blah records for all ``histories`` to ``out_stream``"""
    ce_host, batch_host, ce_id = read_config_vals(
        "APEL_CE_HOST", "APEL_BATCH_HOST", "APEL_CE_ID"
    )
    q_args = [
        '-constraint', 'RemoteWallClockTime != 0 && RoutedFromJobId =!= undefined',
        '-format', '"timestamp=%s" ', 'formatTime(EnteredCurrentStatus, "%Y-%m-%d %H:%M:%S")',
        '-format', '"userDN=%s" ', 'x509userproxysubject',
        '-format', '"userFQAN=%s" ', 'x509UserProxyFirstFQAN',
        # a -format string without a %-conversion is printed verbatim;
        # EMPTY is merely a placeholder attribute to pair with it
        '-format', f'"ceID={ce_id}" ', 'EMPTY',
        '-format', f'"jobID=%v_{ce_host}" ', 'RoutedFromJobId',
        # the lrmsID field spans two -format directives: ClusterId, then .ProcId_host
        '-format', '"lrmsID=%v', 'clusterId',
        '-format', f'.%v_{batch_host}" ', 'ProcId',
        '-format', '"localUser=%s"\n', 'Owner',
    ]
    # hand the pool chunks of 100 history files: each chunk is one buffered
    # write to out_stream and one tick of the progress counter
    for chunk, _ in enumerate(pool.map(
        functools.partial(fmt_histories, fmt=q_args, out_stream=out_stream),
        [histories[i:i + 100] for i in range(0, len(histories), 100)],
    )):
        print(chunk * 100, end="\r")

def create_batch(histories: List[pathlib.Path], out_stream: TextIO):
    """Write APEL batch records, one '|'-separated line per job"""
    scaling_attr, batch_host = read_config_vals("APEL_SCALING_ATTR", "APEL_BATCH_HOST")
    q_args = [
        '-constraint', 'RemoteWallClockTime != 0 && RoutedFromJobId =!= undefined',
        '-format', "%s", 'clusterId',
        '-format', f".%s_{batch_host}|", 'ProcId',
        '-format', "%s|", 'Owner',
        '-format', "%d|", 'RemoteWallClockTime',
        '-format', "%d|", 'RemoteUserCpu',
        '-format', "%d|", 'RemoteSysCpu',
        '-format', "%d|", 'JobStartDate',
        '-format', "%d|", 'EnteredCurrentStatus',
        '-format', "%d|", 'ResidentSetSize_RAW',
        '-format', "%d|", 'ImageSize_RAW',
        '-format', "%d|", 'RequestCpus',
        '-format', "%v|\n", scaling_attr,
    ]
    for chunk, _ in enumerate(pool.map(
        functools.partial(fmt_histories, fmt=q_args, out_stream=out_stream),
        [histories[i:i + 100] for i in range(0, len(histories), 100)],
    )):
        print(chunk * 100, end="\r")

def main():
    global pool
    ce_host, = read_config_vals("APEL_CE_HOST")
    options = CLI.parse_args()
    all_jobs = {*range(options.min_job, options.MAX_JOB + 1)}
    missing_blah_ids = all_jobs.difference(
        *(read_blah_ids(blah_file) for blah_file in options.blah_files)
    )
    missing_batch_ids = all_jobs.difference(
        *(read_batch_ids(batch_file) for batch_file in options.batch_files)
    )
    print("Missing", len(missing_blah_ids), "blah ids")
    print("Missing", len(missing_batch_ids), "batch ids")
    backup_blah_histories = list(get_history(missing_blah_ids))
    backup_batch_histories = list(get_history(missing_batch_ids))
    print("Recorded missing", len(backup_blah_histories), "blah ids")
    print("Recorded missing", len(backup_batch_histories), "batch ids")
    with ThreadPoolExecutor(options.query_threads) as pool:
        description = f"fixup-{options.min_job}-{options.MAX_JOB}-{ce_host}"
        with open(f"blah-{description}", "w") as out_stream:
            create_blah(backup_blah_histories, out_stream)
        print()  # finish the \r progress line
        with open(f"batch-{description}", "w") as out_stream:
            create_batch(backup_batch_histories, out_stream)
        print()


if __name__ == "__main__":
    main()
@maxfischer2781 (Author)

Usage example: python3 apel_fixup.py 3500000 --min-job 3000001 --blah-files /var/lib/condor-ce/apel/blah-202* --batch-files /var/lib/condor-ce/apel/batch-202*
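
Based on the `description` string in `main`, this invocation should write the recovered records to `blah-fixup-3000001-3500000-<APEL_CE_HOST>` and `batch-fixup-3000001-3500000-<APEL_CE_HOST>` in the working directory.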
