Created
January 7, 2025 15:23
-
-
Save sedrubal/8f280cd06fc2ec9510d6913715ce5b1b to your computer and use it in GitHub Desktop.
Ansible Lookup Plugin that allows looking up credentials in KeePassXC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Copyright (c) 2023, Sebastian Endres <[email protected]> | |
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) | |
# SPDX-License-Identifier: GPL-3.0-or-later | |
__metaclass__ = type | |
DOCUMENTATION = """ | |
name: keepassxc | |
author: | |
- Sebastian Endres <[email protected]> | |
requirements: | |
- keepassxc-browser (python library) | |
short_description: grab secrets from keepassxc using the keepassxc browser proxy for browser integration | |
description: | |
- Allows you to access data stored in the keepassxc keepassxc/keychain. | |
""" | |
# TODO | |
EXAMPLES = "" | |
# TODO | |
RETURN = "" | |
import re | |
import typing | |
from configparser import ConfigParser | |
from pathlib import Path | |
from ansible.errors import AnsibleError, AnsibleLookupError, AnsibleOptionsError | |
from ansible.plugins.lookup import LookupBase | |
from ansible.utils.display import Display | |
try: | |
import keepassxc_browser | |
HAS_KEEPASSXC_BROWSER = True | |
except ImportError: | |
HAS_KEEPASSXC_BROWSER = False | |
if typing.TYPE_CHECKING: | |
import dbus | |
import keepassxc_browser | |
try: | |
import dbus | |
HAS_DBUS = True | |
except ImportError: | |
HAS_DBUS = False | |
CONFIG_PATH = Path(__file__).parent.parent / ".lookup_keepassxc.conf" | |
RETRIES = 3 | |
CLIENT_ID = "ansible_vault" | |
class RetryLaterError(Exception): | |
"""Exception that is thrown, if we should wait and retry later.""" | |
class LookupModule(LookupBase): | |
def run(self, terms, variables=None, **kwargs): | |
missing_libs = [] | |
for has_lib, lib in [ | |
(HAS_KEEPASSXC_BROWSER, "keepassxc-browser"), | |
(HAS_DBUS, "pydbus"), | |
]: | |
if not has_lib: | |
missing_libs.append(lib) | |
if missing_libs: | |
raise AnsibleError( | |
f"Can't LOOKUP(keepassxc): missing required python library: {', '.join(missing_libs)}" | |
) | |
self.set_options(var_options=variables, direct=kwargs) | |
display = Display() | |
tool = KeepassXCTool(display=display) | |
ret = [] | |
db_name: str | |
url: str | |
filter_re: "str | None" = None | |
db_name, url, *extra = terms | |
if len(extra) not in (0, 1): | |
raise AnsibleLookupError( | |
"Too many arguments for lookup filter keepassxc. " | |
" Expected <db_name> <url> <*filter_re>" | |
) | |
filter_re = str(extra[0]) if extra else None | |
display.vvvv( | |
"lookup command: db_name=%s url=%s, filter_re=%s" | |
% (db_name, url, filter_re) | |
) | |
login = tool.lookup(db_name=db_name, url=url, filter_re=filter_re) | |
display.vvvvvv(str(login)) | |
ret.append(login) | |
return ret | |
class KeepassXCTool: | |
def __init__(self, display: "Display | None" = None): | |
if display: | |
self._v = display.v | |
self._vv = display.vv | |
self._vvv = display.vvv | |
self._vvvv = display.vvvv | |
self._display = display.display | |
self._warning = display.warning | |
if hasattr(display, "prompt_until"): | |
self._prompt = lambda: display.prompt_until("", seconds=15) | |
else: | |
self._prompt = lambda: display.prompt("") | |
else: | |
import logging | |
logger = logging.getLogger() | |
self._v = logger.debug | |
self._vv = logger.debug | |
self._vvv = logger.debug | |
self._vvvv = logger.debug | |
self._display = logger.info | |
self._warning = logger.warning | |
self._prompt = input | |
# load and check config: | |
self._cfg = ConfigParser() | |
self._db_paths: "dict[str, Path]" = {} | |
processed_configs = self._cfg.read(CONFIG_PATH) | |
if not processed_configs: | |
raise AnsibleLookupError(f"Config error: '{CONFIG_PATH}' is missing.") | |
for db_name in self._cfg.sections(): | |
if not self._cfg[db_name].get("path"): | |
raise AnsibleLookupError( | |
f"Config error in '{CONFIG_PATH}':" | |
f" Expected 'path' property in section '{db_name}'" | |
) | |
db_path = Path(self._cfg[db_name]["path"]).expanduser() | |
if not db_path.is_file(): | |
raise AnsibleLookupError( | |
f"Config error in '{CONFIG_PATH}':" | |
f" Keepassxc vault '{db_path}' does not exist." | |
f" (from config section '{db_name}')" | |
) | |
self._db_paths[db_name] = db_path | |
def lookup( | |
self, | |
db_name: str, | |
url: str, | |
filter_re: "str | None" = None, | |
) -> "dict[str, str]": | |
try: | |
entry = self._cfg[db_name] | |
except KeyError as err: | |
raise AnsibleOptionsError( | |
f"Invalid database {db_name}." | |
f" Valid commands are {', '.join(self._cfg.sections())}." | |
f" Is it missing in {CONFIG_PATH}?" | |
) from err | |
try: | |
filter_re = re.compile(filter_re if filter_re else r".*") | |
return self.loop_until_we_get_entry( | |
db_name=db_name, | |
url=url, | |
filter_re=filter_re, | |
) | |
except keepassxc_browser.ProtocolError as err: | |
raise AnsibleLookupError(str(err)) from err | |
def connect_and_auth( | |
self, | |
db_name: str, | |
) -> "tuple[keepassxc_browser.Connection, keepassxc_browser.Identity]": | |
try: | |
try: | |
con = keepassxc_browser.Connection() | |
except OSError: | |
# monkey patch keepassxc for legacy fallback: | |
con = keepassxc_browser.Connection( | |
socket_name=Path( | |
"app/org.keepassxc.KeePassXC/org.keepassxc.KeePassXC.BrowserServer" | |
) | |
) | |
con.connect() | |
except (OSError, Exception) as exc: | |
raise RetryLaterError( | |
f"Could not connect to keepassxc. Is it running? ({exc!s})" | |
) | |
known_db_hash: "str | None" = self._cfg[db_name]["db_hash"] | |
assoc: "str | None" = self._cfg[db_name]["assoc"] | |
db_id = keepassxc_browser.Identity.unserialize(CLIENT_ID, assoc) | |
con.change_public_keys(db_id) | |
try: | |
db_hash = con.get_database_hash(db_id) | |
except keepassxc_browser.ProtocolError as ex: | |
raise AnsibleLookupError(str(ex)) from ex | |
self._vvvv(f"Keepassxc db hash: {db_hash}") | |
if known_db_hash and known_db_hash != db_hash: | |
raise RetryLaterError("You might have opened the wrong database.") | |
if not con.test_associate(db_id): | |
self._v("Not associated yet, associating now...") | |
try: | |
assert con.associate(db_id) | |
except keepassxc_browser.ProtocolError as err: | |
if len(err.args) == 1 and err.args[0] == "Action cancelled or denied": | |
raise RetryLaterError("You denied to associate with ehe database.") | |
else: | |
raise err | |
# update config | |
assoc = db_id.serialize() | |
self._cfg[db_name]["assoc"] = assoc | |
self._cfg[db_name]["db_hash"] = db_hash | |
self._cfg.write(CONFIG_PATH) | |
CONFIG_PATH.chmod(0o600) | |
return con, db_id | |
def loop_until_we_get_entry( | |
self, | |
db_name: str, | |
url: str, | |
filter_re: re.Pattern, | |
) -> "dict[str, str]": | |
try: | |
bus = dbus.SessionBus() | |
except Exception as exc: | |
self._warning(f"Could not connect to session dbus: {exc}") | |
bus = None | |
for i in range(RETRIES): | |
self._v( | |
f"Trying to open {self._format_db_name(db_name)} [{i + 1}/{RETRIES}]..." | |
) | |
db_path = self._db_paths[db_name] | |
if bus: | |
try: | |
bus.call_blocking( | |
"org.keepassxc.KeePassXC.MainWindow", | |
"/keepassxc", | |
"org.keepassxc.KeePassXC.MainWindow", | |
"openDatabase", | |
"s", | |
[str(db_path)], | |
) | |
except dbus.exceptions.DBusException as exc: | |
self._warning( | |
f"Could not activate required keepassxc database '{db_path!s}': {exc!s}" | |
) | |
self._v( | |
f"Trying to access {self._format_db_name(db_name)} [{i + 1}/{RETRIES}]..." | |
) | |
try: | |
con, db_id = self.connect_and_auth(db_name) | |
except RetryLaterError as err: | |
self._warning(str(err)) | |
self._wait_to_open_correct_db(db_name=db_name) | |
continue | |
try: | |
logins = con.get_logins(db_id, url=url) | |
logins = [login for login in logins if filter_re.match(login["login"])] | |
if len(logins) != 1: | |
msg = "\n".join( | |
f"- {login['uuid']} '{login['group']}/{login['name']}'\tUser: '{login['login']}'" | |
for login in logins | |
) | |
raise AnsibleLookupError( | |
f"Expected one entry but got {len(logins)}:\n" | |
f"{msg}\n" | |
"Check the keepassxc settings: Browser Integration > Return only best-matching credentials" | |
) | |
return logins[0] | |
except keepassxc_browser.ProtocolError as err: | |
if len(err.args) != 1 or err.args[0] != "No logins found": | |
raise AnsibleError(f"Keepassxc lookup failed: {err}") from err | |
msg = f"Credential with url '{url}' not in current password store '{db_name}'." | |
self._warning(msg) | |
self._wait_to_open_correct_db(db_name=db_name) | |
con.disconnect() | |
raise AnsibleLookupError("Could not get entry from keepassxc") | |
def _wait_to_open_correct_db(self, db_name: str): | |
self._warning("Check if you have opened the correct database!") | |
self._warning( | |
f"We need {self._format_db_name(db_name)} unlocked and selected in keepassxc!" | |
) | |
self._display("Press enter to continue!") | |
try: | |
self._prompt() | |
except EOFError: | |
self._v("EOF") | |
exit(0) | |
def _format_db_name(self, db_name: str) -> str: | |
return ( | |
"your private database" if db_name == "private" else f"database '{db_name}'" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment