Skip to content

Instantly share code, notes, and snippets.

@Derkades
Last active April 7, 2025 13:20
Show Gist options
  • Save Derkades/053e7dbb173a156ad1bc2a26b900cead to your computer and use it in GitHub Desktop.
Save Derkades/053e7dbb173a156ad1bc2a26b900cead to your computer and use it in GitHub Desktop.
Certwarden Ansible module
#!/usr/bin/python
# Copyright: (c) 2024, Robin Slot <[email protected]>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
import json
import time
from urllib.request import Request, urlopen
from typing import TYPE_CHECKING, Any, Dict, List, Union, cast
if TYPE_CHECKING:
from typing import TypedDict
__metaclass__ = type
# Ansible module documentation block (YAML): module name, short and long
# description, the options the module accepts, and the author.
# The options listed here mirror the argument spec built in run_module();
# keep the two in sync when adding or changing a parameter.
DOCUMENTATION = r"""
---
module: certwarden
short_description: Create certificates in Certwarden
# If this is part of a collection, you need to use semantic versioning,
# i.e. the version is of the form "2.5.0" and not "2.4".
version_added: "1.0.0"
description: Ansible module to create certificates in Certwarden.
options:
server:
description: Server url, e.g. https://certwarden.example.org
required: true
type: str
username:
description: Username
required: true
type: str
password:
description: Password
required: true
type: str
account:
description: ACME account name
required: true
type: str
subject:
description: Subject FQDN.
required: true
type: str
subject_alts:
description:
- Additional subject alt names.
required: true
type: list
# Specify this value according to your collection
# in format of namespace.collection.doc_fragment_name
# extends_documentation_fragment:
# - my_namespace.my_collection.my_doc_fragment_name
author:
- Robin Slot (@Derkades)
"""
# Usage example shown in the module documentation (YAML task list).
EXAMPLES = r"""
# Request a certificate for example.org that also covers example.net
- name: Create certificate
  certwarden:
    server: https://certwarden.example.org
    username: admin
    password: "{{ certwarden_password }}"
    account: letsencrypt_staging
    subject: 'example.org'
    subject_alts: ['example.net']  # required; pass [] when there are no additional names
"""
# Documentation of the values the module returns (YAML). The two keys
# described here are the ones run_module() passes to exit_json().
# TODO (left by the original author without further detail)
RETURN = r"""
cert_api_key:
description: API key to obtain certificate
type: str
returned: always
sample: 'xu8nodsUfetpP1D1rQ3H76p3C4n2Zk'
key_api_key:
description: API key to obtain certificate private key
type: str
returned: always
sample: '6aeGXnN33ULhGjewVYpBOu6HBYNBvC'
"""
from ansible.module_utils.basic import AnsibleModule
if TYPE_CHECKING:
    # Static-typing-only descriptions of the JSON objects found in Certwarden
    # API responses. Nothing in this branch exists at runtime; the names are
    # only referenced through string annotations and typing.cast().

    class Authorization(TypedDict):
        # "authorization" object of the login response
        username: str
        user_type: str
        access_token: str
        access_token_exp: int
        session_exp: int

    class PrivateKeyBasic(TypedDict):
        # abbreviated private key, as embedded in certificates and orders
        id: int
        name: str

    class PrivateKeyFull(TypedDict):
        # private key as returned when requested by id
        id: int
        name: str
        description: str
        api_key_disabled: bool
        api_key_via_url: bool
        last_access: int
        api_key: str
        created_at: int
        updated_at: int

    class AcmeServer(TypedDict):
        id: int
        name: str
        is_staging: bool

    class AcmeAccount(TypedDict):
        id: int
        name: str
        acme_server: AcmeServer

    class CertificateFull(TypedDict):
        # certificate as returned when requested by id, created or updated
        id: int
        name: str
        description: str
        private_key: PrivateKeyBasic
        acme_account: AcmeAccount
        subject: str
        subject_alts: List[str]
        api_key_via_url: bool
        last_access: int
        organization: str
        organizational_unit: str
        country: str
        state: str
        city: str
        csr_extra_extensions: List[str]
        preferred_root_cn: str
        created_at: int
        updated_at: int
        api_key: str
        post_processing_command: str
        post_processing_environment: List[str]
        post_processing_client_key: str

    class CertificateBasic(TypedDict):
        # abbreviated certificate, as found in listings and inside orders
        id: int
        name: str
        description: str
        private_key: PrivateKeyBasic
        acme_account: AcmeAccount
        subject: str
        subject_alts: List[str]
        api_key_via_url: bool
        last_access: int

    class Order(TypedDict):
        id: int
        certificate: CertificateBasic
        status: str
        known_revoked: bool
        error: None  # TODO
        dns_identifiers: List[str]
        finalized_key: PrivateKeyBasic
        valid_from: int
        valid_to: int
        chain_root_cn: str
        created_at: int
        updated_at: int
class Certwarden:
    """Small client for the parts of the Certwarden REST API this module uses.

    Constructing an instance logs in with the given credentials. The access
    token from the login response is then sent in the ``Authorization``
    header of every later request.
    """

    server: str
    access_token: Union[str, None]

    def __init__(self, server: str, username: str, password: str):
        """Log in to Certwarden and remember the access token.

        Args:
            server: Base URL of the server, e.g. https://certwarden.example.org
            username: Login name.
            password: Login password.

        Raises:
            ValueError: If the login response has a non-2xx status.
            OSError: If the server cannot be reached (raised by urlopen).
        """
        self.server = server
        # Must exist before the login call: _request() checks it to decide
        # whether an Authorization header should be sent.
        self.access_token = None
        # Example request body: {"username":"admin","password":"something"}
        response = self._request(
            "POST",
            "/certwarden/api/v1/app/auth/login",
            {
                "username": username,
                "password": password,
            },
        )
        authorization = cast("Authorization", response["authorization"])
        self.access_token = authorization["access_token"]

    def _request(
        self, method: str, url: str, data_json: Union[Dict[str, Any], None] = None
    ) -> Any:
        """Send one request and return the decoded JSON response body.

        Args:
            method: HTTP method, e.g. "GET".
            url: Path (and query string) appended to the server base URL.
            data_json: Optional request body; serialized as JSON when given.

        Raises:
            ValueError: If the response status is not 2xx or the body is not
                valid JSON.
            OSError: On connection or HTTP errors raised by urlopen.
        """
        headers: Dict[str, str] = {}
        data = None
        if data_json is not None:
            data = json.dumps(data_json).encode()
            # Without this urllib labels the body as form-encoded.
            headers["Content-Type"] = "application/json"
        if self.access_token:
            headers["Authorization"] = self.access_token
        req = Request(self.server + url, data, headers=headers, method=method)
        # The context manager closes the connection even when an error is
        # raised while checking or decoding the response.
        with urlopen(req) as response:
            if not 200 <= response.status < 300:
                raise ValueError(
                    f"Got response code {response.status} for request to URL: {url}"
                )
            return json.loads(response.read().decode())

    def _list(self, url: str, key: str) -> List[Any]:
        """Fetch a listing and verify that it is complete.

        Args:
            url: Listing URL, including its ``limit`` query parameter.
            key: Name of the response field that holds the records.

        Raises:
            ValueError: If the server reports more (or fewer) records than it
                returned, i.e. the listing was cut off by the limit.
        """
        response = self._request("GET", url)
        records = response[key]
        total = response["total_records"]
        if total != len(records):
            raise ValueError(
                f"Certwarden returned {len(records)} of {total} records for "
                f"request to URL: {url}; incomplete listings are not supported"
            )
        return records

    def get_cert(self, name: str) -> "Union[CertificateFull, None]":
        """
        Find certificate by name
        Returns: detailed certificate object, or None if no certificate has
        that name
        """
        certificates = cast(
            "List[CertificateBasic]",
            self._list("/certwarden/api/v1/certificates?limit=1000", "certificates"),
        )
        cert_id = next(
            (entry["id"] for entry in certificates if entry["name"] == name), None
        )
        if cert_id is None:
            return None
        # get detailed certificate data using the id
        response = self._request("GET", f"/certwarden/api/v1/certificates/{cert_id}")
        return cast("CertificateFull", response["certificate"])

    def get_account_id(self, account_name: str) -> Union[int, None]:
        """
        Find account id by name
        Returns: account id, or None if account was not found
        """
        accounts = cast(
            "List[AcmeAccount]",
            self._list("/certwarden/api/v1/acmeaccounts?limit=1000", "acme_accounts"),
        )
        for account in accounts:
            if account["name"] == account_name:
                return account["id"]
        return None

    def create_cert(
        self, name: str, account_id: int, main_subject: str, alt_subjects: List[str]
    ) -> "CertificateFull":
        """
        Create a certificate together with a new ECDSA P-256 private key
        Returns: the created certificate object
        """
        response = self._request(
            "POST",
            "/certwarden/api/v1/certificates",
            {
                "name": name,
                "description": "",
                "private_key_id": -1,  # create new private key
                "algorithm_value": "ecdsap256",
                "acme_account_id": account_id,
                "subject": main_subject,
                "subject_alts": alt_subjects,
                "post_processing_command": "",
                "post_processing_environment": [],
                "post_processing_client_enable": False,
                "preferred_root_cn": "",
                "organization": "",
                "organizational_unit": "",
                "country": "",
                "state": "",
                "city": "",
                "csr_extra_extensions": [],
            },
        )
        return cast("CertificateFull", response["certificate"])

    def update_cert(
        self, cert: "CertificateBasic", alt_subjects: List[str]
    ) -> "CertificateFull":
        """
        Replace the subject alt names of an existing certificate
        Returns: the updated certificate object
        """
        response = self._request(
            "PUT",
            f"/certwarden/api/v1/certificates/{cert['id']}",
            {
                "subject_alts": alt_subjects,
            },
        )
        return cast("CertificateFull", response["certificate"])

    def _get_orders(self, cert: "CertificateBasic") -> "List[Order]":
        """
        List the orders of a certificate
        """
        return cast(
            "List[Order]",
            self._list(
                f"/certwarden/api/v1/certificates/{cert['id']}/orders?limit=1000",
                "orders",
            ),
        )

    def has_valid_order(self, cert: "CertificateBasic") -> bool:
        """
        Returns: True if at least one order of the certificate has status "valid"
        """
        return any(order["status"] == "valid" for order in self._get_orders(cert))

    def place_order(self, cert: "CertificateBasic") -> "Order":
        """
        Place a new order for the certificate
        Returns: the created order object
        """
        response = self._request(
            "POST", f"/certwarden/api/v1/certificates/{cert['id']}/orders", {}
        )
        return cast("Order", response["order"])

    def check_order(self, order: "Order") -> str:
        """
        Look up the current status of a previously placed order
        Returns: order status (invalid, valid, pending)
        Raises: ValueError if the order is no longer listed for its certificate
        """
        for candidate in self._get_orders(order["certificate"]):
            if candidate["id"] == order["id"]:
                return candidate["status"]
        raise ValueError("order should exist")

    def get_private_key(self, cert: "CertificateFull") -> "PrivateKeyFull":
        """
        Fetch the private key that belongs to a certificate
        Returns: detailed private key object
        """
        private_key_id = cert["private_key"]["id"]
        response = self._request(
            "GET", f"/certwarden/api/v1/privatekeys/{private_key_id}"
        )
        return cast("PrivateKeyFull", response["private_key"])
def run_module():
    """Make sure the requested certificate exists in Certwarden with a valid order.

    Steps:
      1. Parse and sanity-check the module parameters.
      2. Log in and look the certificate up by name (the subject is used as
         the certificate name).
      3. Create the certificate if it is missing, or update its subject alt
         names if they differ. Account and subject cannot be changed.
      4. Place an order when something changed or no valid order exists, and
         poll once per second (at most 300 times) until it is valid.
      5. Exit with the API keys of the certificate and of its private key.

    In check mode nothing is modified on the server, but ``changed`` is still
    reported for modifications a real run would make. A missing certificate
    is a failure in check mode because no API keys can be returned for it.

    The function does not return normally: it always ends through
    ``module.exit_json`` or ``module.fail_json``.
    """
    # define available arguments/parameters a user can pass to the module
    module_args = dict(
        server=dict(type="str", required=True),
        username=dict(type="str", required=True),
        password=dict(type="str", required=True, no_log=True),
        account=dict(type="str", required=True),
        subject=dict(type="str", required=True),
        subject_alts=dict(type="list", required=True),
    )
    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)

    # module parameters
    server = module.params["server"]  # https://certwarden.example.org
    username = module.params["username"]
    password = module.params["password"]
    account_name = module.params["account"]
    subject = module.params["subject"]
    name = subject
    subject_alts = module.params["subject_alts"]

    # sanity checks on subject_alts
    if subject in subject_alts:
        module.fail_json(msg="subject must not be present in subject_alts")
    if len(set(subject_alts)) != len(subject_alts):
        module.fail_json(msg="subject_alts contains duplicate values")

    # set to True when a change is made
    changed = False

    # fail_json()/exit_json() leave through SystemExit, which the handler
    # below does not catch, so they can be called freely inside the try block.
    try:
        cw = Certwarden(server, username, password)

        # get cert info
        cert = cw.get_cert(name)
        if cert is None:
            if module.check_mode:
                module.fail_json(
                    msg="Certificate does not exist and cannot be created, Ansible is running in check mode"
                )
            # cert does not exist yet
            account_id = cw.get_account_id(account_name)
            if account_id is None:
                module.fail_json(msg="Account not found")
            cert = cw.create_cert(name, account_id, subject, subject_alts)
            # Record the creation right away so a failure while ordering is
            # still reported as changed.
            changed = True
        else:
            # cert does exist, is it up to date?
            # check account name (cannot be updated)
            if cert["acme_account"]["name"] != account_name:
                module.fail_json(
                    msg=f"Cannot change account (from '{cert['acme_account']['name']}' to '{account_name}'). Manually remove the certificate AND private key in Certwarden.",
                    changed=changed,
                )
            # check subject (cannot be updated)
            if cert["subject"] != subject:
                module.fail_json(
                    msg=f"Cannot change subject (from '{cert['subject']}' to '{subject}'), only subject alts. Manually remove the certificate AND private key in Certwarden.",
                    changed=changed,
                )
            # check subject_alts, update if necessary
            if cert["subject_alts"] != subject_alts:
                if module.check_mode:
                    module.warn(
                        "Certificate needs to be updated, but Ansible is running in check mode"
                    )
                else:
                    cert = cw.update_cert(cert, subject_alts)
                # also set in check mode: a real run would make this change
                changed = True

        # place an order if:
        # - the certificate was created or updated, in which case
        #   changed = True is set above
        # - there is no currently valid order
        if changed or not cw.has_valid_order(cert):
            # a real run places an order here, so report it in check mode too
            changed = True
            if module.check_mode:
                module.warn(
                    "Certificate has no valid order and no new order can be created, Ansible is running in check mode. The API keys may return an invalid certificate, or no certificates at all."
                )
            else:
                order = cw.place_order(cert)
                # wait for order to succeed
                for _i in range(300):
                    time.sleep(1)
                    status = cw.check_order(order)
                    if status == "invalid":
                        module.fail_json(
                            msg="Order did not succeed. Check Certwarden web UI for info.",
                            changed=changed,
                        )
                    elif status == "valid":
                        break
                    # for other status values (like "pending"), keep waiting
                    # for the order to succeed
                else:
                    module.fail_json(
                        msg="Waited 5 minutes but still no valid order. Check Certwarden web UI for info.",
                        changed=changed,
                    )

        # obtain API keys for return data
        cert_api_key = cert["api_key"]
        private_key = cw.get_private_key(cert)
        key_api_key = private_key["api_key"]
    except (OSError, ValueError, KeyError) as exc:
        # OSError covers urllib's URLError/HTTPError, ValueError covers
        # non-2xx responses and undecodable JSON, KeyError covers responses
        # that lack an expected field. Report them as a module failure
        # instead of letting a traceback escape.
        module.fail_json(
            msg=f"Communication with Certwarden failed: {type(exc).__name__}: {exc}",
            changed=changed,
        )

    # exit successfully, returning API keys
    module.exit_json(
        changed=changed, cert_api_key=cert_api_key, key_api_key=key_api_key
    )
def main():
    """Entry point of the module: delegate all work to run_module()."""
    run_module()


# Execute the module only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()
# Example playbook tasks that use the certwarden module.
# Task 1: run the module on the control node (delegate_to: localhost) and
# keep its result, including the returned API keys, in "certwarden".
- name: Create certificate in Certwarden
delegate_to: localhost
certwarden:
server: "{{ certwarden_server }}"
# NOTE(review): "certwarden_acme_ccount" looks like a typo for
# "certwarden_acme_account" — confirm against the variable actually defined
# in the inventory before renaming.
account: "{{ certwarden_acme_ccount }}"
username: "{{ certwarden_user }}"
password: "{{ certwarden_pass }}"
subject: "{{ certs_subject }}"
subject_alts: "{{ certs_subject_alts }}"
register: certwarden
# Task 2: render the checkcerts.j2 template to /usr/local/bin/checkcerts,
# passing it the two API keys registered by the task above.
- name: Copy checkcerts script
ansible.builtin.template:
src: checkcerts.j2
dest: /usr/local/bin/checkcerts
mode: '500'
vars:
cert_apikey: "{{ certwarden.cert_api_key }}"
cert_key_apikey: "{{ certwarden.key_api_key }}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment