@oneamtu
Created May 2, 2025 21:15
check_ci_jobs.py - CircleCI workflow stats and stuck-workflow cancellation
import os
import argparse
import requests
from collections import defaultdict, Counter
from datetime import datetime, timedelta


def hours_since_timestamp_s(timestamp_s):
    # API timestamps are UTC ISO-8601 strings; callers strip the trailing 'Z'
    # because datetime.fromisoformat only accepts it on Python >= 3.11.
    return datetime.utcnow() - datetime.fromisoformat(timestamp_s)

def get_all_parallel_jobs(org_slug, api_token, stats_hours, cancel, cancel_hours):
    headers = {
        'Circle-Token': api_token,
        'Content-Type': 'application/json'
    }

    url = 'https://circleci.com/api/v2/pipeline'
    params = {
        'org-slug': org_slug,
        'page-token': None,
    }

    collected_workflows = []
    cancelled_workflows = []
    collected_jobs = []
    done = False

    # Walk the paginated pipeline list (newest first) until we reach a
    # pipeline older than the stats window.
    while not done:
        response = requests.get(url, headers=headers, params=params)
        if response.status_code != 200:
            raise RuntimeError(response.text)

        pipelines = response.json()['items']
        for pipeline in pipelines:
            if hours_since_timestamp_s(pipeline['created_at'][:-1]) > timedelta(hours=stats_hours):
                done = True
                break

            workflow_url = f"https://circleci.com/api/v2/pipeline/{pipeline['id']}/workflow"
            workflow_response = requests.get(workflow_url, headers=headers)
            if workflow_response.status_code != 200:
                raise RuntimeError(workflow_response.text)

            workflows = workflow_response.json()['items']
            collected_workflows.extend(workflows)

            for workflow in workflows:
                # Optionally cancel running workflows older than the cancel
                # window; these are assumed to be stuck.
                if cancel and workflow['status'] == 'running' and \
                        hours_since_timestamp_s(workflow['created_at'][:-1]) > timedelta(hours=cancel_hours):
                    cancel_url = f"https://circleci.com/api/v2/workflow/{workflow['id']}/cancel"
                    cancel_response = requests.post(cancel_url, headers=headers)
                    if cancel_response.status_code != 200:
                        raise RuntimeError(cancel_response.text)
                    cancelled_workflows.append(workflow)

                jobs_url = f"https://circleci.com/api/v2/workflow/{workflow['id']}/job"
                jobs_response = requests.get(jobs_url, headers=headers)
                if jobs_response.status_code != 200:
                    raise RuntimeError(jobs_response.text)

                for job in jobs_response.json()['items']:
                    # Keep only active or queued jobs.
                    if job['status'] in ['running', 'failing', 'not_running']:
                        collected_jobs.append({
                            'pipeline_id': pipeline['id'],
                            'status': job['status'],
                            'project_slug': job['project_slug'] + "/" + workflow['name'],
                            'started_at': job['started_at']
                        })

        # Advance to the next page; a missing or null token means we are done.
        next_page_token = response.json().get('next_page_token')
        if not next_page_token:
            break
        params['page-token'] = next_page_token

    return collected_workflows, cancelled_workflows, collected_jobs

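# For reference, the pipeline list endpoint is assumed to respond with a
# shape like the following (only fields the code reads are shown; values
# are illustrative):
#
#   {
#       "items": [
#           {"id": "...", "created_at": "2025-05-02T21:15:00Z", ...}
#       ],
#       "next_page_token": "..."   # null/absent on the last page
#   }
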
def group_workflows_statuses_by_project(workflows):
    # Count workflow statuses keyed by "<project_slug>/<workflow name>".
    project_statuses = defaultdict(lambda: defaultdict(int))
    for workflow in workflows:
        key = workflow['project_slug'] + "/" + workflow['name']
        status = workflow['status']
        project_statuses[key][status] += 1
    return project_statuses

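# A quick sketch of the grouping with hypothetical data:
#
#   group_workflows_statuses_by_project([
#       {'project_slug': 'gh/acme/api', 'name': 'build', 'status': 'success'},
#       {'project_slug': 'gh/acme/api', 'name': 'build', 'status': 'running'},
#   ])
#   # -> {'gh/acme/api/build': {'success': 1, 'running': 1}}  (nested defaultdicts)
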
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Report CircleCI workflow/job stats and optionally cancel stuck workflows.')
    parser.add_argument('--org-slug', type=str, help='Slug of the org, e.g. gh/my-org')
    parser.add_argument('--stats-hours', type=int, default=30, help='Number of hours to look back for jobs')
    parser.add_argument('--cancel', action='store_true', help='Cancel running workflows older than --cancel-hours')
    parser.add_argument('--cancel-hours', type=int, default=8, help='Age in hours after which a running workflow is cancelled')
    args = parser.parse_args()

    circleci_token = os.getenv('CIRCLECI_TOKEN')
    if not circleci_token:
        raise RuntimeError("Missing $CIRCLECI_TOKEN")

    workflows, cancelled_workflows, jobs = get_all_parallel_jobs(
        args.org_slug, circleci_token, args.stats_hours, args.cancel, args.cancel_hours)

    # Per-project workflow status breakdown.
    print(f"Total {len(workflows)} workflows")
    project_statuses = group_workflows_statuses_by_project(workflows)
    for project_slug, statuses in project_statuses.items():
        status_print = [f"Project: {project_slug} - Total: {sum(statuses.values())}"]
        for status, count in statuses.items():
            status_print.append(f"{status.capitalize()}: {count}")
        print(" / ".join(status_print))

    # Workflows cancelled on this run (printed in red).
    print("-" * 30)
    print(f"\033[31mTotal {len(cancelled_workflows)} cancelled workflows\033[0m")
    cancelled_statuses = group_workflows_statuses_by_project(cancelled_workflows)
    for project_slug, statuses in cancelled_statuses.items():
        status_print = [f"Project: {project_slug}"]
        for status, count in statuses.items():
            status_print.append(f"{status.capitalize()}: {count}")
        print(" / ".join(status_print))

    # Currently running (or failing) jobs, grouped by project/workflow.
    running_jobs = [job for job in jobs if job['status'] in ['running', 'failing']]
    print("-" * 30)
    print(f"Total {len(running_jobs)} running jobs")
    running_counts = Counter(job['project_slug'] for job in running_jobs)
    for project_slug, count in running_counts.items():
        print(f"Running {count} jobs for project: {project_slug}")

    # Queued ('not_running') jobs, grouped by project/workflow.
    queued_jobs = [job for job in jobs if job['status'] == 'not_running']
    print("-" * 30)
    print(f"Total {len(queued_jobs)} queued jobs")
    queued_counts = Counter(job['project_slug'] for job in queued_jobs)
    for project_slug, count in queued_counts.items():
        print(f"Queued {count} jobs for project: {project_slug}")
    print("-" * 30)