Skip to content

Instantly share code, notes, and snippets.

@zhangyuan
Last active March 15, 2017 04:13
Show Gist options
  • Select an option

  • Save zhangyuan/fb76dfb8331573535685d5860a2493f0 to your computer and use it in GitHub Desktop.

Select an option

Save zhangyuan/fb76dfb8331573535685d5860a2493f0 to your computer and use it in GitHub Desktop.
# requirements.txt
#
# pydash
# requests
# termcolor
import requests
import pydash
import time
import os
import atexit
from datetime import datetime
from requests.auth import HTTPBasicAuth
from termcolor import colored
from random import randint
# Connection settings and the job to inspect, all read from the environment.
# Each name is None when its variable is not set.
USERNAME = os.getenv("GOCD_USERNAME")
PASSWORD = os.getenv("GOCD_PASSWORD")
PIPELINE_NAME = os.getenv("PIPELINE_NAME")
STAGE_NAME = os.getenv("STAGE_NAME")
JOB_NAME = os.getenv("JOB_NAME")
GOCD_URL = os.getenv("GOCD_URL")
class Worker(object):
    """Walk the stage instances of one GoCD pipeline and record one job's runs.

    Every status that is fetched is appended to ``self.output`` immediately,
    so the list holds partial results if the walk stops early.
    """

    # Seconds to wait on the server before giving up on a request. Without a
    # timeout ``requests`` can block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, pipeline_name, stage_name, job_name):
        """Store the pipeline/stage/job names and start with an empty result list."""
        self.pipeline_name = pipeline_name
        self.stage_name = stage_name
        self.job_name = job_name
        self.output = []

    def _get_max_pipeline_counter(self):
        """Return the ``counter`` of the first entry in the pipeline history.

        NOTE(review): presumably the first entry is the most recent run;
        ``analyse`` uses the value as its upper bound.
        """
        history = self.send_request(self.get_pipeline_hisotry_url())
        return history.get("pipelines")[0].get("counter")

    def send_request(self, url):
        """GET ``url`` with HTTP basic auth and return the decoded JSON body."""
        response = requests.get(
            url,
            auth=HTTPBasicAuth(USERNAME, PASSWORD),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def analyse(self):
        """Collect job statuses for every pipeline run, starting at counter 1.

        For each pipeline counter the stage counters 1, 2, ... are queried
        until ``get_status`` returns None. The walk ends at the first pipeline
        counter above the maximum that has no stage instance at all.

        Returns:
            ``self.output``, the list of collected statuses.
        """
        max_pipeline_counter = self._get_max_pipeline_counter()
        pipeline_counter = 1
        while True:
            stage_counter = 1
            while True:
                status = self.get_status(pipeline_counter, stage_counter)
                if status is None:
                    break
                self.output.append(status)
                stage_counter += 1
            # stage_counter is still 1 only when this pipeline run had no
            # instance of the stage whatsoever.
            if stage_counter == 1 and pipeline_counter > max_pipeline_counter:
                break
            pipeline_counter += 1
        return self.output

    def get_pipeline_hisotry_url(self):
        """Return the pipeline history API URL (misspelled name kept for callers)."""
        return "%s/go/api/pipelines/%s/history" % (GOCD_URL, self.pipeline_name)

    def get_url(self, pipeline_counter, stage_counter):
        """Return the stage-instance API URL for the given counters."""
        return "%s/go/api/stages/%s/%s/instance/%d/%d" % (
            GOCD_URL,
            self.pipeline_name,
            self.stage_name,
            pipeline_counter,
            stage_counter,
        )

    def get_status(self, pipeline_counter, stage_counter):
        """Fetch one stage instance and return the JobStatus of ``self.job_name``.

        Returns:
            A JobStatus, or None when the response carries no positive ``id``.

        Raises:
            ValueError: the stage instance exists but has no job with the
                configured name.
        """
        # Random 1-5 s pause before every request (presumably to go easy on
        # the server).
        time.sleep(randint(1, 5))
        url = self.get_url(pipeline_counter, stage_counter)
        return self._parse_stage(self.send_request(url), pipeline_counter, stage_counter)

    def _parse_stage(self, obj, pipeline_counter, stage_counter):
        """Turn a stage-instance payload into a JobStatus, or None if it has no positive id."""
        # Treat a missing id as "not positive" rather than comparing None to
        # an int, which raises on Python 3.
        if (obj.get("id") or 0) <= 0:
            return None
        jobs = obj.get("jobs") or []
        job = next((j for j in jobs if j.get("name") == self.job_name), None)
        if job is None:
            raise ValueError(
                "job %r not found in stage instance %s/%s/%d/%d"
                % (self.job_name, self.pipeline_name, self.stage_name,
                   pipeline_counter, stage_counter)
            )
        transitions = job.get("job_state_transitions")
        # First and last state transitions bound the run; the timestamps are
        # divided by 1000 before conversion (presumably epoch milliseconds).
        start_time = transitions[0].get("state_change_time")
        end_time = transitions[-1].get("state_change_time")
        return JobStatus(
            datetime.fromtimestamp(start_time / 1000),
            datetime.fromtimestamp(end_time / 1000),
            job.get("result"),
        )
# Holds the start time, end time and result of one job run.
# NOTE(review): this copy of the class stops partway through __repr__ -- only
# the "Passed" branch is present and nothing is returned. A complete JobStatus
# definition appears later in the file; confirm which copy is intended.
class JobStatus(object):
def __init__(self, start_time, end_time, result):
self.start_time = start_time
self.end_time = end_time
self.result = result
# Elapsed time between the two stored timestamps.
def duration(self):
return (self.end_time - self.start_time)
def __repr__(self):
# "Passed" results are wrapped in green via termcolor's colored().
if self.result == "Passed":
colored_result = colored(self.result, 'green')
import requests
import pydash
import time
import os
import atexit
from datetime import datetime
from requests.auth import HTTPBasicAuth
from termcolor import colored
from random import randint
# GoCD credentials, server base URL and the pipeline/stage/job to analyse.
# All values come from the environment and are None when unset.
USERNAME = os.getenv("GOCD_USERNAME")
PASSWORD = os.getenv("GOCD_PASSWORD")
PIPELINE_NAME = os.getenv("PIPELINE_NAME")
STAGE_NAME = os.getenv("STAGE_NAME")
JOB_NAME = os.getenv("JOB_NAME")
GOCD_URL = os.getenv("GOCD_URL")
class Worker(object):
    """Walk the stage instances of one GoCD pipeline and record one job's runs.

    Every status that is fetched is appended to ``self.output`` immediately,
    so the list holds partial results if the walk stops early.
    """

    # Seconds to wait on the server before giving up on a request. Without a
    # timeout ``requests`` can block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, pipeline_name, stage_name, job_name):
        """Store the pipeline/stage/job names and start with an empty result list."""
        self.pipeline_name = pipeline_name
        self.stage_name = stage_name
        self.job_name = job_name
        self.output = []

    def _get_max_pipeline_counter(self):
        """Return the ``counter`` of the first entry in the pipeline history.

        NOTE(review): presumably the first entry is the most recent run;
        ``analyse`` uses the value as its upper bound.
        """
        history = self.send_request(self.get_pipeline_hisotry_url())
        return history.get("pipelines")[0].get("counter")

    def send_request(self, url):
        """GET ``url`` with HTTP basic auth and return the decoded JSON body."""
        response = requests.get(
            url,
            auth=HTTPBasicAuth(USERNAME, PASSWORD),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def analyse(self, pipeline_counter=1):
        """Collect job statuses for every pipeline run from ``pipeline_counter`` on.

        For each pipeline counter the stage counters 1, 2, ... are queried
        until ``get_status`` returns None. The walk ends at the first pipeline
        counter above the maximum that has no stage instance at all.

        Args:
            pipeline_counter: pipeline run to start from (defaults to 1).

        Returns:
            ``self.output``, the list of collected statuses.
        """
        max_pipeline_counter = self._get_max_pipeline_counter()
        while True:
            stage_counter = 1
            while True:
                status = self.get_status(pipeline_counter, stage_counter)
                if status is None:
                    break
                self.output.append(status)
                stage_counter += 1
            # stage_counter is still 1 only when this pipeline run had no
            # instance of the stage whatsoever.
            if stage_counter == 1 and pipeline_counter > max_pipeline_counter:
                break
            pipeline_counter += 1
        return self.output

    def get_pipeline_hisotry_url(self):
        """Return the pipeline history API URL (misspelled name kept for callers)."""
        return "%s/go/api/pipelines/%s/history" % (GOCD_URL, self.pipeline_name)

    def get_url(self, pipeline_counter, stage_counter):
        """Return the stage-instance API URL for the given counters."""
        return "%s/go/api/stages/%s/%s/instance/%d/%d" % (
            GOCD_URL,
            self.pipeline_name,
            self.stage_name,
            pipeline_counter,
            stage_counter,
        )

    def get_status(self, pipeline_counter, stage_counter):
        """Fetch one stage instance and return the JobStatus of ``self.job_name``.

        The returned status carries the pipeline and stage counters it was
        fetched with.

        Returns:
            A JobStatus, or None when the response carries no positive ``id``.

        Raises:
            ValueError: the stage instance exists but has no job with the
                configured name.
        """
        # Random 5-10 s pause before every request (presumably to go easy on
        # the server).
        time.sleep(randint(5, 10))
        url = self.get_url(pipeline_counter, stage_counter)
        return self._parse_stage(self.send_request(url), pipeline_counter, stage_counter)

    def _parse_stage(self, obj, pipeline_counter, stage_counter):
        """Turn a stage-instance payload into a JobStatus, or None if it has no positive id."""
        # Treat a missing id as "not positive" rather than comparing None to
        # an int, which raises on Python 3.
        if (obj.get("id") or 0) <= 0:
            return None
        jobs = obj.get("jobs") or []
        job = next((j for j in jobs if j.get("name") == self.job_name), None)
        if job is None:
            raise ValueError(
                "job %r not found in stage instance %s/%s/%d/%d"
                % (self.job_name, self.pipeline_name, self.stage_name,
                   pipeline_counter, stage_counter)
            )
        transitions = job.get("job_state_transitions")
        # First and last state transitions bound the run; the timestamps are
        # divided by 1000 before conversion (presumably epoch milliseconds).
        start_time = transitions[0].get("state_change_time")
        end_time = transitions[-1].get("state_change_time")
        status = JobStatus(
            datetime.fromtimestamp(start_time / 1000),
            datetime.fromtimestamp(end_time / 1000),
            job.get("result"),
        )
        status.pipeline_counter = pipeline_counter
        status.stage_counter = stage_counter
        return status
class JobStatus(object):
    """Start time, end time and result of one job run.

    ``pipeline_counter`` and ``stage_counter`` start as None and are meant to
    be filled in by whoever creates the status.
    """

    # termcolor colour used for each result that gets highlighted in repr().
    _RESULT_COLOURS = {"Passed": "green", "Cancelled": "yellow", "Failed": "red"}

    def __init__(self, start_time, end_time, result):
        """Record the run's timestamps and result; counters are left unset."""
        self.start_time, self.end_time = start_time, end_time
        self.result = result
        self.pipeline_counter = self.stage_counter = None

    def duration(self):
        """Return ``end_time - start_time``."""
        elapsed = self.end_time - self.start_time
        return elapsed

    def __repr__(self):
        """Return a tab-separated line: start, end, duration, (coloured) result."""
        colour = self._RESULT_COLOURS.get(self.result)
        # Results without a colour entry are shown as they are.
        shown = self.result if colour is None else colored(self.result, colour)
        return "%s\t%s\t%s\t%s" % (self.start_time, self.end_time, self.duration(), shown)
class JobStatusReporter(object):
    """Formats a job status as one tab-separated report line."""

    def pretty_print(self, status):
        """Return the report line for ``status``.

        Columns: pipeline counter, stage counter, start time, end time,
        duration, result. "Passed", "Cancelled" and "Failed" results are
        coloured green, yellow and red; any other result is left plain.
        """
        palette = {"Passed": "green", "Cancelled": "yellow", "Failed": "red"}
        outcome = status.result
        if outcome in palette:
            outcome = colored(outcome, palette[outcome])
        columns = (
            status.pipeline_counter,
            status.stage_counter,
            status.start_time,
            status.end_time,
            status.duration(),
            outcome,
        )
        return "%d\t%d\t%s\t%s\t%s\t%s" % columns
def exit_handler(worker):
    """Print every status in ``worker.output`` between begin/end banner lines.

    Uses the parenthesised single-argument form of print, which produces the
    same output on Python 2 and is also valid on Python 3.
    """
    print("----- REPORT BEGIN -----")
    reporter = JobStatusReporter()
    for status in worker.output:
        print(reporter.pretty_print(status))
    print("----- REPORT END -----")
if __name__ == "__main__":
    worker = Worker(
        pipeline_name=PIPELINE_NAME,
        stage_name=STAGE_NAME,
        job_name=JOB_NAME,
    )
    # Register the report before the walk starts so that whatever has been
    # collected is printed when the interpreter exits.
    atexit.register(exit_handler, worker)
    worker.analyse()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment