Instantly share code, notes, and snippets.
Created
September 18, 2025 13:43
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
Save filipenf/22c371e85e498fe5f18c3a684b0f4a53 to your computer and use it in GitHub Desktop.
Atlantis Apply wrapper for break-glass scenario
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # | |
| # This script is used to ensure that the user is allowed to run atlantis apply by | |
| # checking if the PR is mergeable OR if the user is performing a break-glass apply | |
| # (by running atlantis apply -- break-glass) and is authorized to do so. | |
| # | |
| # To determine if the user is authorized to run the break-glass command, we check that: | |
| # 1) the user belongs to the SRE team in the GitHub org (or to a team that owns * in CODEOWNERS) | |
| # 2) the break-glass label has been added to the PR | |
| # | |
| # You can test the script manually by setting/exporting the env vars, e.g.: | |
| # | |
| # COMMENT_ARGS=break-glass \ | |
| # BASE_REPO_OWNER=<my_org> \ | |
| # BASE_REPO_NAME=my-repo-name \ | |
| # PULL_NUM=1234 \ | |
| # USER_NAME=johndoe \ | |
| # ATLANTIS_GITHUB_TOKEN=ghp_xxx \ | |
| # DIR=/path/to/repo/my/project \ | |
| # REPO_REL_DIR=my/project \ | |
| # PLANFILE=/path/to/plan.tfplan \ | |
| # ./authorize_atlantis_command.py | |
| import logging | |
| import os | |
| import subprocess | |
| import sys | |
| from urllib import request, error | |
| import json | |
def getenv_required(name):
    """Return the value of environment variable ``name``, or exit the process.

    An unset variable and an empty one are treated the same way: a message is
    printed and the interpreter exits with status 1.
    """
    value = os.environ.get(name)
    if value:
        return value
    print(f"Missing required environment variable: {name}")
    sys.exit(1)
# Slug of the GitHub team (looked up in the PR's base-repo owner org) whose
# members are treated as admins for break-glass applies.
TEAM_SLUG = "sre"
# Name of the label a pull request must carry for break-glass to be authorized.
REQUIRED_LABEL = "break-glass"
def github_token():
    """Return the GitHub token from ATLANTIS_GITHUB_TOKEN (required)."""
    token = getenv_required("ATLANTIS_GITHUB_TOKEN")
    return token
def is_pr_approved(owner: str, repo: str, pull_num: int, timeout: float = 30) -> bool:
    """Return True when GitHub reports the PR's review decision as APPROVED.

    Sends a GraphQL query for ``reviewDecision`` to the GitHub API,
    authenticating with the token returned by ``github_token()``.

    Args:
        owner: Repository owner (user or organisation login).
        repo: Repository name.
        pull_num: Pull request number.
        timeout: Seconds to wait for the HTTP response before giving up, so a
            stalled connection cannot hang the workflow indefinitely.

    Returns:
        True if ``reviewDecision`` equals ``"APPROVED"``; False for any other
        value, including null.

    Raises:
        RuntimeError: if the response carries GraphQL errors or contains no
            pull request object.
        OSError: on HTTP, network or timeout failures raised by ``urlopen``
            (``urllib.error.URLError`` is an ``OSError`` subclass).
    """
    token = github_token()
    url = "https://api.github.com/graphql"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    query = """
    query($owner: String!, $repo: String!, $pr: Int!) {
      repository(owner: $owner, name: $repo) {
        pullRequest(number: $pr) {
          reviewDecision
        }
      }
    }
    """
    variables = {"owner": owner, "repo": repo, "pr": pull_num}
    payload = json.dumps({"query": query, "variables": variables}).encode("utf-8")
    req = request.Request(url, data=payload, headers=headers)
    with request.urlopen(req, timeout=timeout) as response:
        data = json.loads(response.read())
    logging.debug(json.dumps(data, indent=2))

    # GraphQL reports failures in an "errors" member of the response body;
    # surface them explicitly instead of failing later with a KeyError/TypeError.
    if data.get("errors"):
        raise RuntimeError(f"GitHub GraphQL query failed: {data['errors']}")
    repository = (data.get("data") or {}).get("repository") or {}
    pull_request = repository.get("pullRequest")
    if pull_request is None:
        raise RuntimeError(
            f"Pull request {owner}/{repo}#{pull_num} not found in GraphQL response"
        )
    return pull_request.get("reviewDecision") == "APPROVED"
def api_call(url, timeout=30):
    """Perform an authenticated GET request against the GitHub REST API.

    Args:
        url: Full URL of the endpoint to call.
        timeout: Seconds to wait for the response before giving up, so a
            stalled connection cannot hang the workflow indefinitely.

    Returns:
        A ``(status_code, body_text)`` tuple. HTTP error responses (4xx/5xx)
        are returned in the same shape rather than raised.

    Raises:
        OSError: on network-level failures or timeouts (``urllib.error.URLError``
            that is not an ``HTTPError``).
    """
    token = github_token()
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    req = request.Request(url, headers=headers)
    try:
        with request.urlopen(req, timeout=timeout) as resp:
            status_code = resp.getcode()
            resp_text = resp.read().decode()
            return status_code, resp_text
    except error.HTTPError as e:
        # An HTTPError carries the response; hand its status and body back to
        # the caller so it can decide what a non-2xx answer means.
        return e.code, e.read().decode()
def get_pr_info(owner, repo_name, pull_num):
    """Fetch a pull request from the GitHub REST API and return it as a dict.

    Raises:
        Exception: if the API answers with anything other than HTTP 200. The
            status and response body are printed before raising.
    """
    status, body = api_call(
        f"https://api.github.com/repos/{owner}/{repo_name}/pulls/{pull_num}"
    )
    if status == 200:
        pull_request = json.loads(body)
        logging.debug(json.dumps(pull_request, indent=2))
        return pull_request

    print(f"Unable to fetch PR details for {pull_num}. Response: {status} {body}")
    raise Exception(f"Unable to fetch PR details for {pull_num}. Response: {status}")
def is_mergeable(pr_data):
    """Return True when the pull request is open and not blocked from merging.

    Args:
        pr_data: Pull request object as returned by the GitHub REST API; the
            ``state``, ``merged`` and ``mergeable_state`` keys are read.

    Returns:
        False when ``mergeable_state`` is ``"blocked"``, ``"unknown"`` or
        missing; True for any other value.

    Raises:
        Exception: if the pull request is not open. The message says whether
            it was merged or is merely not open.
    """
    state = pr_data.get("state")
    if state != "open":
        if pr_data.get("merged"):
            raise Exception("PR is already merged")
        raise Exception(f"PR is not open (state: {state})")

    mergeable_state = pr_data.get("mergeable_state")
    # Fail closed: when GitHub has not reported mergeability (missing or
    # "unknown"), do not treat the PR as mergeable for authorization purposes.
    return mergeable_state not in (None, "unknown", "blocked")
def is_team_member(org, team, user):
    """Return True when ``user`` has an active membership in team ``org``/``team``.

    Calls GitHub's team-membership endpoint through ``api_call``. Anything
    other than an HTTP 200 response whose JSON object has ``"state": "active"``
    is treated as "not a member".

    Args:
        org: Organisation login.
        team: Team slug.
        user: GitHub username to look up.
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    # Escape every path segment: org/team may come from a CODEOWNERS file, so
    # characters such as "?" or "#" must not be able to alter the request URL.
    team_url = (
        f"https://api.github.com/orgs/{quote(org, safe='')}"
        f"/teams/{quote(team, safe='')}"
        f"/memberships/{quote(user, safe='')}"
    )
    status_code, resp_text = api_call(team_url)
    if status_code != 200:
        return False
    try:
        response_json = json.loads(resp_text)
    except (TypeError, ValueError):
        # Body is not valid JSON (JSONDecodeError is a ValueError subclass).
        return False
    if not isinstance(response_json, dict):
        return False
    return response_json.get("state") == "active"
def is_repo_owner(user):
    """
    Return True if ``user`` belongs to a team that owns ``*`` in CODEOWNERS.

    The repository root is derived from two environment variables set by
    Atlantis: DIR (the terraform project path) and REPO_REL_DIR (the project
    path relative to the repository root). ``.github/CODEOWNERS`` under that
    root is scanned for ``*`` entries, and every ``@org/team`` owner listed on
    them is checked with ``is_team_member``.

    Returns False when the repository root cannot be derived, when the
    CODEOWNERS file does not exist, or when no matching team contains the user.
    """
    # NOTE(review): CODEOWNERS is read from the checked-out working tree. If
    # that tree is the PR head, a PR could edit CODEOWNERS to make its own
    # author an owner -- confirm this is acceptable or read the base branch.
    tf_dir = os.path.normpath(getenv_required("DIR"))
    # normpath already removes a leading "./"; only strip leading separators so
    # directory names that start with "." are left intact.
    rel_dir = os.path.normpath(getenv_required("REPO_REL_DIR")).lstrip(os.sep)

    if rel_dir in ("", os.curdir):
        # The project lives at the repository root.
        repo_root = tf_dir
    elif tf_dir == rel_dir or tf_dir.endswith(os.sep + rel_dir):
        # Strip the project's relative path from the end of DIR.
        repo_root = tf_dir[: len(tf_dir) - len(rel_dir)]
    else:
        print(
            f"⚠️Unable to derive the repository root: '{tf_dir}' does not end with '{rel_dir}'"
        )
        return False

    codeowners_path = os.path.join(repo_root, ".github/CODEOWNERS")
    if not os.path.exists(codeowners_path):
        print(f"⚠️CODEOWNERS not found in {codeowners_path}")
        return False

    with open(codeowners_path, "r", encoding="utf-8") as f:
        for raw_line in f:
            # Drop whole-line and trailing comments, then split on any run of
            # whitespace so tab- or multi-space-separated entries are handled.
            fields = raw_line.split("#", 1)[0].split()
            if not fields or fields[0] != "*":
                continue
            for owner in fields[1:]:
                # Ownership needs to be mapped to a team in the org (@org/team);
                # individual users and e-mail addresses are ignored.
                if not owner.startswith("@") or owner.count("/") != 1:
                    continue
                org, team = owner[1:].split("/")
                if org and team and is_team_member(org, team, user):
                    return True
    return False
def has_break_glass_label(pr_data):
    """
    Tell whether one of the PR's labels is named REQUIRED_LABEL.

    A PR object without a "labels" key is treated as having no labels.
    """
    for label in pr_data.get("labels", []):
        if label.get("name") == REQUIRED_LABEL:
            return True
    return False
def authorize_break_glass(pr_data, owner, repo_name, pull_num, user):
    """Decide whether ``user`` may perform a break-glass apply on the PR.

    Both conditions must hold: the user is an admin (a repo owner according to
    ``is_repo_owner``, or a member of team TEAM_SLUG in the ``owner`` org) and
    the PR carries the break-glass label. A hint is printed for each unmet
    condition.
    """
    print(
        f"Authorizing break-glass for user '{user}' on {owner}/{repo_name}#{pull_num} ..."
    )

    # The team lookup only runs when CODEOWNERS did not already grant ownership.
    if is_repo_owner(user):
        admin = True
    else:
        admin = is_team_member(owner, TEAM_SLUG, user)
    labelled = has_break_glass_label(pr_data)

    if not labelled:
        print(
            "Please add the 'break-glass' label to the Pull Request in order to proceed"
        )
    if not admin:
        print("In order to use the break-glass command, you need to be an admin")
    return admin and labelled
def ok_to_apply(user):
    """Decide whether ``user`` may run ``atlantis apply`` for the current PR.

    Reads BASE_REPO_OWNER, BASE_REPO_NAME and PULL_NUM (all required) and
    COMMENT_ARGS (optional) from the environment. Apply is allowed when the PR
    is mergeable or approved; otherwise only when the first comment argument
    is ``break-glass`` and ``authorize_break_glass`` passes.

    Returns:
        True when the apply may proceed, False otherwise. Any exception raised
        while consulting GitHub is printed and treated as "not allowed".
    """
    owner = getenv_required("BASE_REPO_OWNER")
    repo_name = getenv_required("BASE_REPO_NAME")
    pull_num = int(getenv_required("PULL_NUM"))
    # Backslashes are removed before splitting on commas; str.split always
    # returns at least one element, so index 0 is safe.
    comment_args = os.getenv("COMMENT_ARGS", "").replace("\\", "").split(",")
    break_glass_requested = comment_args[0] == "break-glass"
    try:
        pr_data = get_pr_info(owner, repo_name, pull_num)
        if is_mergeable(pr_data) or is_pr_approved(owner, repo_name, pull_num):
            print("✅ PR is mergeable, proceeding with apply.")
            return True
        if break_glass_requested:
            if authorize_break_glass(pr_data, owner, repo_name, pull_num, user):
                print(f"✅ Break-glass authorization passed for '{user}'.")
                return True
        else:
            print(
                "🚫 PR is not mergeable. If this is an emergency please add -- break-glass to the atlantis apply command "
            )
        return False
    except Exception as e:
        print(f"❌ Unable to authorize: {e}")
        # Fail closed with an explicit boolean rather than an implicit None.
        return False
if __name__ == "__main__":
    # DEBUG=true (case-insensitive) switches logging to DEBUG; anything else is INFO.
    debug_enabled = os.getenv("DEBUG", "false").lower() == "true"
    logging.basicConfig(level=logging.DEBUG if debug_enabled else logging.INFO)

    # Prefer USER_NAME and fall back to PULL_AUTHOR.
    user = os.getenv("USER_NAME") or os.getenv("PULL_AUTHOR")
    if not user:
        print("⚠️Unable to determine triggering user. Aborting")
        sys.exit(1)

    try:
        if not ok_to_apply(user):
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(1)
        # The binary name is "terraform" followed by ATLANTIS_TERRAFORM_VERSION
        # (empty when the variable is unset).
        terraform_version = os.getenv("ATLANTIS_TERRAFORM_VERSION", "")
        terraform = f"terraform{terraform_version}"
        plan_file = getenv_required("PLANFILE")
        subprocess.run([terraform, "apply", plan_file], check=True)
    except Exception as e:
        print(f"❌ Error during apply: {e}")
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment