Skip to content

Instantly share code, notes, and snippets.

@jDmacD
Last active July 14, 2017 16:16
Show Gist options
  • Save jDmacD/69ab6e592c97aee085bd79fe22e4c52e to your computer and use it in GitHub Desktop.
Save jDmacD/69ab6e592c97aee085bd79fe22e4c52e to your computer and use it in GitHub Desktop.
Deletes jobs with a given status that are more than a given number of days old.
#!/usr/bin/python
# Below are required imports for the script to run
import os
import sys
import qb
import hashlib
import re
import time, datetime
import pprint
pp = pprint.PrettyPrinter(indent=4)
import elasticsearch
from elasticsearch import Elasticsearch,helpers
# Connect to the Elasticsearch node at logger:9200 and print its info().
# Every later step uses `es`.  If creating the client or calling es.info()
# fails, there is no point in walking the job list (with a 1 s sleep per job)
# only to hit a NameError or failing bulk requests, so stop immediately.
try:
    es = Elasticsearch(['logger:9200'])
    print("Connected %s" % (es.info(),))
except Exception as ex:
    print("Error: %s" % (ex,))
    sys.exit(1)
# Only jobs submitted less than MAX_AGE_DAYS whole days ago are indexed.
MAX_AGE_DAYS = 2
# Number of jobs accumulated before a bulk request is sent.  The old check
# `count > 100` actually committed 101 jobs per batch.
BATCH_SIZE = 100
# Compiled once rather than per job.  Matches runs like "010-020" in a job name.
SHOT_CODE_RE = re.compile(r"(\d+-\d+)+")


def _shot_code(name):
    """Return the LAST match of SHOT_CODE_RE in *name*, or '' if none."""
    # Keep the whole match (group 0), not group 1: with the repeated group,
    # group 1 would only be the final repetition.
    shot_code = ''
    for match in SHOT_CODE_RE.finditer(name):
        shot_code = match.group()
    return shot_code


def _millis(seconds):
    """Convert a timestamp in seconds to rounded integer milliseconds."""
    return int(round(seconds * 1000))


def _job_document(job):
    """Build the bulk action (index 'qube', type 'job') for one job dict."""
    return {
        '_index': 'qube',
        '_type': 'job',
        '_id': job['id'],
        '_source': {
            'id': job['id'],
            'user': job['user'],
            'name': job['name'],
            'shot_code': _shot_code(job['name']),
            '@timesubmit': _millis(job['timesubmit']),
            '@timestart': _millis(job['timestart']),
            '@timecomplete': _millis(job['timecomplete']),
            'job_duration': int(job['timecomplete'] - job['timestart']),
            'wait_duration': int(job['timestart'] - job['timesubmit']),
            'status': job['status'],
            'subjobs_complete': job['todotally']['complete'],
            'subjobs': [],
        },
    }


def _work_id(job_id, subjob_id):
    """Return the 12-hex-char Elasticsearch _id used for one subjob."""
    # NOTE(review): the ids are concatenated without a separator, so e.g.
    # (1234, 50) and (12345, 0) produce the same _id and overwrite each other.
    # Kept unchanged so re-runs keep updating the documents already indexed;
    # switching to a separator needs a one-off reindex.
    # .encode() makes md5 work on Python 3; for these ASCII digits the bytes
    # (and hence the hash) are the same as before on Python 2.
    key = str(job_id) + str(subjob_id)
    return hashlib.md5(key.encode('utf-8')).hexdigest()[:12]


def _work_document(job, subjob, es_index_id):
    """Build the bulk action (index 'qube', type 'work') for one subjob."""
    return {
        '_index': 'qube',
        '_type': 'work',
        '_id': es_index_id,
        '_source': {
            'job_id': job['id'],
            'work_count': subjob['count'],
            'work_id': subjob['id'],
            'work_host': subjob['host'],
            'work_status': subjob['status'],
            '@timesubmit': _millis(job['timesubmit']),
            '@timestart': _millis(subjob['timestart']),
            '@timecomplete': _millis(subjob['timecomplete']),
            'work_duration': int(subjob['timecomplete'] - subjob['timestart']),
        },
    }


jobs = qb.jobinfo(fields=[], filters={}, id=None, status='complete', agenda=False, subjobs=False, callbacks=False)
# Single pre-formatted string: print("a", b) prints a tuple under Python 2.
print("Total Complete jobs: %d" % len(jobs))
documents = []
count = 0
now = datetime.datetime.today()
for job in jobs:
    then = datetime.datetime.fromtimestamp(job['timesubmit'])
    if (now - then).days >= MAX_AGE_DAYS:
        continue
    document = _job_document(job)
    # Pause before the per-job subjob query -- presumably to limit load on the
    # Qube supervisor; confirm before removing.
    time.sleep(1)
    subjob_result = qb.jobinfo(fields=[], filters={}, id=job['id'], status='complete', agenda=False, subjobs=True, callbacks=False)
    if subjob_result:
        subjobs = subjob_result[0]['subjobs']
    else:
        # The id-filtered query returned nothing (presumably the job changed
        # state or was removed since the first query).  Index the job without
        # subjobs instead of crashing with IndexError.
        print("job ID " + str(document['_id']) + ": no subjob info returned")
        subjobs = []
    for subjob in subjobs:
        es_index_id = _work_id(job['id'], subjob['id'])
        document['_source']['subjobs'].append(es_index_id)
        documents.append(_work_document(job, subjob, es_index_id))
    print("job ID " + str(document['_id']) + " at count " + str(count) + " processed")
    sys.stdout.flush()
    documents.append(document)
    count += 1
    if count >= BATCH_SIZE:
        print("committing to elasticsearch")
        try:
            result = helpers.bulk(es, documents)
            print(result)
        except elasticsearch.ElasticsearchException as ee:
            # Best-effort: report the failure and move on.  The batch is
            # dropped, as before.
            print(ee)
        documents = []
        count = 0
        sys.stdout.flush()
# Send whatever the batching loop left in `documents` (the last, partial
# batch).  A failed bulk request is reported but does not raise.
try:
    result = helpers.bulk(es, documents)
except elasticsearch.ElasticsearchException as ee:
    print(ee)
else:
    print(result)
sys.stdout.flush()
# usage example: python clean.py 10 killed 2
#   argv[1]  age threshold in days (int)
#   argv[2]  status passed to qb.jobinfo(status=...), e.g. killed
#   argv[3]  seconds to sleep after each qb.remove() call (int)
import datetime
import os
import sys
import time

# Print the search paths before importing qb.  .get() is used because either
# variable may be unset, and os.environ['PYTHONPATH'] would then abort the
# script with KeyError before doing anything.
print(os.environ.get('PATH', ''))
print(os.environ.get('PYTHONPATH', ''))
import qb

# Fail with a usage line rather than a bare IndexError on missing arguments.
if len(sys.argv) < 4:
    print('usage: python clean.py DAYS STATUS SLEEP_SECONDS   e.g. python clean.py 10 killed 2')
    sys.exit(2)
days = int(sys.argv[1])
status = str(sys.argv[2])
sleep = int(sys.argv[3])
print('Deleting all jobs over ' + str(days) + ' days old with a status of ' + status + '. Will sleep for a period of ' + str(sleep) + ' between deletions')
sys.stdout.flush()
jobs = qb.jobinfo(status=status)
count = 0
for job in jobs:
    now = datetime.datetime.today()
    then = datetime.datetime.fromtimestamp(job['timesubmit'])
    elapsed = now - then
    # elapsed.days counts whole days only, so this removes jobs that are at
    # least days + 1 full days old (a 10.5-day-old job is kept when days == 10).
    # NOTE(review): confirm that is the intended meaning of "over N days".
    if elapsed.days > days:
        count += 1
        print(str(job['id']) + ' is ' + str(elapsed.days) + ' days old, deleting. Total Deleted: ' + str(count))
        sys.stdout.flush()
        qb.remove(job['id'])
        time.sleep(sleep)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment