Last active
July 8, 2025 09:44
-
-
Save NiceRath/a6e5206a2bf2a6df037d00820758e315 to your computer and use it in GitHub Desktop.
Script to change Ansible-Vault-Password for all Secrets in a YAML file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# USAGE: | |
# python3 ansible-vault-rekey.py -i ~/.vault/old.txt -o ~/.vault/new.txt -f ./secrets/dev.yml | |
# BEHAVIOUR: | |
# reads the YAML variable-file | |
# processes all variable-values recursively | |
# if the variable-value is an Ansible-Vault-encrypted value | |
# it decrypts it with the old password (if that fails, an error is printed and the value is kept unchanged)
# it re-encrypts it with the new password | |
# it saves the new content to a new file | |
from time import time | |
from pathlib import Path | |
from os import open as open_file | |
from os import remove as remove_file | |
from argparse import ArgumentParser | |
from re import sub as regex_replace | |
from subprocess import Popen as subprocess_popen | |
from subprocess import PIPE as subprocess_pipe | |
from yaml import safe_load as yaml_load | |
from yaml import safe_dump as yaml_dump | |
from yaml import SafeLoader as YAMLSafeLoader | |
# Prefix that marks a string as an Ansible-Vault payload
ANSIBLE_VAULT_HEADER = '$ANSIBLE_VAULT'
# Nesting depth beyond which _process_recursively stops descending
MAX_DICT_DEPTH = 10
# Scratch file that holds the ciphertext handed to 'ansible-vault view';
# the name carries the unix timestamp taken when the module is loaded
TMP_DECRYPT_FILE = f'/tmp/ansible_vault_rekey_{int(time())}.txt'
# Running number of values passed to _decrypt_vault (shown in the progress output)
c = 0
def _open_file_0600(path: (str, Path), flags):
    """Opener callback for the built-in ``open()`` that requests mode ``0o600``.

    Forwards ``path`` and ``flags`` to ``os.open`` and fixes the creation mode
    to read/write for the owner only.

    :param path: file to open
    :param flags: ``os.open`` flags chosen by the caller
    :return: the file descriptor returned by ``os.open``
    """
    owner_read_write = 0o600
    descriptor = open_file(path, flags, owner_read_write)
    return descriptor
def _write_file_0600(file: (str, Path), content: str):
    """Write ``content`` to ``file``, opening it through ``_open_file_0600``.

    An already existing regular file is appended to; in every other case the
    file is opened in write mode.

    :param file: target path
    :param content: text to write (encoded as UTF-8)
    """
    open_mode = 'a' if Path(file).is_file() else 'w'
    with open(file, open_mode, encoding='utf-8', opener=_open_file_0600) as handle:
        handle.write(content)
def _encrypt_string(secret: str) -> str:
    """Encrypt ``secret`` with the new vault password.

    The plaintext is fed to ``ansible-vault encrypt_string`` via stdin, using
    the password file given as ``--vault-pass-file-out``.

    :param secret: plaintext value to encrypt
    :return: the command output with every space character removed and
        surrounding whitespace stripped
    :raises RuntimeError: if ``ansible-vault`` exits with a non-zero status
    """
    # Run without a shell: the password-file path is passed as one verbatim
    # argument, so spaces or shell metacharacters in it cannot break or alter
    # the command.
    with subprocess_popen(
        ['ansible-vault', 'encrypt_string', f'--vault-pass-file={args.vault_pass_file_out}'],
        stdin=subprocess_pipe,
        stdout=subprocess_pipe,
    ) as ps:
        _stdout, _ = ps.communicate(input=secret.encode('utf-8'))

    # Without this check a failed encryption would silently replace the
    # secret with an empty string in the output file.
    if ps.returncode != 0:
        raise RuntimeError(f"'ansible-vault encrypt_string' failed with exit code {ps.returncode}")

    # Same effect as the former "| tr -d ' '" stage of the shell pipeline.
    return _stdout.decode('utf-8').replace(' ', '').strip()
def _decrypt_vault(secret: str, key_path: str) -> str:
    """Decrypt one vault value with the old vault password.

    The ciphertext (without a leading ``!vault |`` line) is written to
    ``TMP_DECRYPT_FILE`` and read back through ``ansible-vault view`` using
    the password file given as ``--vault-pass-file-in``.

    :param secret: the encrypted value as loaded from the YAML file
    :param key_path: position of the value inside the document; only used
        in the progress and error output
    :return: the decrypted text with surrounding whitespace stripped, or
        ``secret`` unchanged if ``ansible-vault`` exits with a non-zero status
    """
    # pylint: disable=W0603
    global c
    c += 1
    print(f'Processing.. ({c}: {key_path})')

    _write_file_0600(file=TMP_DECRYPT_FILE, content=regex_replace(r'!vault \|.*\n', '', secret))

    try:
        with subprocess_popen(
            ['ansible-vault', 'view', f'--vault-pass-file={args.vault_pass_file_in}', TMP_DECRYPT_FILE],
            stdout=subprocess_pipe,
        ) as ps:
            _stdout, _ = ps.communicate()
    finally:
        # Always clean up, even if the command could not be started or was
        # interrupted: _write_file_0600 appends to an existing file, so a
        # leftover would corrupt the next ciphertext written here.
        remove_file(TMP_DECRYPT_FILE)

    if ps.returncode != 0:
        print(f"ERROR: Unable to decrypt vault '{key_path}'")
        return secret

    return _stdout.decode('utf-8').strip()
def _rekey_if_vault(value: str, key_path: str) -> str:
    """Re-encrypt ``value`` if it looks like an Ansible-Vault value.

    A value counts as vault-encrypted when, ignoring surrounding whitespace,
    it starts with ``!vault |`` or with ``ANSIBLE_VAULT_HEADER``. Anything
    else is returned untouched, as is a value whose decryption returned the
    input unchanged.

    :param value: string taken from the YAML document
    :param key_path: position of the value inside the document (for output)
    :return: the re-encrypted value, or ``value`` itself
    """
    stripped = value.strip()
    if not stripped.startswith(('!vault |', ANSIBLE_VAULT_HEADER)):
        return value

    decrypted = _decrypt_vault(value, key_path=key_path)
    if decrypted == value:
        # _decrypt_vault hands back its input when decryption failed
        return value

    return _encrypt_string(decrypted)
# Counters incremented by _process_recursively for every string (si),
# list (li) and dict (di) it visits
si = 0
li = 0
di = 0
def _process_recursively(value: (str, list, dict), depth: int, key_path: str) -> (str, list, dict):
    """Walk a loaded YAML structure and rekey every vault string in it.

    Strings are passed to ``_rekey_if_vault``; lists and dicts are rebuilt
    with each element processed one level deeper. Values of any other type
    are returned as they are. Once ``depth`` exceeds ``MAX_DICT_DEPTH`` an
    error is printed and the value is returned unprocessed.

    :param value: the current node of the document
    :param depth: nesting level of ``value``
    :param key_path: dotted path of ``value`` inside the document (for output)
    :return: the processed node
    """
    # pylint: disable=W0603
    global si, li, di

    if depth > MAX_DICT_DEPTH:
        print('ERROR: Max recursion depth reached!')
        return value

    if isinstance(value, str):
        si += 1
        return _rekey_if_vault(value, key_path=key_path)

    child_depth = depth + 1

    if isinstance(value, list):
        li += 1
        return [
            _process_recursively(item, depth=child_depth, key_path=f'{key_path}.[{index}]')
            for index, item in enumerate(value)
        ]

    if isinstance(value, dict):
        di += 1
        return {
            key: _process_recursively(item, depth=child_depth, key_path=f'{key_path}.{key}')
            for key, item in value.items()
        }

    return value
# Command-line interface. The arguments are parsed at module level because
# the helper functions read the resulting module-level 'args'.
parser = ArgumentParser(
    # 'description' is the help text shown below the usage line; 'prog'
    # would replace the program name inside the usage line itself.
    description='Ansible-Vault Rekey-Script - '
                'Updates all Ansible-Vault entries inside a YAML variable-file',
)
parser.add_argument(
    '-f', '--vars-file', help='Absolute path to the variable-file in YAML-format',
    required=True,
)
parser.add_argument(
    '-i', '--vault-pass-file-in',
    help='Ansible-Vault Password-File to load the existing secrets',
    required=True,
)
parser.add_argument(
    '-o', '--vault-pass-file-out',
    help='Ansible-Vault Password-File to encrypt the loaded secrets',
    required=True,
)
args = parser.parse_args()
def construct_vault_encrypted_unicode(loader, node):
    """YAML constructor that loads a tagged node as its scalar value.

    :param loader: the YAML loader that encountered the node
    :param node: the node to convert
    :return: whatever ``loader.construct_scalar`` yields for ``node``
    """
    scalar = loader.construct_scalar(node)
    return scalar
# Register the handler for the custom '!vault' tag on the safe loader class,
# so values carrying that tag are loaded via construct_vault_encrypted_unicode
YAMLSafeLoader.add_constructor('!vault', construct_vault_encrypted_unicode)
def main():
    """Rekey all vault values of the variable-file and write the result.

    Checks that both password files exist, loads the YAML file, rekeys every
    vault value via ``_process_recursively`` and dumps the result next to the
    input file as ``<input without extension>.rekey.yml``.

    :raises FileNotFoundError: if one of the two password files is missing
    """
    # Local import: only needed to derive the output file name.
    from os.path import splitext

    if not Path(args.vault_pass_file_in).is_file():
        raise FileNotFoundError(f"Ansible-Vault Password-File for decrypting not found: {args.vault_pass_file_in}")

    if not Path(args.vault_pass_file_out).is_file():
        raise FileNotFoundError(f"Ansible-Vault Password-File for encrypting not found: {args.vault_pass_file_out}")

    with open(args.vars_file, 'r', encoding='utf-8') as f:
        content = yaml_load(f)

    content = _process_recursively(content, depth=0, key_path='')
    content = yaml_dump(content)

    # The dumped text is post-processed so the vault values appear as
    # '!vault |' entries again instead of quoted strings.
    # remove empty lines
    content = content.replace('\n\n', '\n')
    # remove leading quote
    content = content.replace("'!vault|", '!vault |')
    content = content.replace("'!vault |", '!vault |')
    # remove trailing quote
    content = regex_replace(r"(ANSIBLE_VAULT;[\d\.]*;[A-Z0-9]*[\n\sa-f0-9]*)'", '\\1', content)

    # splitext only strips an extension of the final path component; splitting
    # the whole string on '.' mangled paths such as './secrets/dev' or
    # '../vars/dev' that have dots only in their directory part.
    out_file = f"{splitext(args.vars_file)[0]}.rekey.yml"
    with open(out_file, 'w', encoding='utf-8') as f:
        f.write(content)

    print(f'DONE! Output file: {out_file}')
# Run the rekey only when the file is executed as a script
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment