Skip to content

Instantly share code, notes, and snippets.

@stigok
Last active January 13, 2025 12:55
Show Gist options
  • Save stigok/9eca08a55f1105f336f0161e2a0fb37e to your computer and use it in GitHub Desktop.
Save stigok/9eca08a55f1105f336f0161e2a0fb37e to your computer and use it in GitHub Desktop.
Filter Terraform plan results
#!/usr/bin/env python3
"""
Helper to generate `-target='<resource address>'` arguments to terraform operations
from Terraform output. Operates on text only and does not touch Terraform itself.
Writes output to stdout.
"""
import argparse
import hashlib
import io
import json
import logging
import os.path
import re
import subprocess
import sys
import tempfile
CACHE_FILE_PREFIX = os.path.join(tempfile.gettempdir(), ".tfilter_cache_file_")
def plan(*, input_file):
digest = hashlib.file_digest(input_file, "sha256").hexdigest()
cachepath = CACHE_FILE_PREFIX + digest
# Use existing plan from cache
if os.path.exists(cachepath):
logging.info("using cached plan")
try:
with open(cachepath) as f:
return json.load(f)
except json.decoder.JSONDecodeError:
logging.exception("failed to parse cache file")
# ... or generate the plan file again and store it in cache
logging.debug("no cached entry found -- running `terraform show`")
plan = subprocess.run(
["terraform", "show", input_file.name, "-no-color"],
capture_output=True,
check=True,
text=True,
).stdout
with open(cachepath, "w") as f:
logging.debug("writing cache file to %s", cachepath)
json.dump(plan, f)
return plan
def parse(
*,
input_file,
name_filter=None,
re_filter=None,
ex_re_filter=None,
input_format=None,
output_format="target",
out=sys.stdout,
):
targets = []
# Reads a list of resource addresses
if input_format == "list":
# If each line is already an address, no parsing is needed.
for line in input_file.read().decode().splitlines():
if line := line.strip():
targets.append(line)
continue
# Reads a Terraform plan file
else:
plan_str = plan(input_file=input_file)
# Parse output from `terraform plan` to extract module names.
for line in plan_str.splitlines():
# Start over again if this line appears. Terraform plan will also print out resource
# drift, which is rarely interesting.
if "Terraform will perform the following actions:" in line:
targets.clear()
continue
if re.match(r"^\s+#.+(will|must) be \w+", line):
logging.debug("match: %s", line)
line = line.strip()
# Remove all text but the actual resource address
line = re.sub("^#\s*", "", line)
line = re.sub("\s*(will|must) be .+$", "", line)
if " " not in line: # prevents catching '# Warning: ' lines
targets.append(line)
if m := re.match("^\s+#\s+(\S+) has moved to (\S+)$", line):
logging.debug("match: %s", line)
# Remove instance references, as "moved" items some times (or always?)
# need to have the root item in the targets for Terraform to be happy
# moving them.
targets.append(m.group(1).split("[")[0])
targets.append(m.group(2).split("[")[0])
logging.debug("targets before filter: %s", targets)
if re_filter:
pat = re.compile(re_filter, flags=re.IGNORECASE)
targets = [t for t in targets if pat.search(t)]
elif name_filter:
targets = [t for t in targets if name_filter in t]
logging.debug("targets before exclude: %s", targets)
if ex_re_filter:
pat = re.compile(ex_re_filter, flags=re.IGNORECASE)
targets = [t for t in targets if not pat.search(t)]
output = []
if output_format == "target":
output.append(" ".join((f"-target='{t}'" for t in set(targets))))
elif output_format == "list":
for t in set(targets):
output.append(t)
else:
raise NotImplementedError(f"output {output_format} is not implemented")
# Print lines sorted
for line in sorted(output):
print(line, file=out)
def test():
tests = {
" # module.foo.bar will be updated": "-target='module.foo.bar'",
" # module.foo.bar must be replaced": "-target='module.foo.bar'",
" # module.foo.bar will be updated": "-target='module.foo.bar'",
" # module.foo.bar must be replaced": "-target='module.foo.bar'",
" # module.nrk_google_ne_prod.module.common.null_resource.ingress_controller_wildcard_cert[0] will be updated": "-target='module.nrk_google_ne_prod.module.common.null_resource.ingress_controller_wildcard_cert[0]'",
}
for s, expected in tests.items():
inp = io.StringIO(s + "\n")
out = io.StringIO()
parse(input_file=inp, out=out)
actual = out.getvalue()
assert actual == expected, "'%s' != '%s'" % (actual, expected)
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"-f",
"--name-filter",
help="a string to filter out targets with a matching substring",
)
parser.add_argument(
"-r",
"--re-filter",
help="a regular expression to return only targets that matches (overrides --name-filter)",
)
parser.add_argument(
"-e",
"--ex-re-filter",
help="a regular expression to return only targets that DOES NOT match (applied after all other filters)",
)
parser.add_argument(
"-i",
"--input-format",
choices=("plan", "list"),
default="plan",
help="format of the input. 'plan' is the output from 'terraform plan -no-color', while list is the output of 'terraform state list'.",
)
parser.add_argument(
"-o",
"--output-format",
choices=("target", "list"),
default="target",
help="output format of the addresses. 'targets' outputs like space separated '-target=address' items, while list prints just newline separated resource addresses.",
)
parser.add_argument(
"input_file",
type=argparse.FileType("r+b"),
nargs="?",
help="a Terraform binary plan file",
default="out.plan",
)
parser.add_argument(
"--test",
action="store_true",
help="run tests",
)
parser.add_argument(
"--debug",
action="store_true",
help="enable debug logging",
)
args = vars(parser.parse_args())
if args.pop("debug"):
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
if args.pop("test"):
print("Running tests")
test()
return
logging.debug("Calling with args: %s", args)
parse(**args)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment