Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Last active July 4, 2018 08:26
Show Gist options
  • Save maxfischer2781/a94a1736effe613a362b783bde164d74 to your computer and use it in GitHub Desktop.
Save maxfischer2781/a94a1736effe613a362b783bde164d74 to your computer and use it in GitHub Desktop.
extract user resource request data from condor_history
import os
import chainlet
import subprocess
import hashlib
import sys
import collections
from chainlet.protolink import iterlet
SALT = os.environ.get('ANON_SALT', 'BATMAN&ROBIN').encode('utf-8')
fields = (
'Owner', 'AcctGroup', 'JobUniverse',
'ClusterId', 'ProcId',
'QDate', 'JobStartDate', 'CompletionDate',
'RequestCpus',
'RequestMemory', 'MemoryUsage',
'RequestDisk', 'DiskUsage_RAW',
'ExitCode', 'ExitBySignal', 'ExitSignal',
'RequestWalltime', 'RemoteWallClockTime', 'RemoteSysCpu', 'RemoteUserCpu',
'Cmd', 'TransferOutput', 'Iwd',
)
def anonymise(value):
anon_values = []
for piece in value.split('.'):
anon = hashlib.sha1()
anon.update(SALT[::2])
anon.update(piece.encode('utf-8'))
anon.update(SALT[1::2])
anon_values.append(anon.hexdigest()[:6])
return '.'.join(anon_values)
def clean_user(value: str, user_raw: str, user_placeholder: str):
return value.replace(user_raw, user_placeholder)
boolean = {
'true': True,
'false': False,
}
@chainlet.funclet
def map_fields(query_line):
values = query_line.strip().split(' ')
return dict(zip(fields, values))
@chainlet.funclet
def parse_values(raw_job_map: dict):
parsed_job_map = {}
for key in fields:
value = raw_job_map[key]
if value.lower() == 'undefined':
parsed_job_map[key] = None
elif key in ('Owner', 'AcctGroup'):
parsed_job_map[key] = anonymise(value)
elif key == 'ExitBySignal':
parsed_job_map[key] = True if value.lower() == 'true' else False
else:
try:
if key in {'RemoteWallClockTime', 'RemoteSysCpu', 'RemoteUserCpu'}:
parsed_job_map[key] = float(value)
elif key in {'Cmd', 'TransferOutput', 'Iwd'}:
parsed_job_map[key] = clean_user(value, raw_job_map['Owner'], parsed_job_map['Owner'])
else:
parsed_job_map[key] = int(value)
except ValueError:
parsed_job_map[key] = value
return parsed_job_map
@chainlet.genlet
def csv_writer(csv_handle, sep=' '):
job_map = yield
csv_handle.write(sep.join(fields) + '\n')
while True:
csv_handle.write(sep.join(repr(job_map[field]) for field in fields) + '\n')
csv_handle.flush()
job_map = yield job_map
query = subprocess.Popen(
['condor_history', '-constraint', '(ExitCode=!=Undefined) || (ExitSignal=!=Undefined)', '-af'] + list(fields),
stdout=subprocess.PIPE, universal_newlines=True
)
querylet = iterlet(query.stdout)
chain = querylet >> map_fields() >> parse_values() >> csv_writer(csv_handle=sys.stdout)
collections.deque(chain, maxlen=0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment