Skip to content

Instantly share code, notes, and snippets.

@sedrubal
Created January 7, 2025 15:23
Show Gist options
  • Save sedrubal/8f280cd06fc2ec9510d6913715ce5b1b to your computer and use it in GitHub Desktop.
Save sedrubal/8f280cd06fc2ec9510d6913715ce5b1b to your computer and use it in GitHub Desktop.
Ansible Lookup Plugin that allows looking up credentials in KeePassXC
#!/usr/bin/env python3
# Copyright (c) 2023, Sebastian Endres <[email protected]>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
__metaclass__ = type

# Plugin documentation consumed by ``ansible-doc``; must stay valid YAML.
DOCUMENTATION = """
name: keepassxc
author:
  - Sebastian Endres <[email protected]>
requirements:
  - keepassxc-browser (python library)
  - dbus-python (python library)
short_description: grab secrets from keepassxc using the keepassxc browser proxy for browser integration
description:
  - Looks up a single login entry in a running KeePassXC instance using its browser integration protocol.
  - The known databases are read from the INI file C(.lookup_keepassxc.conf) located one directory above the
    directory that contains this plugin.
  - Every section of that file names one database and must contain a C(path) property pointing to the database file.
  - The plugin stores the C(assoc) and C(db_hash) properties in the same section after a successful association.
options:
  _terms:
    description:
      - Positional arguments C(db_name), C(url) and optionally C(filter_re).
      - C(db_name) is the name of a section in the configuration file.
      - C(url) is the URL that is sent to KeePassXC to find matching logins.
      - C(filter_re) is a regular expression that is matched against the start of the user name of every
        returned login. All logins are kept when it is omitted.
    required: true
    type: list
    elements: str
"""

# Usage examples shown by ``ansible-doc``.
EXAMPLES = """
- name: Fetch the only login stored for a URL in the database configured as 'private'
  ansible.builtin.set_fact:
    example_login: "{{ lookup('keepassxc', 'private', 'https://example.com') }}"

- name: Select the login whose user name starts with 'admin' when several logins match the URL
  ansible.builtin.set_fact:
    admin_login: "{{ lookup('keepassxc', 'private', 'https://example.com', '^admin') }}"
"""

# Description of the value handed back to the playbook.
RETURN = """
_raw:
  description:
    - A list with exactly one element, the matching login entry as returned by KeePassXC.
    - The plugin itself reads the C(login), C(name), C(group) and C(uuid) keys of that entry.
  type: list
  elements: dict
"""
import re
import typing
from configparser import ConfigParser
from pathlib import Path
from ansible.errors import AnsibleError, AnsibleLookupError, AnsibleOptionsError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display
try:
import keepassxc_browser
HAS_KEEPASSXC_BROWSER = True
except ImportError:
HAS_KEEPASSXC_BROWSER = False
if typing.TYPE_CHECKING:
import dbus
import keepassxc_browser
try:
import dbus
HAS_DBUS = True
except ImportError:
HAS_DBUS = False
# INI file listing the known databases; it lives one directory above the
# directory that contains this plugin file.
CONFIG_PATH = Path(__file__).parent.parent / ".lookup_keepassxc.conf"
# Number of attempts made to open/access the database before giving up.
RETRIES = 3
# Client identifier passed to keepassxc_browser when building the identity.
CLIENT_ID = "ansible_vault"
class RetryLaterError(Exception):
    """Signal that the current attempt failed and should be retried after waiting."""
class LookupModule(LookupBase):
    """Ansible lookup plugin that fetches one login entry from KeePassXC."""

    def run(self, terms, variables=None, **kwargs):
        """Look up a single login entry.

        Args:
            terms: ``[db_name, url]`` or ``[db_name, url, filter_re]``.
                ``db_name`` selects a section of the plugin config file,
                ``url`` is the URL queried in KeePassXC and ``filter_re`` is
                an optional regular expression applied to the user name.
            variables: Ansible variables, forwarded to ``set_options``.
            **kwargs: Direct plugin options, forwarded to ``set_options``.

        Returns:
            A one-element list holding the login entry returned by
            ``KeepassXCTool.lookup``.

        Raises:
            AnsibleError: If a required python library is not installed.
            AnsibleLookupError: If the number of terms is wrong or the lookup
                itself fails.
        """
        # The availability flags are set by the guarded imports at module level.
        missing_libs = [
            lib
            for has_lib, lib in (
                (HAS_KEEPASSXC_BROWSER, "keepassxc-browser"),
                # The module imported as ``dbus`` is shipped by dbus-python.
                (HAS_DBUS, "dbus-python"),
            )
            if not has_lib
        ]
        if missing_libs:
            raise AnsibleError(
                f"Can't LOOKUP(keepassxc): missing required python library: {', '.join(missing_libs)}"
            )

        self.set_options(var_options=variables, direct=kwargs)

        # Validate the arity up front so that too few terms produce a proper
        # Ansible error instead of an unpacking ValueError.
        if len(terms) < 2:
            raise AnsibleLookupError(
                "Too few arguments for lookup filter keepassxc. "
                " Expected <db_name> <url> <*filter_re>"
            )
        if len(terms) > 3:
            raise AnsibleLookupError(
                "Too many arguments for lookup filter keepassxc. "
                " Expected <db_name> <url> <*filter_re>"
            )
        db_name, url, *extra = terms
        filter_re = str(extra[0]) if extra else None

        display = Display()
        tool = KeepassXCTool(display=display)
        display.vvvv(
            f"lookup command: db_name={db_name} url={url}, filter_re={filter_re}"
        )
        login = tool.lookup(db_name=db_name, url=url, filter_re=filter_re)
        # NOTE(review): this prints the complete entry at verbosity 6, which
        # presumably includes the secret itself - confirm this is intended.
        display.vvvvvv(str(login))
        return [login]
class KeepassXCTool:
    """Client that retrieves login entries from a running KeePassXC instance.

    The known databases are read from ``CONFIG_PATH``. Every section names a
    database and must contain a ``path`` property; ``assoc`` and ``db_hash``
    are written back by this class after a successful association.
    """

    def __init__(self, display: "Display | None" = None):
        """Set up the output callbacks and load and validate the config file.

        Args:
            display: Ansible ``Display`` used for messages and prompts. When
                omitted, the root logger and ``input`` are used instead.

        Raises:
            AnsibleLookupError: If the config file is missing, a section has
                no ``path`` property or the referenced file does not exist.
        """
        if display:
            self._v = display.v
            self._vv = display.vv
            self._vvv = display.vvv
            self._vvvv = display.vvvv
            self._display = display.display
            self._warning = display.warning
            # Use the timed prompt when this Display implementation offers it.
            if hasattr(display, "prompt_until"):
                self._prompt = lambda: display.prompt_until("", seconds=15)
            else:
                self._prompt = lambda: display.prompt("")
        else:
            import logging

            logger = logging.getLogger()
            self._v = logger.debug
            self._vv = logger.debug
            self._vvv = logger.debug
            self._vvvv = logger.debug
            self._display = logger.info
            self._warning = logger.warning
            self._prompt = input

        # load and check config:
        self._cfg = ConfigParser()
        self._db_paths: "dict[str, Path]" = {}
        processed_configs = self._cfg.read(CONFIG_PATH)
        if not processed_configs:
            raise AnsibleLookupError(f"Config error: '{CONFIG_PATH}' is missing.")
        for db_name in self._cfg.sections():
            if not self._cfg[db_name].get("path"):
                raise AnsibleLookupError(
                    f"Config error in '{CONFIG_PATH}':"
                    f" Expected 'path' property in section '{db_name}'"
                )
            db_path = Path(self._cfg[db_name]["path"]).expanduser()
            if not db_path.is_file():
                raise AnsibleLookupError(
                    f"Config error in '{CONFIG_PATH}':"
                    f" Keepassxc vault '{db_path}' does not exist."
                    f" (from config section '{db_name}')"
                )
            self._db_paths[db_name] = db_path

    def lookup(
        self,
        db_name: str,
        url: str,
        filter_re: "str | None" = None,
    ) -> "dict[str, str]":
        """Return the single login entry matching ``url`` in ``db_name``.

        Args:
            db_name: Name of a section in the config file.
            url: URL sent to KeePassXC to find matching logins.
            filter_re: Optional regular expression matched against the start
                of each login's user name; all logins are kept when omitted.

        Returns:
            The matching login entry.

        Raises:
            AnsibleOptionsError: If ``db_name`` is not configured or
                ``filter_re`` is not a valid regular expression.
            AnsibleLookupError: If the lookup fails.
        """
        # has_section() ignores the implicit DEFAULT section, which has no
        # database path and would only fail later with a KeyError.
        if not self._cfg.has_section(db_name):
            raise AnsibleOptionsError(
                f"Invalid database {db_name}."
                f" Valid databases are {', '.join(self._cfg.sections())}."
                f" Is it missing in {CONFIG_PATH}?"
            )
        try:
            pattern = re.compile(filter_re if filter_re else r".*")
        except re.error as err:
            raise AnsibleOptionsError(
                f"Invalid filter regex '{filter_re}': {err}"
            ) from err
        try:
            return self.loop_until_we_get_entry(
                db_name=db_name,
                url=url,
                filter_re=pattern,
            )
        except keepassxc_browser.ProtocolError as err:
            raise AnsibleLookupError(str(err)) from err

    def connect_and_auth(
        self,
        db_name: str,
    ) -> "tuple[keepassxc_browser.Connection, keepassxc_browser.Identity]":
        """Connect to KeePassXC and make sure we are associated with it.

        Args:
            db_name: Name of a section in the config file.

        Returns:
            The open connection and the identity to use for requests. The
            caller is responsible for disconnecting the connection.

        Raises:
            RetryLaterError: If KeePassXC is not reachable, a different
                database is open or the association was denied.
            AnsibleLookupError: If the database hash cannot be read or the
                association is not confirmed.
        """
        try:
            try:
                con = keepassxc_browser.Connection()
            except OSError:
                # monkey patch keepassxc for legacy fallback:
                con = keepassxc_browser.Connection(
                    socket_name=Path(
                        "app/org.keepassxc.KeePassXC/org.keepassxc.KeePassXC.BrowserServer"
                    )
                )
            con.connect()
        except Exception as exc:
            raise RetryLaterError(
                f"Could not connect to keepassxc. Is it running? ({exc!s})"
            ) from exc

        try:
            db_id = self._authenticate(con, db_name)
        except BaseException:
            # Do not leak the connection when authentication fails.
            con.disconnect()
            raise
        return con, db_id

    def _authenticate(
        self,
        con: "keepassxc_browser.Connection",
        db_name: str,
    ) -> "keepassxc_browser.Identity":
        """Exchange keys, verify the database hash and associate if needed."""
        section = self._cfg[db_name]
        # Both properties are absent until the first successful association.
        known_db_hash: "str | None" = section.get("db_hash")
        assoc: "str | None" = section.get("assoc")
        if assoc:
            db_id = keepassxc_browser.Identity.unserialize(CLIENT_ID, assoc)
        else:
            db_id = keepassxc_browser.Identity(CLIENT_ID)
        con.change_public_keys(db_id)
        try:
            db_hash = con.get_database_hash(db_id)
        except keepassxc_browser.ProtocolError as ex:
            raise AnsibleLookupError(str(ex)) from ex
        self._vvvv(f"Keepassxc db hash: {db_hash}")
        if known_db_hash and known_db_hash != db_hash:
            raise RetryLaterError("You might have opened the wrong database.")
        if con.test_associate(db_id):
            return db_id

        self._v("Not associated yet, associating now...")
        try:
            associated = con.associate(db_id)
        except keepassxc_browser.ProtocolError as err:
            if len(err.args) == 1 and err.args[0] == "Action cancelled or denied":
                raise RetryLaterError(
                    "You denied to associate with the database."
                ) from err
            raise
        # Explicit check instead of ``assert``, which is stripped under -O.
        if not associated:
            raise AnsibleLookupError("Could not associate with keepassxc.")
        self._save_association(db_name, assoc=db_id.serialize(), db_hash=db_hash)
        return db_id

    def _save_association(self, db_name: str, assoc: str, db_hash: str) -> None:
        """Persist the association data of ``db_name`` in the config file."""
        self._cfg[db_name]["assoc"] = assoc
        self._cfg[db_name]["db_hash"] = db_hash
        # ConfigParser.write() needs a writable text file object, not a path.
        with CONFIG_PATH.open("w") as cfg_file:
            self._cfg.write(cfg_file)
        CONFIG_PATH.chmod(0o600)

    def loop_until_we_get_entry(
        self,
        db_name: str,
        url: str,
        filter_re: re.Pattern,
    ) -> "dict[str, str]":
        """Try up to ``RETRIES`` times to fetch the login entry for ``url``.

        Before each attempt KeePassXC is asked over the session D-Bus to open
        the configured database file. When the attempt fails in a way the
        user can fix (wrong database open, association denied, no login for
        the URL), the user is prompted and the attempt is repeated.

        Args:
            db_name: Name of a section in the config file.
            url: URL sent to KeePassXC to find matching logins.
            filter_re: Compiled pattern matched against the start of each
                login's user name.

        Returns:
            The single login entry left after filtering.

        Raises:
            AnsibleLookupError: If not exactly one login remains after
                filtering or all attempts are used up.
            AnsibleError: If KeePassXC reports an unexpected protocol error.
        """
        try:
            bus = dbus.SessionBus()
        except Exception as exc:
            # D-Bus is only a convenience; continue without it.
            self._warning(f"Could not connect to session dbus: {exc}")
            bus = None

        db_path = self._db_paths[db_name]
        for attempt in range(1, RETRIES + 1):
            self._v(
                f"Trying to open {self._format_db_name(db_name)} [{attempt}/{RETRIES}]..."
            )
            if bus:
                try:
                    bus.call_blocking(
                        "org.keepassxc.KeePassXC.MainWindow",
                        "/keepassxc",
                        "org.keepassxc.KeePassXC.MainWindow",
                        "openDatabase",
                        "s",
                        [str(db_path)],
                    )
                except dbus.exceptions.DBusException as exc:
                    self._warning(
                        f"Could not activate required keepassxc database '{db_path!s}': {exc!s}"
                    )
            self._v(
                f"Trying to access {self._format_db_name(db_name)} [{attempt}/{RETRIES}]..."
            )
            try:
                con, db_id = self.connect_and_auth(db_name)
            except RetryLaterError as err:
                self._warning(str(err))
                self._wait_to_open_correct_db(db_name=db_name)
                continue

            try:
                logins = con.get_logins(db_id, url=url)
            except keepassxc_browser.ProtocolError as err:
                if len(err.args) != 1 or err.args[0] != "No logins found":
                    raise AnsibleError(f"Keepassxc lookup failed: {err}") from err
                logins = None
            finally:
                # Always release the connection, on success and on failure.
                con.disconnect()

            if logins is None:
                msg = f"Credential with url '{url}' not in current password store '{db_name}'."
                self._warning(msg)
                self._wait_to_open_correct_db(db_name=db_name)
                continue

            matches = [login for login in logins if filter_re.match(login["login"])]
            if len(matches) != 1:
                msg = "\n".join(
                    f"- {login['uuid']} '{login['group']}/{login['name']}'\tUser: '{login['login']}'"
                    for login in matches
                )
                raise AnsibleLookupError(
                    f"Expected one entry but got {len(matches)}:\n"
                    f"{msg}\n"
                    "Check the keepassxc settings: Browser Integration > Return only best-matching credentials"
                )
            return matches[0]

        raise AnsibleLookupError("Could not get entry from keepassxc")

    def _wait_to_open_correct_db(self, db_name: str):
        """Ask the user to select the right database and wait for enter.

        Terminates the process with exit status 0 when the prompt hits EOF.
        """
        self._warning("Check if you have opened the correct database!")
        self._warning(
            f"We need {self._format_db_name(db_name)} unlocked and selected in keepassxc!"
        )
        self._display("Press enter to continue!")
        try:
            self._prompt()
        except EOFError:
            self._v("EOF")
            # Same effect as exit(0) without relying on the ``site`` builtin.
            raise SystemExit(0)

    def _format_db_name(self, db_name: str) -> str:
        """Return a human-readable description of ``db_name`` for messages."""
        return (
            "your private database" if db_name == "private" else f"database '{db_name}'"
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment