Last active
July 4, 2018 08:26
-
-
Save maxfischer2781/a94a1736effe613a362b783bde164d74 to your computer and use it in GitHub Desktop.
Extract user resource-request data from condor_history
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import chainlet | |
import subprocess | |
import hashlib | |
import sys | |
import collections | |
from chainlet.protolink import iterlet | |
# Salt bytes mixed into every hash computed by anonymise(); taken from the
# ANON_SALT environment variable when set.
# NOTE(review): the fallback literal is public, so short user/group names
# hashed with it can be recovered by brute force -- set ANON_SALT for exports.
SALT = bytes(os.environ.get('ANON_SALT', 'BATMAN&ROBIN'), 'utf-8')

# Job attributes requested from condor_history via -af. The order here is the
# order of the output columns, so it also drives the name/value pairing in
# map_fields and the column order written by csv_writer.
fields = (
    'Owner',
    'AcctGroup',
    'JobUniverse',
    'ClusterId',
    'ProcId',
    'QDate',
    'JobStartDate',
    'CompletionDate',
    'RequestCpus',
    'RequestMemory',
    'MemoryUsage',
    'RequestDisk',
    'DiskUsage_RAW',
    'ExitCode',
    'ExitBySignal',
    'ExitSignal',
    'RequestWalltime',
    'RemoteWallClockTime',
    'RemoteSysCpu',
    'RemoteUserCpu',
    'Cmd',
    'TransferOutput',
    'Iwd',
)
def anonymise(value):
    """
    Pseudonymise ``value`` one dot-separated component at a time

    Every component is SHA-1 hashed between the even-indexed and the
    odd-indexed bytes of ``SALT``, and shortened to its first 6 hex digits.
    The hashed components are re-joined with ``'.'``, so the number of
    dotted parts is preserved.

    :param value: the text to pseudonymise
    :returns: the dotted string of 6-character hex digests
    """
    leading_salt, trailing_salt = SALT[::2], SALT[1::2]
    # sha1(a + b + c) yields the same digest as update(a); update(b); update(c)
    return '.'.join(
        hashlib.sha1(leading_salt + component.encode('utf-8') + trailing_salt).hexdigest()[:6]
        for component in value.split('.')
    )
def clean_user(value: str, user_raw: str, user_placeholder: str):
    """
    Replace every occurrence of a raw user name in ``value`` by a placeholder

    :param value: text that may embed the user name (e.g. a command or directory)
    :param user_raw: the user name to hide
    :param user_placeholder: the text to put in its place (e.g. the anonymised name)
    :returns: ``value`` with all occurrences of ``user_raw`` replaced, or
        ``value`` unchanged if ``user_raw`` is empty or ``user_placeholder`` is None
    """
    # An empty needle would make str.replace insert the placeholder between
    # every character, and a None placeholder would raise TypeError; in both
    # cases there is no user name to hide, so pass the value through.
    if not user_raw or user_placeholder is None:
        return value
    return value.replace(user_raw, user_placeholder)
# Lower-case textual boolean literals mapped to the corresponding Python bools.
boolean = dict(true=True, false=False)
@chainlet.funclet
def map_fields(query_line):
    """
    Turn one line of ``condor_history -af`` output into a dict keyed by ``fields``

    Surrounding whitespace is stripped, and the line is split on single spaces.
    The pieces are then paired with ``fields`` in order.

    NOTE(review): a value that itself contains a space shifts every later
    column. ``zip`` also stops at the shorter sequence, so surplus values are
    ignored and missing ones leave keys absent. Confirm that the queried
    attributes never contain spaces.
    """
    columns = query_line.strip().split(' ')
    return {name: column for name, column in zip(fields, columns)}
@chainlet.funclet
def parse_values(raw_job_map: dict):
    """
    Convert the raw string attributes of one job to Python values

    Rules, applied to each name in ``fields`` in order:

    * ``undefined`` (any case) becomes ``None``
    * ``Owner`` and ``AcctGroup`` are replaced by ``anonymise(value)``
    * ``ExitBySignal`` becomes ``True`` if it reads ``true`` (any case), else ``False``
    * ``Cmd``, ``TransferOutput`` and ``Iwd`` have the raw owner name replaced
      by the anonymised owner; they are kept as-is if the owner is undefined or empty
    * ``RemoteWallClockTime``, ``RemoteSysCpu`` and ``RemoteUserCpu`` become ``float``
    * every other attribute becomes ``int``
    * a value that fails its numeric conversion is kept as the raw string

    :param raw_job_map: mapping of every name in ``fields`` to its raw text
    :returns: a new dict with the same keys and parsed values
    :raises KeyError: if ``raw_job_map`` lacks a name listed in ``fields``
    """
    parsed_job_map = {}
    for key in fields:
        value = raw_job_map[key]
        if value.lower() == 'undefined':
            parsed_job_map[key] = None
        elif key in ('Owner', 'AcctGroup'):
            parsed_job_map[key] = anonymise(value)
        elif key == 'ExitBySignal':
            parsed_job_map[key] = value.lower() == 'true'
        elif key in {'Cmd', 'TransferOutput', 'Iwd'}:
            # 'Owner' precedes these names in ``fields``, so it is parsed already.
            # An undefined owner parses to None; passing that on as the
            # replacement text would raise TypeError and abort the whole export.
            owner_raw = raw_job_map['Owner']
            owner_placeholder = parsed_job_map.get('Owner')
            if owner_placeholder is None or not owner_raw:
                parsed_job_map[key] = value
            else:
                parsed_job_map[key] = clean_user(value, owner_raw, owner_placeholder)
        else:
            convert = float if key in {'RemoteWallClockTime', 'RemoteSysCpu', 'RemoteUserCpu'} else int
            try:
                parsed_job_map[key] = convert(value)
            except ValueError:
                # keep the raw text rather than dropping the attribute
                parsed_job_map[key] = value
    return parsed_job_map
@chainlet.genlet
def csv_writer(csv_handle, sep=' '):
    """
    Write each job map as one ``sep``-separated line, then pass it on unchanged

    A header line holding the names in ``fields`` is written when the first
    job map arrives. Each row contains ``repr()`` of the job's value for every
    name in ``fields``, in that order. The handle is flushed after every row.

    :param csv_handle: writable text stream for the output
    :param sep: column separator
    """
    row = yield
    csv_handle.write(sep.join(fields) + '\n')
    while True:
        line = sep.join([repr(row[name]) for name in fields])
        csv_handle.write(line + '\n')
        csv_handle.flush()
        # hand the unchanged job map to the next link of the chain
        row = yield row
# Ask condor_history for jobs that have an exit code or an exit signal, and
# print one row per job. The -af columns are exactly ``fields``, in order.
# The ``with`` block closes the pipe and waits for condor_history on exit,
# so the child is reaped and its exit status is available afterwards.
with subprocess.Popen(
    ['condor_history', '-constraint', '(ExitCode=!=Undefined) || (ExitSignal=!=Undefined)', '-af'] + list(fields),
    stdout=subprocess.PIPE, universal_newlines=True
) as query:
    querylet = iterlet(query.stdout)
    chain = querylet >> map_fields() >> parse_values() >> csv_writer(csv_handle=sys.stdout)
    # Exhaust the chain purely for its side effects; maxlen=0 retains nothing.
    collections.deque(chain, maxlen=0)
if query.returncode:
    # Surface a failed query instead of exiting 0 with an empty report.
    sys.exit('condor_history failed with exit status %d' % query.returncode)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment