Last active
January 13, 2025 12:11
-
-
Save QNimbus/fbfb385dfc391aff19c4e17e9500a8eb to your computer and use it in GitHub Desktop.
Certificate import #truenas #ssl
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import time | |
import os | |
import sys | |
import argparse | |
from datetime import datetime | |
import logging | |
from urllib.parse import urlparse | |
import urllib3 | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
# Initialize logging at the very top | |
logging.basicConfig( | |
level=logging.INFO, # Set the logging level to INFO by default | |
format='%(asctime)s - %(levelname)s - %(message)s', # Define the log format | |
datefmt='%Y-%m-%d %H:%M:%S' # Date format for log messages | |
) | |
logger = logging.getLogger(__name__) # Create a logger for the script | |
def debug_http(entity, is_request=True): | |
output = [] | |
logger.debug(f"{'Request' if is_request else 'Response'} details received for debugging") | |
if is_request: | |
output.append("--- HTTP Request ---") | |
parsed_url = urlparse(entity.url) | |
output.extend([ | |
f"URL: {entity.url}", | |
f"Method: {entity.method}", | |
f"Scheme: {parsed_url.scheme}", | |
f"Host: {parsed_url.netloc}", | |
f"Path: {parsed_url.path}", | |
"Headers:", | |
]) | |
else: | |
output.append("--- HTTP Response ---") | |
output.extend([ | |
f"Status Code: {entity.status_code}", | |
f"Reason: {entity.reason}", | |
"Headers:", | |
]) | |
for key, value in entity.headers.items(): | |
if key.lower() == 'authorization': | |
output.append(f" {key}: [REDACTED]") | |
else: | |
output.append(f" {key}: {value}") | |
if is_request and entity.body: | |
try: | |
body = json.loads(entity.body) | |
if isinstance(body, list): | |
for item in body: | |
for field in ['privatekey', 'certificate', 'password', 'chain_list', 'ui_certificate']: | |
if field in item: | |
item[field] = '[REDACTED]' | |
elif isinstance(body, dict): | |
for field in ['privatekey', 'certificate', 'password', 'chain_list', 'ui_certificate']: | |
if field in body: | |
body[field] = '[REDACTED]' | |
output.append("Body:") | |
output.append(json.dumps(body, indent=2)) | |
except json.JSONDecodeError: | |
output.append("Body: [Binary or non-JSON data]") | |
elif not is_request and entity.text: | |
output.append("Body:") | |
try: | |
body = entity.json() | |
if isinstance(body, list): | |
for item in body: | |
for field in ['privatekey', 'certificate', 'password', 'chain_list', 'ui_certificate']: | |
if field in item: | |
item[field] = '[REDACTED]' | |
elif isinstance(body, dict): | |
for field in ['privatekey', 'certificate', 'password', 'chain_list', 'ui_certificate']: | |
if field in body: | |
body[field] = '[REDACTED]' | |
output.append(json.dumps(body, indent=2)) | |
except json.JSONDecodeError: | |
output.append(entity.text) | |
elif not is_request: | |
output.append("Body: [Empty]") | |
logger.debug("\n".join(output)) | |
return "\n".join(output) | |
def get_certificates(host, api_key, debug_mode=False): | |
logger.info("Attempting to fetch certificates from TrueNAS API") | |
url = f'http://{host}/api/v2.0/certificate' | |
try: | |
session = requests.Session() | |
headers = { | |
'Authorization': f'Bearer {api_key}' | |
} | |
request = requests.Request('GET', url, headers=headers) | |
prepared_request = request.prepare() | |
if debug_mode: | |
logger.debug(debug_http(prepared_request, is_request=True)) | |
response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_http(response, is_request=False)) | |
if response.status_code == 200: | |
logger.info("Successfully retrieved certificates.") | |
# Add debug output for the response content | |
if debug_mode: | |
certs = response.json() | |
logger.debug(f"Retrieved {len(certs)} certificates:") | |
for cert in certs: | |
logger.debug(f" - ID: {cert.get('id')}, Name: {cert.get('name')}") | |
return response.json() | |
else: | |
logger.error(f"Failed to fetch certificates. Status code: {response.status_code}") | |
return None | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error fetching certificates: {e}") | |
return None | |
def list_certificates(host, api_key, debug_mode=False): | |
logger.info("Listing all available certificates.") | |
certs = get_certificates(host, api_key, debug_mode) | |
if not certs: | |
logger.error("No certificates retrieved.") | |
return False | |
print("\nCertificates:") | |
print(f"{'ID':<5} {'Name':<30} {'Common Name':<40} {'Until':<20} {'Type':<15}") | |
print("-" * 110) | |
for cert in certs: | |
cert_id = cert.get('id', 'N/A') | |
name = cert.get('name', 'N/A') | |
common = cert.get('common', 'N/A') | |
until = cert.get('until', 'N/A') | |
cert_type = cert.get('type', 'N/A') | |
print(f"{cert_id:<5} {name[:30]:<30} {common[:40]:<40} {until[:20]:<20} {cert_type:<15}") | |
logger.info(f"Total certificates retrieved: {len(certs)}") | |
return True | |
def validate_cert_name(name): | |
logger.debug(f"Validating certificate name: {name}") | |
import re | |
pattern = r'^[a-zA-Z0-9_-]+$' | |
result = bool(re.match(pattern, name)) | |
logger.info(f"Validation result for certificate name '{name}': {result}") | |
return result | |
def parse_cli_and_env_inputs(): | |
logger.debug("Parsing CLI arguments and environment variables.") | |
parser = argparse.ArgumentParser(description='Import Let\'s Encrypt certificates into TrueNAS Core') | |
parser.add_argument('--api-key', help='TrueNAS API key') | |
parser.add_argument('--host', default='localhost', help='TrueNAS host address (default: localhost)') | |
parser.add_argument('--cert-path', help='Path to certificate directory') | |
parser.add_argument('--name', help='Certificate name (default: letsencrypt_YYYYMMDD)') | |
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug output including full HTTP request/response') | |
parser.add_argument('-l', '--list', action='store_true', help='List all certificates') | |
parser.add_argument('-f', '--force', action='store_true', help='Force overwrite if certificate already exists') | |
parser.add_argument('-s', '--silent', action='store_true', help='Suppress all output, use exit code only') | |
args = parser.parse_args() | |
if not args.list and not args.cert_path: | |
parser.error("--cert-path is required when not using --list") | |
api_key = os.environ.get('API_KEY') | |
if api_key: | |
logger.info("Using API key from environment variable") | |
return api_key, args | |
if args.api_key: | |
logger.info("Using API key from command line argument") | |
return args.api_key, args | |
logger.error("API key must be provided via API_KEY environment variable or --api-key argument") | |
sys.exit(1) | |
def read_certificate_files(cert_path, domain): | |
logger.info(f"Reading certificate files from path: {cert_path}") | |
cert_file = os.path.join(cert_path, "fullchain.cer") | |
key_file = os.path.join(cert_path, f"{domain}.key") | |
try: | |
with open(cert_file, "r") as f: | |
cert = f.read() | |
logger.debug(f"Certificate content from {cert_file} successfully read.") | |
with open(key_file, "r") as f: | |
priv_key = f.read() | |
logger.debug(f"Private key content from {key_file} successfully read.") | |
return cert, priv_key | |
except FileNotFoundError as e: | |
logger.error(f"File not found: {e}") | |
sys.exit(1) | |
except IOError as e: | |
logger.error(f"Error reading files: {e}") | |
sys.exit(1) | |
def set_ui_certificate(host, api_key, cert_id, debug_mode=False): | |
"""Set the system UI certificate for HTTPS and restart the UI""" | |
# First set the certificate | |
url = f'http://{host}/api/v2.0/system/general' | |
payload = { | |
"ui_certificate": cert_id | |
} | |
logger.info(f"Setting UI certificate to ID: {cert_id}") | |
try: | |
session = requests.Session() | |
request = requests.Request( | |
'PUT', | |
url, | |
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}, | |
json=payload | |
) | |
prepared_request = request.prepare() | |
if debug_mode: | |
logger.debug(debug_http(prepared_request, is_request=True)) | |
response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_http(response, is_request=False)) | |
if response.status_code not in [200, 201]: | |
logger.error(f"Failed to set UI certificate. Status code: {response.status_code}") | |
if response.text: | |
logger.error(f"Error response: {response.text}") | |
return False | |
# If certificate was set successfully, restart the UI | |
logger.info("Successfully set new UI certificate. Restarting UI...") | |
restart_url = f'http://{host}/api/v2.0/system/general/ui_restart' | |
restart_request = requests.Request('GET', restart_url, headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}) | |
prepared_restart_request = restart_request.prepare() | |
if debug_mode: | |
logger.debug(debug_http(prepared_restart_request, is_request=True)) | |
response = session.send(prepared_restart_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_http(response, is_request=False)) | |
if response.status_code == 200: | |
logger.info("UI restart initiated successfully") | |
return True | |
else: | |
logger.error(f"Failed to restart UI. Status code: {response.status_code}") | |
if response.text: | |
logger.error(f"Error response: {response.text}") | |
return False | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error during certificate setup or UI restart: {e}") | |
if hasattr(e, 'response') and e.response is not None: | |
logger.error(f"Status code: {e.response.status_code}") | |
if e.response.text: | |
logger.error(f"Error response: {e.response.text}") | |
return False | |
def get_cert_by_name(certs, cert_name): | |
"""Find a certificate by name with detailed logging""" | |
logger.debug(f"Looking for certificate with name: {cert_name}") | |
logger.debug("Available certificates:") | |
for cert in certs: | |
logger.debug(f" - {cert.get('id')}: {cert.get('name')}") | |
matches = [cert for cert in certs if cert['name'] == cert_name] | |
if matches: | |
logger.debug(f"Found matching certificate: {matches[0]['id']}") | |
return matches[0] | |
return None | |
def find_certificate_with_retry(host, api_key, cert_name, debug_mode=False, max_retries=5, delay=2): | |
"""Find certificate in list with retries""" | |
# Add initial delay to allow the certificate to be fully registered | |
logger.debug(f"Waiting {delay} seconds for certificate to be registered...") | |
time.sleep(delay) | |
for attempt in range(max_retries): | |
if attempt > 0: | |
logger.debug(f"Waiting {delay} seconds before retry {attempt + 1}/{max_retries}") | |
time.sleep(delay) | |
logger.debug(f"Attempting to find certificate (attempt {attempt + 1}/{max_retries})") | |
updated_certs = get_certificates(host, api_key, debug_mode) | |
if updated_certs: | |
cert_details = next((cert for cert in updated_certs if cert['name'] == cert_name), None) | |
if cert_details: | |
# Add additional delay after finding the certificate | |
logger.debug("Certificate found, waiting 2 seconds before proceeding...") | |
time.sleep(2) | |
return cert_details | |
return None | |
def check_existing_certificate(host, api_key, cert_name, debug_mode=False): | |
""" | |
Check if a certificate with the same name already exists. | |
Returns (exists, cert_id): | |
exists: bool - True if a certificate with the same name exists | |
cert_id: int or None - ID of existing certificate if found | |
""" | |
certs = get_certificates(host, api_key, debug_mode) | |
if not certs: | |
return False, None | |
existing_cert = next((cert for cert in certs if cert['name'] == cert_name), None) | |
if not existing_cert: | |
return False, None | |
return True, existing_cert['id'] | |
def import_certificate(host, api_key, cert_name, cert, priv_key, debug_mode=False, force=False): | |
"""Import certificate into TrueNAS API""" | |
url = f'http://{host}/api/v2.0/certificate' | |
# First check if certificate exists | |
exists, existing_id = check_existing_certificate( | |
host, api_key, cert_name, debug_mode | |
) | |
if exists: | |
if not force: | |
logger.error(f"Certificate '{cert_name}' already exists. Use --force to overwrite") | |
return False | |
else: | |
logger.info(f"Force flag set, overwriting existing certificate '{cert_name}'") | |
# Delete existing certificate | |
delete_url = f'http://{host}/api/v2.0/certificate/id/{existing_id}' | |
try: | |
session = requests.Session() | |
# Add delete payload | |
delete_request = requests.Request( | |
'DELETE', | |
delete_url, | |
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'} | |
) | |
prepared_request = delete_request.prepare() | |
if debug_mode: | |
logger.debug(debug_http(prepared_request, is_request=True)) | |
response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_http(response, is_request=False)) | |
if response.status_code not in [200, 204]: | |
logger.error(f"Failed to delete existing certificate. Status code: {response.status_code}") | |
if response.text: | |
logger.error(f"Error response: {response.text}") | |
return False | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error deleting existing certificate: {e}") | |
return False | |
# Continue with import | |
payload = { | |
"create_type": "CERTIFICATE_CREATE_IMPORTED", | |
"name": cert_name, | |
"certificate": cert, | |
"privatekey": priv_key, | |
} | |
logger.info(f"Importing certificate as '{cert_name}'") | |
try: | |
session = requests.Session() | |
request = requests.Request( | |
'POST', | |
url, | |
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}, | |
json=payload | |
) | |
prepared_request = request.prepare() | |
if debug_mode: | |
logger.debug(debug_http(prepared_request, is_request=True)) | |
response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_http(response, is_request=False)) | |
if response.status_code == 200: | |
logger.info(f"Certificate '{cert_name}' imported successfully") | |
# Find the certificate with retries | |
cert_details = find_certificate_with_retry(host, api_key, cert_name, debug_mode) | |
if cert_details: | |
cert_id = cert_details['id'] | |
logger.info(f"Found certificate with ID: {cert_id}") | |
logger.info("Certificate details:") | |
logger.info(f" ID: {cert_id}") | |
logger.info(f" Common Name: {cert_details.get('common', 'N/A')}") | |
logger.info(f" Valid Until: {cert_details.get('until', 'N/A')}") | |
logger.info(f" Key Type: {cert_details.get('key_type', 'N/A')}") | |
logger.info(f" Key Length: {cert_details.get('key_length', 'N/A')}") | |
# Set as UI certificate | |
if set_ui_certificate(host, api_key, cert_id, debug_mode): | |
logger.info("Certificate successfully set as system UI certificate") | |
return True | |
else: | |
logger.error("Failed to set certificate as UI certificate") | |
return False | |
else: | |
logger.error(f"Could not find imported certificate with name: {cert_name}") | |
return False | |
elif response.status_code == 401: | |
logger.error("Authentication failed. Please check your credentials.") | |
return False | |
else: | |
logger.error(f"Unexpected status code: {response.status_code}") | |
if response.text: | |
logger.error(f"Response content: {response.text}") | |
return False | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error importing certificate: {e}") | |
if hasattr(e, 'response') and e.response is not None: | |
logger.error(f"Status code: {e.response.status_code}") | |
if e.response.text: | |
logger.error(f"Error response: {e.response.text}") | |
return False | |
if __name__ == "__main__": | |
logger.debug("Script execution started.") | |
api_key, args = parse_cli_and_env_inputs() | |
if args.silent: | |
logging.getLogger().setLevel(logging.CRITICAL + 1) | |
logger.debug("Silent mode activated, suppressing logs.") | |
elif args.debug: | |
logger.setLevel(logging.DEBUG) | |
logger.debug("Debug mode enabled.") | |
if args.list: | |
logger.info("List mode activated.") | |
success = list_certificates(args.host, api_key, args.debug) | |
if not success: | |
logger.error("Failed to list certificates.") | |
sys.exit(1) | |
else: | |
domain = os.path.basename(args.cert_path).replace('_ecc', '') | |
logger.info(f"Derived domain from cert-path: {domain}") | |
cert_name = args.name if args.name else "letsencrypt_" + datetime.now().strftime('%Y%m%d') | |
logger.info(f"Using certificate name: {cert_name}") | |
if not validate_cert_name(cert_name): | |
logger.error(f"Invalid certificate name '{cert_name}'") | |
sys.exit(1) | |
cert, priv_key = read_certificate_files(args.cert_path, domain) | |
logger.info("Successfully read certificate and private key files.") | |
success = import_certificate( | |
args.host, | |
api_key, | |
cert_name, | |
cert, | |
priv_key, | |
args.debug, | |
args.force | |
) | |
if not success: | |
logger.error("Failed to import certificate.") | |
sys.exit(1) | |
logger.debug("Script execution completed.") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Set this to your AcmeScript dir | |
ACMEDIR=/mnt/vault/acme-script | |
mkdir -p ${ACMEDIR}/.local/bin | |
# Get porkbun-api | |
curl -#o ${ACMEDIR}/.local/bin/porkbun-api https://raw.githubusercontent.com/corey-braun/porkbun-api-bash/refs/heads/main/porkbun-api | |
chmod 755 ${ACMEDIR}/.local/bin/porkbun-api | |
# Get sample .porkbun-apirc config file | |
curl -#o ${ACMEDIR}/.porkbun-apirc https://raw.githubusercontent.com/corey-braun/porkbun-api-bash/refs/heads/main/.porkbun-apirc | |
chmod 600 ${ACMEDIR}/.porkbun-apirc | |
# Get Porkbun DNS-01 helper script from my Gist | |
curl -#o ${ACMEDIR}/porkbun-dns-01.sh https://gist.githubusercontent.com/QNimbus/fbfb385dfc391aff19c4e17e9500a8eb/raw/porkbun-dns-01.sh | |
# set ownership properly | |
sudo chown -R truenas_admin:ssl-cert ${ACMEDIR} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
# porkbun-dns-01 - TrueNAS Porkbun DNS-01 ACME helper script | |
# | |
# TrueNAS will call this script with arguments: | |
# [set|unset] FQDN ACME_FQDN TXTVALUE | |
# where: | |
# FQDN is the dns name of cert being requested. e.g. truenas.example.com | |
# ACME_FQDN is the DNS-01 record. e.g. _acme-challenge.truenas.example.com | |
# TXTVALUE is the DNS-01 challenge value | |
set -eou pipefail | |
## Config starts here | |
# set the path to your acmeScript directory | |
export HOME=/mnt/vault/acme-script | |
# set your domain name | |
MYDOMAIN=vwn.io | |
## Everything below here shouldn't need to change | |
# these should be fine | |
export PATH=${HOME}/.local/bin:${PATH} | |
export PBAPI=${HOME}/.local/bin/porkbun-api | |
### FUNCTIONS | |
add_record(){ | |
FQDN=$1 | |
ACME_FQDN="${2}" | |
TXTVALUE=$3 | |
# We need to strip your domain off the end of the ACME_FQDN for porkbun-api | |
ACMESUBDOMAIN=$(echo ${ACME_FQDN} | sed -e "s/.${MYDOMAIN}$//") | |
echo "INFO: Creating TXT record ${ACMESUBDOMAIN} with value ${TXTVALUE}" | logger -t porkbun-dns-01 | |
${PBAPI} custom dns/create/${MYDOMAIN} \ | |
name="${ACMESUBDOMAIN}" \ | |
content="${TXTVALUE}" \ | |
type=TXT \ | |
ttl=60 | logger -t porkbun-dns-01 | |
} | |
del_record(){ | |
FQDN=$1 | |
ACME_FQDN="${2}" | |
TXTVALUE=$3 | |
# We need to strip your domain off the end of the ACME_FQDN for porkbun-api | |
ACMESUBDOMAIN=$(echo ${ACME_FQDN} | sed -e "s/.${MYDOMAIN}$//") | |
echo "INFO: Deleting TXT record ${ACMESUBDOMAIN}" | logger -t porkbun-dns-01 | |
${PBAPI} custom dns/deleteByNameType/${MYDOMAIN}/TXT/${ACMESUBDOMAIN} | logger -t porkbun-dns-01 | |
} | |
### TEST FUNCTION | |
test_script(){ | |
RANDOM_SUBDOMAIN="test-$(openssl rand -hex 4)" | |
TXTVALUE="test-value-$(openssl rand -hex 4)" | |
ACME_FQDN="${RANDOM_SUBDOMAIN}.${MYDOMAIN}" | |
echo "INFO: Testing script by creating a TXT record." | logger -t porkbun-dns-01 | |
add_record "${MYDOMAIN}" "${ACME_FQDN}" "${TXTVALUE}" | |
echo "Test TXT record created:" | |
echo " Name: ${ACME_FQDN}" | |
echo " Value: ${TXTVALUE}" | |
echo "INFO: Deleting test TXT record." | logger -t porkbun-dns-01 | |
del_record "${MYDOMAIN}" "${ACME_FQDN}" "${TXTVALUE}" | |
echo "Test TXT record deleted." | |
} | |
### Main | |
if [ "$1" == "set" ]; then | |
add_record "$2" "$3" "$4" | |
fi | |
if [ "$1" == "unset" ]; then | |
del_record "$2" "$3" "$4" | |
fi | |
if [ "$1" == "test" ]; then | |
test_script | |
fi |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Import Let's Encrypt (ACME) certificates in TrueNAS Core
Howto
Download acme.sh:
curl https://get.acme.sh | sh -s email=<your email>
Run acme.sh: