Last active
July 28, 2023 23:50
-
-
Save bneutra/2b8907e0f3956e91e31abcc851624ef1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import concurrent.futures | |
import logging | |
import os | |
import sys | |
import time | |
from collections import deque | |
import click | |
from dictdiffer import diff | |
# User name recorded in the comment attached to every apply request.
# os.getlogin() raises OSError when the process has no controlling terminal
# (cron, CI, some containers), so fall back to the usual environment variables.
try:
    CURRENT_USER = os.getlogin()
except OSError:
    CURRENT_USER = os.environ.get("USER") or os.environ.get("USERNAME") or "unknown"

# Directory appended to sys.path so that ``scripts.tfc_util`` can be imported.
# Set the INFRA_ROOT environment variable to override it; the default is the
# path that used to be hard-coded here.
INFRA_ROOT = os.environ.get("INFRA_ROOT", "/Users/brendan.neutra/infra")
sys.path.append(INFRA_ROOT)

from scripts.tfc_util import TerraformCloudApiClient, load_token  # noqa: E402

# Workspaces whose VCS repo differs from this URL are skipped.
INFRA_REPO = "https://github.com/benchling/infra"
@click.group()
def main() -> None:
    # Root click group that sub-commands attach to; the callback does nothing.
    # Documented with a comment rather than a docstring because click would
    # display a docstring as the group's help text.
    return None
def plan_differ(before, after):
    """Render the difference between two resource states as text.

    Runs dictdiffer's ``diff`` on ``before`` and ``after``, materializes the
    result into a list and returns whatever ``output_diff`` produces for it.
    """
    return output_diff(list(diff(before, after)))
def output_diff(diff, max_change_len=5000):
    """Format dictdiffer diff entries as plain text.

    Args:
        diff: iterable of entries indexable as ``(action, path, change)``.
            Only ``path`` (index 1) and ``change`` (index 2) are read.
        max_change_len: longest ``str(change)`` emitted verbatim; anything
            longer is cut to this length and suffixed with "... truncated".

    Returns:
        One string. Each entry contributes an ``attribute: <path>`` line
        (omitted when ``str(path)`` is empty) followed by a line holding the
        stringified change. An empty ``diff`` yields ``""``.
    """
    # TODO: dictdiffer has an irregular way of representing diffs
    # i.e. path could be a string (simple key) or a list (nested path)
    # change could be a tuple(str) (simple string change) or
    # tuple(list), tuple(list(other)) for more complex changes
    # we should build our own differ, use another tool or process
    # the dictdiffer diff to be more tf plan like (i.e. human readable)
    pieces = []
    for item in diff:
        attr = str(item[1])
        change = str(item[2])
        if attr:
            pieces.append(f"attribute: {attr}\n")
        if len(change) > max_change_len:
            change = change[:max_change_len] + "... truncated"
        pieces.append(f"{change}\n")
    # Join once at the end instead of growing a string inside the loop.
    return "".join(pieces)
def process_plan_response(plan_json, terse):
    """Print the creates, updates and deletes contained in a JSON plan.

    Args:
        plan_json: dict holding the plan. Only ``resource_changes`` is read;
            each entry must provide ``address`` and a ``change`` mapping with
            ``actions``, ``before`` and ``after``.
        terse: when true, print only the action and resource address; when
            false, also print the before/after diff of each resource.

    Returns:
        False when no resource has a create, update or delete action (nothing
        is printed in that case), True otherwise.

    Output is grouped as all creates, then all updates, then all deletes. A
    resource whose actions include both "delete" and "create" is listed in
    both groups.
    """
    # The key can be missing or null; treat that as "no changes" instead of
    # failing with a TypeError while iterating None.
    resources = plan_json.get("resource_changes") or []

    sections = (
        ("create", "CREATE: ", "green"),
        ("update", "UPDATE: ", "yellow"),
        ("delete", "DELETE: ", "red"),
    )
    grouped = [
        (label, color, [item for item in resources if action in item["change"]["actions"]])
        for action, label, color in sections
    ]
    if not any(items for _, _, items in grouped):
        return False

    for label, color, items in grouped:
        for item in items:
            click.secho(label, fg=color, nl=False)
            click.echo(item["address"])
            if not terse:
                # Only build the diff when it is going to be shown.
                details = plan_differ(item["change"]["before"], item["change"]["after"])
                click.secho(f"{details}", fg=color)
    return True
def get_latest_runs(client, branch, pr, tag_filters, dry_run):
    """Collect the workspaces whose current run is a candidate for apply.

    Args:
        client: object providing ``list_workspaces(tag_filters=...)`` and
            ``get_run(run_id)``.
        branch: only workspaces whose ``vcs_branch`` equals this are kept.
        pr: PR number; when non-empty, the first line of the run message must
            contain ``(#<pr>)``.
        tag_filters: passed through to ``client.list_workspaces``.
        dry_run: when true, runs that are not in the "planned" status are
            kept as well.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``current_run`` and
        ``commit_message``, one per selected workspace.
    """
    workspaces = []
    skipped = 0
    for ws in client.list_workspaces(tag_filters=tag_filters):
        skip_msg = f"Skipping {ws.name}, {ws.current_run_id}"

        # Workspace-level mismatches are logged at debug level only and
        # counted for the summary printed at the end.
        if ws.execution_mode != "agent":
            mismatch = "ws mode is not 'agent'"
        elif ws.vcs_branch != branch:
            mismatch = f"ws branch is not {branch}"
        elif ws.vcs_repo != INFRA_REPO:
            mismatch = f"repo is not {INFRA_REPO}"
        else:
            mismatch = None
        if mismatch is not None:
            logging.debug("%s %s", skip_msg, mismatch)
            skipped += 1
            continue

        # Without a current run there is nothing to look up or apply.
        if not ws.current_run_id:
            print(f"{skip_msg} workspace has no current run")
            continue

        run_info = client.get_run(ws.current_run_id)
        this_commit_msg: str = run_info.message.split("\n")[0]
        if pr and f"(#{pr})" not in this_commit_msg:
            print(f"{skip_msg} PR not {pr}")
            continue

        # NOTE(review): the indentation of the original was lost; this assumes
        # non-"planned" runs are skipped only outside dry-run mode, so that a
        # dry run can still review them. Confirm against the original file.
        if run_info.status != "planned" and not dry_run:
            print(f"{skip_msg} status is '{run_info.status}' not 'planned'")
            continue

        if ws.current_run_id != ws.latest_run_id:
            print(f"INFO: {ws.name}: current_run_id != latest_run_id")

        workspaces.append(
            {
                "id": ws.id,
                "name": ws.name,
                "current_run": ws.current_run_id,  # should we use latest_run_id? they seem the same
                "commit_message": this_commit_msg,
            }
        )

    print(
        f"{len(workspaces)} runs to apply!, {skipped} workspaces silently skipped "
        "due to branch, repo, or agent_mode mismatch\n"
        "Now retrieving plans in the background..."
    )
    return workspaces
def process_plan(run_info: list):
    """Fetch the JSON plan of a single run.

    ``run_info`` is indexed positionally and only element 1 (the run id) is
    read. A new TerraformCloudApiClient is built from ``load_token()`` on
    every call rather than reusing one passed in by the caller.
    """
    api_client: TerraformCloudApiClient = TerraformCloudApiClient(api_token=load_token())
    run_id = run_info[1]
    return api_client.get_plan_in_json(run_id=run_id, plan_id=None)
def poll_for_completion(client, runs, deadline_s=900, poll_s=1):
    """Poll runs until each reaches a final status or the deadline passes.

    Args:
        client: object providing ``get_run(run_id)`` whose result has a
            ``status`` attribute.
        runs: sequence of ``(workspace_name, run_id)`` pairs.
        deadline_s: overall time budget in seconds.
        poll_s: pause in seconds after checking a run that is still in
            progress.

    Prints one line per finished run, the runs still pending if the deadline
    is hit, and a final summary. Returns None.
    """
    # Final statuses other than "applied"/"errored". Without these, a run
    # that was canceled or discarded would be re-polled until the deadline.
    other_final_statuses = ("canceled", "force_canceled", "discarded", "planned_and_finished")

    print(f"Checking {len(runs)} runs for completion... timeout is {deadline_s}s")
    pending = deque(runs)
    applied = 0
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + deadline_s
    while pending and time.monotonic() < deadline:
        run = pending.pop()
        name, run_id = run[0], run[1]
        status = client.get_run(run_id).status
        if status == "applied":
            print(f"{name} was applied!")
            applied += 1
            continue
        if status == "errored":
            print(f"{name} was errored!")
            continue
        if status in other_final_statuses:
            print(f"{name} ended with status '{status}' and was not applied")
            continue
        # Still in progress: requeue at the other end so the remaining runs
        # are checked before this one comes up again.
        pending.appendleft(run)
        time.sleep(poll_s)

    if pending:
        still_waiting = ", ".join(str(run[0]) for run in pending)
        print(f"Timed out after {deadline_s}s still waiting for: {still_waiting}")
    print(f"{applied} out of {len(runs)} runs were applied successfully")
@main.command()
@click.option("--pr", required=False, default="", help="PR number to filter runs by")
@click.option("--branch", required=False, default="main", help="infra branch to filter runs by, default main")
@click.option("--terse", is_flag=True, default=False, help="Output abbreviated plan changes")
@click.option("--dry-run", is_flag=True)
@click.argument("tags", nargs=-1, required=True)
def cli_main(pr, branch, terse, dry_run, tags):
    """Scan workspaces by TFC tags, interactively review plans, apply."""
    # The docstring above is left unchanged because click shows it as the
    # command's help text.
    #
    # Flow: select candidate runs, fetch their plans on a small thread pool,
    # show each plan as soon as it arrives and ask for confirmation, request
    # the apply for confirmed runs, then poll the applied runs to completion.
    if not pr and not branch:
        sys.exit("You must provide one of either --pr or --branch")
    client: TerraformCloudApiClient = TerraformCloudApiClient(api_token=load_token())
    workspaces = get_latest_runs(client, branch, pr, tags, dry_run)
    runs = [[ws.get("name"), ws.get("current_run")] for ws in workspaces]
    applied = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future_to_run = {executor.submit(process_plan, run_info): run_info for run_info in runs}
        for future in concurrent.futures.as_completed(future_to_run):
            run_info = future_to_run[future]
            ws_name, run_id = run_info
            try:
                plan_data = future.result()
                print("+" * 80)
                click.secho(f"+++ {ws_name} +++", fg='green')
                has_changes = process_plan_response(plan_data, terse)
                if not has_changes:
                    # TFC sometimes wants a plan applied even if no changes exist
                    click.secho("WARNING: plan produced no changes, apply anyway?", fg="yellow")
                if dry_run:
                    # The prompt only pauses the review; its answer is not used.
                    click.confirm(f"--dry-run review only {ws_name}, next?")
                    continue
                if not click.confirm(f"Proceed with the above plan for {ws_name}?"):
                    click.echo(f"Skipping {ws_name}...")
                    continue
                click.echo(f"Asynchronously applying {ws_name} run: {run_id}...")
                client.apply_run(run_id, f"tfc_plan_util.py applied by {CURRENT_USER}")
                applied.append((ws_name, run_id))
            except click.Abort:
                # click.Abort derives from Exception, so the handler below
                # would swallow a Ctrl-C at the prompt and move on to the next
                # workspace. Cancel plan fetches that have not started yet and
                # let the abort reach click.
                for pending in future_to_run:
                    pending.cancel()
                raise
            except Exception as exc:
                # Best effort per workspace: report the failure and continue.
                print(f"{run_info!r} generated an exception: {exc}")
    print(f"{len(applied)} of {len(runs)} plans were applied")
    if applied:
        poll_for_completion(client, applied)
# Script entry point: invokes the cli_main command object directly when the
# file is executed rather than imported.
if __name__ == "__main__":
    cli_main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment