Last active
March 15, 2017 04:13
-
-
Save zhangyuan/fb76dfb8331573535685d5860a2493f0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # requirements.txt | |
| # | |
| # pydash | |
| # requests | |
| # termcolor | |
| import requests | |
| import pydash | |
| import time | |
| import os | |
| import atexit | |
| from datetime import datetime | |
| from requests.auth import HTTPBasicAuth | |
| from termcolor import colored | |
| from random import randint | |
# Runtime configuration, read once from the environment at import time.
# Any variable that is not set is left as None.
USERNAME = os.getenv("GOCD_USERNAME")
PASSWORD = os.getenv("GOCD_PASSWORD")
GOCD_URL = os.getenv("GOCD_URL")

# Pipeline / stage / job whose history is inspected.
PIPELINE_NAME = os.getenv("PIPELINE_NAME")
STAGE_NAME = os.getenv("STAGE_NAME")
JOB_NAME = os.getenv("JOB_NAME")
class Worker(object):
    """Collect the run history of one job of a GoCD stage.

    Stage instances are fetched one by one from the GoCD HTTP API and a
    ``JobStatus`` is appended to ``self.output`` for every stage run in which
    the configured job is found.
    """

    def __init__(self, pipeline_name, stage_name, job_name, delay_range=(1, 5)):
        """Remember which job to inspect.

        :param pipeline_name: name of the pipeline, as used in the API URLs.
        :param stage_name: name of the stage inside that pipeline.
        :param job_name: value matched against each job's ``"name"`` field.
        :param delay_range: inclusive ``(min, max)`` number of seconds to
            sleep before each stage request; a random integer in that range
            is chosen every time.
        """
        self.pipeline_name = pipeline_name
        self.stage_name = stage_name
        self.job_name = job_name
        self.delay_range = delay_range
        self.output = []

    def _get_max_pipeline_counter(self):
        """Return the counter of the first pipeline in the history response.

        Returns 0 when the response lists no pipelines, so callers can still
        compare against the result.
        """
        # NOTE(review): assumes the history lists the newest run first - confirm.
        history = self.send_request(self.get_pipeline_history_url())
        pipelines = history.get("pipelines") or []
        if not pipelines:
            return 0
        return pipelines[0].get("counter") or 0

    def send_request(self, url):
        """GET ``url`` with HTTP basic auth and return the decoded JSON body."""
        return requests.get(url, auth=HTTPBasicAuth(USERNAME, PASSWORD)).json()

    def analyse(self):
        """Collect the job status of every stage run, starting at pipeline 1.

        For each pipeline counter the stage runs 1, 2, ... are requested until
        one yields no status.  The walk stops at the first pipeline counter
        beyond the known maximum that has no stage run at all.

        :returns: ``self.output``, the list of collected statuses.
        """
        max_pipeline_counter = self._get_max_pipeline_counter()
        pipeline_counter = 1
        while True:
            found_any = False
            stage_counter = 1
            while True:
                status = self.get_status(pipeline_counter, stage_counter)
                if status is None:
                    break
                self.output.append(status)
                found_any = True
                stage_counter += 1
            # Pipelines up to the maximum may simply not have run this stage;
            # only an empty pipeline past the maximum ends the walk.
            if not found_any and pipeline_counter > max_pipeline_counter:
                break
            pipeline_counter += 1
        return self.output

    def get_pipeline_history_url(self):
        """Return the history API URL of the configured pipeline."""
        return "%s/go/api/pipelines/%s/history" % (GOCD_URL, self.pipeline_name)

    # Backward-compatible alias for the original, misspelled method name.
    get_pipeline_hisotry_url = get_pipeline_history_url

    def get_url(self, pipeline_counter, stage_counter):
        """Return the stage-instance API URL for the given counters."""
        return "%s/go/api/stages/%s/%s/instance/%d/%d" % (
            GOCD_URL,
            self.pipeline_name,
            self.stage_name,
            pipeline_counter,
            stage_counter,
        )

    def get_status(self, pipeline_counter, stage_counter):
        """Fetch one stage run and return the configured job's ``JobStatus``.

        :returns: a ``JobStatus`` built from the first and last state
            transition of the job, or ``None`` when the response carries no
            positive ``"id"``, the job is not part of the stage run, or the
            job has no state transitions.
        """
        # Random pause before every request (presumably to go easy on the
        # server - the original code gives no reason).
        time.sleep(randint(*self.delay_range))
        stage = self.send_request(self.get_url(pipeline_counter, stage_counter))

        stage_id = stage.get("id")
        # A missing id must not be compared with 0: that raises on Python 3.
        if not stage_id or stage_id <= 0:
            return None

        jobs = stage.get("jobs") or []
        job = next((j for j in jobs if j.get("name") == self.job_name), None)
        if job is None:
            return None

        transitions = job.get("job_state_transitions")
        if not transitions:
            return None

        start_ms = transitions[0].get("state_change_time")
        end_ms = transitions[-1].get("state_change_time")
        # Timestamps are divided by 1000 before conversion; floor division
        # keeps whole seconds on Python 2 and Python 3 alike.
        return JobStatus(
            datetime.fromtimestamp(start_ms // 1000),
            datetime.fromtimestamp(end_ms // 1000),
            job.get("result"),
        )
| class JobStatus(object): | |
| def __init__(self, start_time, end_time, result): | |
| self.start_time = start_time | |
| self.end_time = end_time | |
| self.result = result | |
| def duration(self): | |
| return (self.end_time - self.start_time) | |
| def __repr__(self): | |
| if self.result == "Passed": | |
| colored_result = colored(self.result, 'green') | |
| import requests | |
| import pydash | |
| import time | |
| import os | |
| import atexit | |
| from datetime import datetime | |
| from requests.auth import HTTPBasicAuth | |
| from termcolor import colored | |
| from random import randint | |
# Runtime configuration, read once from the environment at import time.
# Any variable that is not set is left as None.
USERNAME = os.getenv("GOCD_USERNAME")
PASSWORD = os.getenv("GOCD_PASSWORD")
GOCD_URL = os.getenv("GOCD_URL")

# Pipeline / stage / job whose history is inspected.
PIPELINE_NAME = os.getenv("PIPELINE_NAME")
STAGE_NAME = os.getenv("STAGE_NAME")
JOB_NAME = os.getenv("JOB_NAME")
class Worker(object):
    """Collect the run history of one job of a GoCD stage.

    Stage instances are fetched one by one from the GoCD HTTP API and a
    ``JobStatus`` is appended to ``self.output`` for every stage run in which
    the configured job is found.
    """

    def __init__(self, pipeline_name, stage_name, job_name, delay_range=(5, 10)):
        """Remember which job to inspect.

        :param pipeline_name: name of the pipeline, as used in the API URLs.
        :param stage_name: name of the stage inside that pipeline.
        :param job_name: value matched against each job's ``"name"`` field.
        :param delay_range: inclusive ``(min, max)`` number of seconds to
            sleep before each stage request; a random integer in that range
            is chosen every time.
        """
        self.pipeline_name = pipeline_name
        self.stage_name = stage_name
        self.job_name = job_name
        self.delay_range = delay_range
        self.output = []

    def _get_max_pipeline_counter(self):
        """Return the counter of the first pipeline in the history response.

        Returns 0 when the response lists no pipelines, so callers can still
        compare against the result.
        """
        # NOTE(review): assumes the history lists the newest run first - confirm.
        history = self.send_request(self.get_pipeline_history_url())
        pipelines = history.get("pipelines") or []
        if not pipelines:
            return 0
        return pipelines[0].get("counter") or 0

    def send_request(self, url):
        """GET ``url`` with HTTP basic auth and return the decoded JSON body."""
        return requests.get(url, auth=HTTPBasicAuth(USERNAME, PASSWORD)).json()

    def analyse(self, pipeline_counter=1):
        """Collect the job status of every stage run from a starting pipeline.

        For each pipeline counter the stage runs 1, 2, ... are requested until
        one yields no status.  The walk stops at the first pipeline counter
        beyond the known maximum that has no stage run at all.

        :param pipeline_counter: pipeline counter to start from.
        :returns: ``self.output``, the list of collected statuses.
        """
        max_pipeline_counter = self._get_max_pipeline_counter()
        while True:
            found_any = False
            stage_counter = 1
            while True:
                status = self.get_status(pipeline_counter, stage_counter)
                if status is None:
                    break
                self.output.append(status)
                found_any = True
                stage_counter += 1
            # Pipelines up to the maximum may simply not have run this stage;
            # only an empty pipeline past the maximum ends the walk.
            if not found_any and pipeline_counter > max_pipeline_counter:
                break
            pipeline_counter += 1
        return self.output

    def get_pipeline_history_url(self):
        """Return the history API URL of the configured pipeline."""
        return "%s/go/api/pipelines/%s/history" % (GOCD_URL, self.pipeline_name)

    # Backward-compatible alias for the original, misspelled method name.
    get_pipeline_hisotry_url = get_pipeline_history_url

    def get_url(self, pipeline_counter, stage_counter):
        """Return the stage-instance API URL for the given counters."""
        return "%s/go/api/stages/%s/%s/instance/%d/%d" % (
            GOCD_URL,
            self.pipeline_name,
            self.stage_name,
            pipeline_counter,
            stage_counter,
        )

    def get_status(self, pipeline_counter, stage_counter):
        """Fetch one stage run and return the configured job's ``JobStatus``.

        The returned status is tagged with the pipeline and stage counters it
        was fetched for.

        :returns: a ``JobStatus`` built from the first and last state
            transition of the job, or ``None`` when the response carries no
            positive ``"id"``, the job is not part of the stage run, or the
            job has no state transitions.
        """
        # Random pause before every request (presumably to go easy on the
        # server - the original code gives no reason).
        time.sleep(randint(*self.delay_range))
        stage = self.send_request(self.get_url(pipeline_counter, stage_counter))

        stage_id = stage.get("id")
        # A missing id must not be compared with 0: that raises on Python 3.
        if not stage_id or stage_id <= 0:
            return None

        jobs = stage.get("jobs") or []
        job = next((j for j in jobs if j.get("name") == self.job_name), None)
        if job is None:
            return None

        transitions = job.get("job_state_transitions")
        if not transitions:
            return None

        start_ms = transitions[0].get("state_change_time")
        end_ms = transitions[-1].get("state_change_time")
        # Timestamps are divided by 1000 before conversion; floor division
        # keeps whole seconds on Python 2 and Python 3 alike.
        status = JobStatus(
            datetime.fromtimestamp(start_ms // 1000),
            datetime.fromtimestamp(end_ms // 1000),
            job.get("result"),
        )
        status.pipeline_counter = pipeline_counter
        status.stage_counter = stage_counter
        return status
class JobStatus(object):
    """Start time, end time and result of a single job run."""

    def __init__(self, start_time, end_time, result):
        """Store the timing and result; both counters start out as None."""
        self.start_time, self.end_time = start_time, end_time
        self.result = result
        # Not known at construction time; left for the creator to fill in.
        self.pipeline_counter = None
        self.stage_counter = None

    def duration(self):
        """Return ``end_time - start_time``."""
        elapsed = self.end_time - self.start_time
        return elapsed

    def __repr__(self):
        """Return start, end, duration and result as one tab-separated line.

        The results "Passed", "Cancelled" and "Failed" are wrapped with
        ``colored`` (green, yellow and red); any other result is shown as is.
        """
        shown_result = self.result
        for known_result, colour in (
            ("Passed", "green"),
            ("Cancelled", "yellow"),
            ("Failed", "red"),
        ):
            if self.result == known_result:
                shown_result = colored(self.result, colour)
                break
        fields = (self.start_time, self.end_time, self.duration(), shown_result)
        return "%s\t%s\t%s\t%s" % fields
class JobStatusReporter(object):
    """Format job statuses as tab-separated report lines."""

    # Colour passed to ``colored`` for each known job result.
    RESULT_COLOURS = {"Passed": "green", "Cancelled": "yellow", "Failed": "red"}

    def pretty_print(self, status):
        """Return (not print) one report line for ``status``.

        The line holds pipeline counter, stage counter, start time, end time,
        duration and result, separated by tabs.  Known results are coloured;
        any other result is shown unchanged.

        :param status: object exposing ``pipeline_counter``, ``stage_counter``,
            ``start_time``, ``end_time``, ``result`` and ``duration()``.
        """
        colour = self.RESULT_COLOURS.get(status.result)
        if colour is None:
            shown_result = status.result
        else:
            shown_result = colored(status.result, colour)
        # %s instead of %d: identical output for integer counters, and no
        # TypeError for a status whose counters are still None.
        return "%s\t%s\t%s\t%s\t%s\t%s" % (
            status.pipeline_counter,
            status.stage_counter,
            status.start_time,
            status.end_time,
            status.duration(),
            shown_result,
        )
def exit_handler(worker):
    """Print a report of every status in ``worker.output``.

    One line per status, formatted by ``JobStatusReporter.pretty_print`` and
    framed by BEGIN/END marker lines.

    :param worker: object whose ``output`` attribute is an iterable of statuses.
    """
    # A parenthesised single-argument print is valid both as the Python 2
    # statement and as the Python 3 function, with the same output.
    print("----- REPORT BEGIN -----")
    reporter = JobStatusReporter()
    for status in worker.output:
        print(reporter.pretty_print(status))
    print("----- REPORT END -----")
def main():
    """Analyse the job configured through the environment.

    The report handler is registered with ``atexit`` before the analysis
    starts, so it is invoked with the worker when the interpreter exits.
    """
    job_worker = Worker(PIPELINE_NAME, STAGE_NAME, JOB_NAME)
    atexit.register(exit_handler, job_worker)
    job_worker.analyse()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment