Created
May 2, 2025 21:15
-
-
Save oneamtu/bdc4ddf6eea497e398a648ba9cc68e37 to your computer and use it in GitHub Desktop.
CircleCI check_ci_jobs.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from datetime import datetime, timedelta | |
def hours_since_timestamp_s(timestamp_s): | |
return datetime.utcnow() - datetime.fromisoformat(timestamp_s) | |
def get_all_parallel_jobs(org_slug, api_token, stats_hours, cancel, cancel_hours): | |
headers = { | |
'Circle-Token': api_token, | |
'Content-Type': 'application/json' | |
} | |
# Get recent jobs (last 30 days) | |
end_time = datetime.now().isoformat() | |
start_time = (datetime.now() - timedelta(days=30)).isoformat() | |
url = f'https://circleci.com/api/v2/pipeline' | |
params = { | |
'org-slug': org_slug, | |
'page-token': None, | |
# 'mine': 'False' | |
} | |
collected_workflows = [] | |
cancelled_workflows = [] | |
collected_jobs = [] | |
done = False | |
while not done: | |
response = requests.get(url, headers=headers, params=params) | |
if response.status_code != 200: | |
raise BaseException(response.text) | |
else: | |
pipelines = response.json()['items'] | |
for pipeline in pipelines: | |
# print(pipeline['created_at']) | |
# import pdb; pdb.set_trace() | |
if hours_since_timestamp_s(pipeline['created_at'][:-1]) > timedelta(hours=stats_hours): | |
done = True | |
break | |
# print(f"Pipeline: {pipeline}") | |
workflow_url = f"https://circleci.com/api/v2/pipeline/{pipeline['id']}/workflow" | |
workflow_response = requests.get(workflow_url, headers=headers) | |
if workflow_response.status_code != 200: | |
raise BaseException(workflow_response.text) | |
else: | |
workflows = workflow_response.json()['items'] | |
collected_workflows.extend(workflows) | |
for workflow in workflows: | |
# print(f"Workflow: {workflow}") | |
if cancel and (workflow['status'] in ['running']) and \ | |
(hours_since_timestamp_s(workflow['created_at'][:-1]) > timedelta(hours=cancel_hours)): | |
cancel_url = f"https://circleci.com/api/v2/workflow/{workflow['id']}/cancel" | |
cancel_response = requests.post(cancel_url, headers=headers) | |
if cancel_response.status_code != 200: | |
raise BaseException(cancel_response.text) | |
else: | |
cancelled_workflows.append(workflow) | |
jobs_url = f"https://circleci.com/api/v2/workflow/{workflow['id']}/job" | |
jobs_response = requests.get(jobs_url, headers=headers) | |
if jobs_response.status_code != 200: | |
raise BaseException(jobs_response.text) | |
else: | |
jobs = jobs_response.json()['items'] | |
for job in jobs: | |
# print(job['status']) | |
if job['status'] in ['running', 'failing', 'not_running']: | |
collected_jobs.append({ | |
'pipeline_id': pipeline['id'], | |
'status': job['status'], | |
# 'job_id': job['id'], | |
# 'job_name': job['name'], | |
'project_slug': job['project_slug'] + "/" + workflow["name"], | |
'started_at': job['started_at'] | |
}) | |
# Check if there's another page | |
if 'next_page_token' not in response.json(): | |
break | |
params['page-token'] = response.json()['next_page_token'] | |
return collected_workflows, cancelled_workflows, collected_jobs | |
def group_workflows_statuses_by_project(workflows): | |
project_statuses = defaultdict(lambda: defaultdict(int)) | |
for workflow in workflows: | |
key = workflow['project_slug'] + "/" + workflow["name"] | |
status = workflow['status'] | |
project_statuses[key][status] += 1 | |
return project_statuses | |
if __name__ == "__main__": | |
import argparse | |
import os | |
parser = argparse.ArgumentParser(description='Process CircleCI stats/cancel stuck jobs.') | |
parser.add_argument('--org-slug', type=str, help='Slug of the org') | |
parser.add_argument('--stats-hours', type=int, default=30, help='Number of hours to look back for jobs') | |
parser.add_argument('--cancel', action='store_true', help='Cancel the operation') | |
parser.add_argument('--cancel-hours', type=int, default=8, help='Number of hours to look back for jobs to cancel') | |
args = parser.parse_args() | |
circleci_token = os.getenv('CIRCLECI_TOKEN') | |
if not circleci_token: | |
raise BaseException("Missing $CIRCLECI_TOKEN") | |
else: | |
org_slug = args.org_slug | |
workflows, cancelled_workflows, jobs = get_all_parallel_jobs(org_slug, circleci_token, args.stats_hours, args.cancel, args.cancel_hours) | |
from collections import defaultdict | |
print(f"Total {len(workflows)} workflows") | |
project_statuses = group_workflows_statuses_by_project(workflows) | |
for project_slug, statuses in project_statuses.items(): | |
status_print = [f"Project: {project_slug} - Total: {len(statuses)}"] | |
for status, count in statuses.items(): | |
status_print.append(f"{status.capitalize()}: {count}") | |
print(str.join(" / ", status_print)) | |
running_jobs = [job for job in jobs if job['status'] in ['running', 'failing']] | |
print("-" * 30) | |
print(f"\033[31mTotal {len(cancelled_workflows)} cancelled workflows\033[0m") | |
# print(cancelled_workflows) | |
cancelled_statuses = group_workflows_statuses_by_project(cancelled_workflows) | |
for project_slug, statuses in cancelled_statuses.items(): | |
status_print = [f"Project: {project_slug}"] | |
for status, count in statuses.items(): | |
status_print.append(f"{status.capitalize()}: {count}") | |
print(str.join(" / ", status_print)) | |
print("-" * 30) | |
print(f"Total {len(running_jobs)} running jobs") | |
from collections import Counter | |
project_counts = Counter(running_job['project_slug'] for running_job in running_jobs) | |
for project_slug, count in project_counts.items(): | |
print(f"Running {count} jobs for project: {project_slug}") | |
queued_jobs = [job for job in jobs if job['status'] == 'not_running'] | |
print("-" * 30) | |
print(f"Total {len(queued_jobs)} queued jobs") | |
from collections import Counter | |
project_counts = Counter(queued_job['project_slug'] for queued_job in queued_jobs) | |
for project_slug, count in project_counts.items(): | |
print(f"queued {count} jobs for project: {project_slug}") | |
print("-" * 30) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment