Last active
April 7, 2025 13:20
-
-
Save Derkades/053e7dbb173a156ad1bc2a26b900cead to your computer and use it in GitHub Desktop.
Certwarden Ansible module
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Copyright: (c) 2024, Robin Slot <[email protected]> | |
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | |
from __future__ import absolute_import, division, print_function | |
import json | |
import time | |
from urllib.request import Request, urlopen | |
from typing import TYPE_CHECKING, Any, Dict, List, Union, cast | |
if TYPE_CHECKING: | |
from typing import TypedDict | |
__metaclass__ = type | |
# Module documentation in the YAML format Ansible expects. The nesting below
# is significant: every option's attributes must be indented under the option.
DOCUMENTATION = r"""
---
module: certwarden

short_description: Create certificates in Certwarden

# If this is part of a collection, you need to use semantic versioning,
# i.e. the version is of the form "2.5.0" and not "2.4".
version_added: "1.0.0"

description: Ansible module to create certificates in Certwarden.

options:
    server:
        description: Server url, e.g. https://certwarden.example.org
        required: true
        type: str
    username:
        description: Username
        required: true
        type: str
    password:
        description: Password
        required: true
        type: str
    account:
        description: ACME account name
        required: true
        type: str
    subject:
        description: Subject FQDN.
        required: true
        type: str
    subject_alts:
        description:
            - Additional subject alt names.
            - Must not contain the subject itself and must not contain duplicate values.
            - Pass an empty list if no additional names are needed.
        required: true
        type: list

# Specify this value according to your collection
# in format of namespace.collection.doc_fragment_name
# extends_documentation_fragment:
#     - my_namespace.my_collection.my_doc_fragment_name

author:
    - Robin Slot (@Derkades)
"""
# Usage example shown in the generated module documentation.
EXAMPLES = r"""
# Create the certificate (or update its subject alt names) and wait for the order
- name: Create certificate
  certwarden:
    server: https://certwarden.example.org
    username: admin
    password: "{{ certwarden_password }}"
    account: letsencrypt_staging
    subject: 'example.org'
    subject_alts: ['example.net']  # required; pass [] if there are no additional names
"""
# TODO: left by the original author without details — presumably the return
# documentation below is considered incomplete; confirm before removing.
# Both values are read from the Certwarden API and passed to exit_json().
RETURN = r"""
cert_api_key:
    description: API key to obtain certificate
    type: str
    returned: always
    sample: 'xu8nodsUfetpP1D1rQ3H76p3C4n2Zk'
key_api_key:
    description: API key to obtain certificate private key
    type: str
    returned: always
    sample: '6aeGXnN33ULhGjewVYpBOu6HBYNBvC'
"""
from ansible.module_utils.basic import AnsibleModule | |
if TYPE_CHECKING:
    # Shapes of the JSON objects exchanged with the Certwarden API.
    # This branch is only seen by static type checkers; at runtime the names
    # are referenced exclusively through string annotations and cast("...").

    class Authorization(TypedDict):
        # "authorization" object of the login response
        username: str
        user_type: str
        access_token: str
        access_token_exp: int
        session_exp: int

    class PrivateKeyBasic(TypedDict):
        # Abbreviated private key as embedded in certificates and orders
        id: int
        name: str

    class PrivateKeyFull(TypedDict):
        # Private key as returned by the single-key endpoint
        id: int
        name: str
        description: str
        api_key_disabled: bool
        api_key_via_url: bool
        last_access: int
        api_key: str
        created_at: int
        updated_at: int

    class AcmeServer(TypedDict):
        id: int
        name: str
        is_staging: bool

    class AcmeAccount(TypedDict):
        id: int
        name: str
        acme_server: AcmeServer

    class CertificateFull(TypedDict):
        # Certificate as returned by the single-certificate endpoint
        id: int
        name: str
        description: str
        private_key: PrivateKeyBasic
        acme_account: AcmeAccount
        subject: str
        subject_alts: List[str]
        api_key_via_url: bool
        last_access: int
        organization: str
        organizational_unit: str
        country: str
        state: str
        city: str
        csr_extra_extensions: List[str]
        preferred_root_cn: str
        created_at: int
        updated_at: int
        api_key: str
        post_processing_command: str
        post_processing_environment: List[str]
        post_processing_client_key: str

    class CertificateBasic(TypedDict):
        # Abbreviated certificate as found in listings and inside orders
        id: int
        name: str
        description: str
        private_key: PrivateKeyBasic
        acme_account: AcmeAccount
        subject: str
        subject_alts: List[str]
        api_key_via_url: bool
        last_access: int

    class Order(TypedDict):
        id: int
        certificate: CertificateBasic
        status: str
        known_revoked: bool
        error: None  # TODO
        dns_identifiers: List[str]
        finalized_key: PrivateKeyBasic
        valid_from: int
        valid_to: int
        chain_root_cn: str
        created_at: int
        updated_at: int
class Certwarden:
    """Small client for the parts of the Certwarden HTTP API this module uses.

    The constructor logs in with the given credentials. The access token from
    the login response is then sent in the ``Authorization`` header of every
    later request.
    """

    server: str
    access_token: Union[str, None]

    def __init__(self, server: str, username: str, password: str):
        """Log in to Certwarden and remember the returned access token.

        Args:
            server: Base URL of the server; API paths are appended to it as-is.
            username: Login name.
            password: Login password.

        Raises:
            ValueError: If the login request gets a non-2xx response.
            OSError: If urllib fails to perform the request (this includes
                ``urllib.error.URLError`` and ``HTTPError``).
        """
        self.server = server
        # Must be set before the login call below: _request() reads it to
        # decide whether an Authorization header is sent.
        self.access_token = None
        # Example request body: {"username":"admin","password":"something"}
        response = self._request(
            "POST",
            "/certwarden/api/v1/app/auth/login",
            {
                "username": username,
                "password": password,
            },
        )
        authorization = cast("Authorization", response["authorization"])
        self.access_token = authorization["access_token"]

    def _request(
        self, method: str, url: str, data_json: Union[Dict[str, Any], None] = None
    ) -> Any:
        """Send one API request and return the decoded JSON response body.

        Args:
            method: HTTP method, e.g. ``"GET"``.
            url: Path (and query string) appended to ``self.server``.
            data_json: Optional request body; serialized as JSON when given.

        Raises:
            ValueError: If the response status does not start with "2", or if
                the body is not valid JSON (``json.JSONDecodeError``).
        """
        headers: Dict[str, str] = {}
        data = None
        if data_json is not None:
            data = json.dumps(data_json).encode()
            # Without an explicit header urllib labels any body as
            # application/x-www-form-urlencoded, which does not match the payload.
            headers["Content-Type"] = "application/json"
        if self.access_token:
            headers["Authorization"] = self.access_token
        req = Request(self.server + url, data, headers=headers, method=method)
        # The context manager closes the connection even when an error is raised.
        with urlopen(req) as response:
            if not str(response.status).startswith("2"):
                raise ValueError(
                    f"Got response code {response.status} for request to URL: {url}"
                )
            return json.loads(response.read().decode())

    def _list_all(self, url: str, key: str) -> List[Any]:
        """Fetch a listing and make sure it was not truncated by the limit.

        Args:
            url: Listing URL (callers pass ``limit=1000``).
            key: Name of the response field that holds the list.

        Raises:
            ValueError: If ``total_records`` differs from the number of
                returned items, i.e. the listing is incomplete.
        """
        response = self._request("GET", url)
        items = response[key]
        total = response["total_records"]
        # An explicit check instead of `assert`, which is removed under -O.
        if total != len(items):
            raise ValueError(
                f"Incomplete listing for URL: {url} (got {len(items)} of {total} records)"
            )
        return items

    def get_cert(self, name: str) -> "Union[CertificateFull, None]":
        """Find a certificate by name.

        Returns:
            The detailed certificate object, or None if no certificate in the
            listing has this name.
        """
        certificates = cast(
            "List[CertificateBasic]",
            self._list_all("/certwarden/api/v1/certificates?limit=1000", "certificates"),
        )
        for maybe_cert in certificates:
            if maybe_cert["name"] == name:
                # get detailed certificate data using the id
                response = self._request(
                    "GET", f"/certwarden/api/v1/certificates/{maybe_cert['id']}"
                )
                return cast("CertificateFull", response["certificate"])
        return None

    def get_account_id(self, account_name: str) -> Union[int, None]:
        """Find an ACME account id by name.

        Returns:
            The account id, or None if the account was not found.
        """
        accounts = cast(
            "List[AcmeAccount]",
            self._list_all("/certwarden/api/v1/acmeaccounts?limit=1000", "acme_accounts"),
        )
        for maybe_account in accounts:
            if maybe_account["name"] == account_name:
                return maybe_account["id"]
        return None

    def create_cert(
        self, name: str, account_id: int, main_subject: str, alt_subjects: List[str]
    ) -> "CertificateFull":
        """Create a certificate together with a new ecdsap256 private key.

        All optional fields are sent empty.

        Returns:
            The "certificate" object of the API response.
        """
        response = self._request(
            "POST",
            "/certwarden/api/v1/certificates",
            {
                "name": name,
                "description": "",
                "private_key_id": -1,  # create new private key
                "algorithm_value": "ecdsap256",
                "acme_account_id": account_id,
                "subject": main_subject,
                "subject_alts": alt_subjects,
                "post_processing_command": "",
                "post_processing_environment": [],
                "post_processing_client_enable": False,
                "preferred_root_cn": "",
                "organization": "",
                "organizational_unit": "",
                "country": "",
                "state": "",
                "city": "",
                "csr_extra_extensions": [],
            },
        )
        return cast("CertificateFull", response["certificate"])

    def update_cert(
        self, cert: "CertificateBasic", alt_subjects: List[str]
    ) -> "CertificateFull":
        """Replace the subject alt names of an existing certificate.

        Returns:
            The "certificate" object of the API response.
        """
        response = self._request(
            "PUT",
            f"/certwarden/api/v1/certificates/{cert['id']}",
            {
                "subject_alts": alt_subjects,
            },
        )
        return cast("CertificateFull", response["certificate"])

    def _get_orders(self, cert: "CertificateBasic") -> "List[Order]":
        """Return all orders of the given certificate."""
        return cast(
            "List[Order]",
            self._list_all(
                f"/certwarden/api/v1/certificates/{cert['id']}/orders?limit=1000",
                "orders",
            ),
        )

    def has_valid_order(self, cert: "CertificateBasic") -> bool:
        """Return True if at least one order of the certificate has status "valid"."""
        return any(order["status"] == "valid" for order in self._get_orders(cert))

    def place_order(self, cert: "CertificateBasic") -> "Order":
        """Place a new order for the certificate and return the created order."""
        response = self._request(
            "POST", f"/certwarden/api/v1/certificates/{cert['id']}/orders", {}
        )
        return cast("Order", response["order"])

    def check_order(self, order: "Order") -> str:
        """Return the current status of a previously placed order.

        The status is looked up in the order list of the order's certificate.

        Raises:
            ValueError: If the order is not part of that list.
        """
        for maybe_order in self._get_orders(order["certificate"]):
            if maybe_order["id"] == order["id"]:
                return maybe_order["status"]  # invalid, valid, pending
        raise ValueError("order should exist")

    def get_private_key(self, cert: "CertificateFull") -> "PrivateKeyFull":
        """Fetch the full private key object referenced by the certificate."""
        private_key_id = cert["private_key"]["id"]
        response = self._request(
            "GET", f"/certwarden/api/v1/privatekeys/{private_key_id}"
        )
        return cast("PrivateKeyFull", response["private_key"])
def _wait_for_order(cw, order, attempts=300, interval=1):
    """Poll an order until it is decided.

    Sleeps ``interval`` seconds before each of at most ``attempts`` status checks.

    Returns:
        "valid" or "invalid" as soon as the order reaches that status, or None
        if it is still undecided (e.g. "pending") after the last attempt.
    """
    for _attempt in range(attempts):
        time.sleep(interval)
        status = cw.check_order(order)
        if status in ("valid", "invalid"):
            return status
        # for other status values (like "pending"), keep waiting
    return None


def run_module():
    """Ensure the certificate exists in Certwarden and has a valid order.

    Steps:
      1. Validate ``subject_alts`` (no duplicates, must not contain the subject).
      2. Create the certificate if it does not exist; otherwise verify that
         account and subject match and update the subject alt names if needed.
      3. Place an order if something changed or no valid order exists, and wait
         up to 300 polls (one second apart) for it to become valid.
      4. Exit with the API keys of the certificate and its private key.

    In check mode nothing is modified; pending changes are reported through
    warnings and ``changed=True``. A missing certificate is a failure in check
    mode because the API keys to return do not exist yet.
    """
    # define available arguments/parameters a user can pass to the module
    module_args = dict(
        server=dict(type="str", required=True),
        username=dict(type="str", required=True),
        password=dict(type="str", required=True, no_log=True),
        account=dict(type="str", required=True),
        subject=dict(type="str", required=True),
        subject_alts=dict(type="list", required=True),
    )
    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)

    # module parameters
    server = module.params["server"]  # https://certwarden.example.org
    username = module.params["username"]
    password = module.params["password"]
    account_name = module.params["account"]
    subject = module.params["subject"]
    name = subject  # the certificate is named after its subject
    subject_alts = module.params["subject_alts"]

    # sanity checks on subject_alts
    if subject in subject_alts:
        module.fail_json(msg="subject must not be present in subject_alts")
    if len(set(subject_alts)) != len(subject_alts):
        module.fail_json(msg="subject_alts contains duplicate values")

    # set to True when a change is made (or, in check mode, would be made)
    changed = False

    try:
        cw = Certwarden(server, username, password)

        # get cert info
        cert = cw.get_cert(name)
        if cert is None:
            if module.check_mode:
                module.fail_json(
                    msg="Certificate does not exist and cannot be created, Ansible is running in check mode"
                )
            # cert does not exist yet
            account_id = cw.get_account_id(account_name)
            if account_id is None:
                module.fail_json(msg="Account not found")
            cert = cw.create_cert(name, account_id, subject, subject_alts)
            changed = True
        else:
            # cert does exist, is it up to date?
            # check account name (cannot be updated)
            if cert["acme_account"]["name"] != account_name:
                module.fail_json(
                    msg=f"Cannot change account (from '{cert['acme_account']['name']}' to '{account_name}'). Manually remove the certificate AND private key in Certwarden.",
                    changed=changed,
                )
            # check subject (cannot be updated)
            if cert["subject"] != subject:
                module.fail_json(
                    msg=f"Cannot change subject (from '{cert['subject']}' to '{subject}'), only subject alts. Manually remove the certificate AND private key in Certwarden.",
                    changed=changed,
                )
            # check subject_alts, update if necessary
            if cert["subject_alts"] != subject_alts:
                # Set in check mode too, so the pending update is reported.
                changed = True
                if module.check_mode:
                    module.warn(
                        "Certificate needs to be updated, but Ansible is running in check mode"
                    )
                else:
                    cert = cw.update_cert(cert, subject_alts)

        # place an order if:
        # - the certificate was created or subject_alts was updated (changed is True)
        # - there is no currently valid order
        if module.check_mode:
            if not cw.has_valid_order(cert):
                module.warn(
                    "Certificate has no valid order and no new order can be created, Ansible is running in check mode. The API keys may return an invalid certificate, or no certificates at all."
                )
                # a real run would place an order here
                changed = True
        elif changed or not cw.has_valid_order(cert):
            order = cw.place_order(cert)
            changed = True
            # wait for order to succeed
            status = _wait_for_order(cw, order)
            if status == "invalid":
                module.fail_json(
                    msg="Order did not succeed. Check Certwarden web UI for info.",
                    changed=changed,
                )
            if status is None:
                module.fail_json(
                    msg="Waited 5 minutes but still no valid order. Check Certwarden web UI for info.",
                    changed=changed,
                )

        # obtain API keys for return data
        cert_api_key = cert["api_key"]
        private_key = cw.get_private_key(cert)
        key_api_key = private_key["api_key"]
    except (OSError, ValueError) as exc:
        # urllib errors are OSError subclasses; the client raises ValueError
        # for unexpected responses. Report them as a regular module failure
        # instead of a traceback. fail_json() exits via SystemExit, which is
        # deliberately not caught here.
        module.fail_json(
            msg=f"Request to Certwarden failed: {exc}",
            changed=changed,
        )

    # exit successfully, returning API keys
    module.exit_json(
        changed=changed, cert_api_key=cert_api_key, key_api_key=key_api_key
    )
def main():
    """Script entry point: run the module logic."""
    run_module()


# Ansible executes the module file as a script.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create (or update) the certificate through the Certwarden API. The module is
# delegated to localhost; its result is registered for the next task.
- name: Create certificate in Certwarden
  delegate_to: localhost
  certwarden:
    server: "{{ certwarden_server }}"
    # NOTE(review): "certwarden_acme_ccount" looks like a typo for
    # "certwarden_acme_account" — confirm where the variable is defined before renaming.
    account: "{{ certwarden_acme_ccount }}"
    username: "{{ certwarden_user }}"
    password: "{{ certwarden_pass }}"
    subject: "{{ certs_subject }}"
    subject_alts: "{{ certs_subject_alts }}"
  register: certwarden

# Render the checkcerts script, passing the API keys returned by the module
# to the template as variables.
- name: Copy checkcerts script
  ansible.builtin.template:
    src: checkcerts.j2
    dest: /usr/local/bin/checkcerts
    mode: '500'
  vars:
    cert_apikey: "{{ certwarden.cert_api_key }}"
    cert_key_apikey: "{{ certwarden.key_api_key }}"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment