Parse Prow data for a given PR to detect likely flakes.
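The script walks the Prow result layout under pr-logs/pull/istio_istio/&lt;pr&gt;/&lt;job&gt;/&lt;run&gt;/ in the istio-prow GCS bucket. For orientation, here is a schematic of the two files each run directory holds; the shapes are inferred from the parsing code below, and the values are hypothetical placeholders:

# Shapes inferred from the parsing code below; values are made-up placeholders.
finished = {'passed': False}                                  # finished.json: did the run pass?
clone_records = [{'failed': False,                            # clone-records.json: did the clone step fail?
                  'refs': {'base_sha': 'aaaa111',             # sha of the base branch the PR was tested against
                           'pulls': [{'sha': 'bbbb222'}]}}]   # head sha of the PR commit under test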
from google.cloud import storage  # installed with `pip3 install google-cloud-storage`
import json
import re

# before running this file, you should be sure to have your gcloud
# credentials configured, i.e. `gcloud auth application-default login`
# see https://cloud.google.com/compute/docs/tutorials/python-guide

prefix = 'pr-logs/pull/istio_istio/'
def ls_d(bucket, prefix):
    # List the immediate "subdirectories" under prefix, using '/' as the delimiter.
    result = []
    pages = bucket.list_blobs(prefix=prefix, delimiter='/').pages
    for p in pages:
        result.extend(p.prefixes)
    return result

def get_blob_string(bucket, key):
    # Return the blob contents, or an empty string if the blob does not exist.
    blob = bucket.get_blob(key)
    if blob is None:
        return ''
    return blob.download_as_string()
def leaf_name(fullname, delimiter='/'):
    if fullname.endswith(delimiter):
        fullname = fullname[:-1 * len(delimiter)]
    return fullname.split(delimiter)[-1]

def get_bucket():
    client = storage.Client()
    return client.get_bucket('istio-prow')
def get_test_results(pr_path, b):
    prnum = leaf_name(pr_path)
    jobs = ls_d(b, pr_path)
    jobmap = {}
    for job in jobs:
        jobname = leaf_name(job)
        # if jobname != 'istio_auth_sds_e2e-master':
        #     continue
        runmap = {}
        runs = ls_d(b, job)
        for run in runs:
            runname = leaf_name(run)
            print("Checking test " + jobname + ", run " + runname)
            finishedstring = get_blob_string(b, run + 'finished.json')
            if len(finishedstring) == 0:
                runmap[runname] = {'finished': False}
            else:
                finished = json.loads(finishedstring)
                clonestring = get_blob_string(b, run + 'clone-records.json')
                clone = json.loads(clonestring)
                runmap[runname] = {'finished': True,
                                   'passed': finished['passed'],
                                   'sha': clone[0]['refs']['pulls'][0]['sha'],
                                   'base': clone[0]['refs']['base_sha'],
                                   'clone-failure': clone[0]['failed']}
        jobmap[jobname] = runmap
    return prnum, jobmap
def eval_results(jobmap, prnum, b):
    probable_flakes = []
    for job, runmap in jobmap.items():
        shamap = {}
        for runname, run in runmap.items():
            if run['finished']:
                if run['clone-failure']:
                    continue
                sha = run['sha']
                run['run'] = runname
                if sha in shamap:
                    prev = shamap[sha][0]
                    if prev['passed'] != run['passed']:
                        # this is a probable flake, let's check for environmental failures
                        if isEnvFail(prev, prnum, b, job) or isEnvFail(run, prnum, b, job):
                            print('ENVFAIL: Test ' + job + ' runs ' + prev['run'] + ' and ' + runname)
                            continue
                        msg = 'Test ' + job + ' runs ' + prev['run'] + ' and ' + runname + \
                            ' flaked for commit ' + sha + \
                            ' with bases ' + prev['base'] + ' and ' + run['base']
                        print(msg)
                        # append the message itself; += would splice its characters into the list
                        probable_flakes.append(msg)
                    else:
                        # same result for the same sha; record this run alongside the others
                        shamap[sha].append(run)
                else:
                    shamap[sha] = [run]
    return probable_flakes
# Returns True if we are certain this is an env failure, False if we aren't sure
def isEnvFail(run, prnum, b, job):
    blob = b.get_blob(prefix + prnum + "/" + job + "/" + run["run"] + "/build-log.txt")
    if blob is None:
        # no build log means we can't prove an environmental failure
        return False
    blob.download_to_filename("/tmp/foo")
    h = re.compile("(error parsing HTTP 408 response body|failed to get a Boskos resource|recipe for target '.*docker.*' failed|Entrypoint received interrupt: terminated)")
    with open("/tmp/foo") as infile:
        for line in infile:
            if h.search(line) is not None:
                return True
    return False
import multiprocessing

# Calling this outside a Pool initializer is dangerous. Use get_bucket instead.
def init_global_bucket(bucket_name):
    client = storage.Client()
    global bucket
    bucket = client.get_bucket(bucket_name)

# This function calls get_test_results using the global bucket var, for multiprocessing
def get_test_results_global(pr_path):
    return get_test_results(pr_path, bucket)
def main():
    import sys
    prstart = sys.argv[1]
    if len(sys.argv) > 2:
        prend = sys.argv[2]
    else:
        prend = prstart
    # initialize so an interrupt before the map completes still leaves something to iterate
    results = []
    try:
        with multiprocessing.Pool(processes=32, initializer=init_global_bucket, initargs=['istio-prow']) as pool:
            # range() excludes its end, so add 1 to make prend inclusive
            # (otherwise running with a single PR number processes nothing)
            results = pool.map(get_test_results_global,
                               [prefix + str(prnum) + '/' for prnum in range(int(prstart), int(prend) + 1)])
    except KeyboardInterrupt:
        print("terminating gcs queries, skipping to flake analysis")
    b = get_bucket()
    for prnum, result in results:
        if result is None:
            continue
        print("***Flakes for PR " + prnum + "***")
        eval_results(result, prnum, b)

if __name__ == '__main__':
    main()
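For a quick single-PR check without the multiprocessing pool, the helpers above can also be driven directly, e.g. from a Python shell. A minimal sketch, assuming application-default credentials are configured; the PR number is hypothetical:

# A minimal sketch using the helpers above; the PR number is made up.
b = get_bucket()
prnum, jobmap = get_test_results(prefix + '12345/', b)
flakes = eval_results(jobmap, prnum, b)
print(str(len(flakes)) + ' probable flake(s) found for PR ' + prnum)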
With the new multiprocessing changes, we can process a single PR in about 7 seconds. This is still pretty slow, and doesn't include env detection, but it's much better than the original rate of one PR every three minutes. Also, this speed increase requires using a service account...
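If a service account is used, one option is to point the storage client at its key file explicitly rather than relying on application-default credentials. A minimal sketch using google-cloud-storage's from_service_account_json constructor; the key path is hypothetical:

from google.cloud import storage

# Hypothetical key path; download the service account's JSON key first.
client = storage.Client.from_service_account_json('/path/to/service-account-key.json')
bucket = client.get_bucket('istio-prow')

Alternatively, setting the GOOGLE_APPLICATION_CREDENTIALS environment variable to the key path lets the existing storage.Client() calls pick up the service account without any code changes.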