Skip to content

Instantly share code, notes, and snippets.

@QNimbus
Last active January 13, 2025 12:11
Show Gist options
  • Save QNimbus/fbfb385dfc391aff19c4e17e9500a8eb to your computer and use it in GitHub Desktop.
Save QNimbus/fbfb385dfc391aff19c4e17e9500a8eb to your computer and use it in GitHub Desktop.
Certificate import #truenas #ssl
import requests
import json
import time
import os
import sys
import argparse
from datetime import datetime
import logging
from urllib.parse import urlparse
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Configure logging once, before any function in this script can emit a record.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',  # timestamp - level - message
    datefmt='%Y-%m-%d %H:%M:%S',  # timestamp layout used by %(asctime)s
    level=logging.INFO,  # default verbosity; the entry point adjusts it for --debug/--silent
)
# Module-level logger shared by every function below.
logger = logging.getLogger(__name__)
def debug_http(entity, is_request=True):
    """Render an HTTP request or response as text with secrets masked.

    The dump is returned, not logged: every caller in this script passes the
    return value to ``logger.debug`` itself, so logging it here as well would
    emit each dump twice.

    Args:
        entity: For a request, an object exposing ``url``, ``method``,
            ``headers`` and ``body``. For a response, an object exposing
            ``status_code``, ``reason``, ``headers``, ``text`` and ``json()``.
        is_request: ``True`` to format ``entity`` as a request, ``False`` to
            format it as a response.

    Returns:
        str: A multi-line description. The ``Authorization`` header and the
        JSON fields ``privatekey``, ``certificate``, ``password``,
        ``chain_list`` and ``ui_certificate`` are replaced by ``[REDACTED]``.
    """
    sensitive_fields = frozenset(
        ('privatekey', 'certificate', 'password', 'chain_list', 'ui_certificate')
    )

    def redact(value):
        """Return a copy of *value* with sensitive dict entries masked at any depth."""
        if isinstance(value, dict):
            return {
                key: '[REDACTED]' if key in sensitive_fields else redact(item)
                for key, item in value.items()
            }
        if isinstance(value, list):
            # Non-dict items (numbers, strings) pass through untouched instead
            # of crashing a membership test.
            return [redact(item) for item in value]
        return value

    kind = 'Request' if is_request else 'Response'
    logger.debug(f"{kind} details received for debugging")

    if is_request:
        parsed_url = urlparse(entity.url)
        output = [
            "--- HTTP Request ---",
            f"URL: {entity.url}",
            f"Method: {entity.method}",
            f"Scheme: {parsed_url.scheme}",
            f"Host: {parsed_url.netloc}",
            f"Path: {parsed_url.path}",
            "Headers:",
        ]
    else:
        output = [
            "--- HTTP Response ---",
            f"Status Code: {entity.status_code}",
            f"Reason: {entity.reason}",
            "Headers:",
        ]

    for key, value in entity.headers.items():
        shown = '[REDACTED]' if key.lower() == 'authorization' else value
        output.append(f" {key}: {shown}")

    if is_request and entity.body:
        try:
            body = json.loads(entity.body)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and undecodable bytes;
            # TypeError covers bodies that are neither str nor bytes.
            output.append("Body: [Binary or non-JSON data]")
        else:
            output.append("Body:")
            output.append(json.dumps(redact(body), indent=2))
    elif not is_request and entity.text:
        output.append("Body:")
        try:
            body = entity.json()
        except ValueError:
            # Not JSON: fall back to the raw text.
            output.append(entity.text)
        else:
            output.append(json.dumps(redact(body), indent=2))
    elif not is_request:
        output.append("Body: [Empty]")

    return "\n".join(output)
def get_certificates(host, api_key, debug_mode=False):
    """Fetch the certificate list from the TrueNAS REST API.

    Args:
        host: Host (optionally ``host:port``) of the TrueNAS system.
        api_key: API key sent as a ``Bearer`` token.
        debug_mode: When true, log a redacted dump of the request and the
            response at DEBUG level.

    Returns:
        The decoded JSON body on HTTP 200. ``None`` when the status is not
        200, the request fails, or the body is not valid JSON.
    """
    logger.info("Attempting to fetch certificates from TrueNAS API")
    url = f'http://{host}/api/v2.0/certificate'
    headers = {
        'Authorization': f'Bearer {api_key}'
    }
    try:
        # The context manager closes the session (and its pooled connections)
        # on every exit path.
        with requests.Session() as session:
            prepared_request = requests.Request('GET', url, headers=headers).prepare()
            if debug_mode:
                logger.debug(debug_http(prepared_request, is_request=True))
            response = session.send(prepared_request, verify=False)
            if debug_mode:
                logger.debug(debug_http(response, is_request=False))
            if response.status_code != 200:
                logger.error(f"Failed to fetch certificates. Status code: {response.status_code}")
                return None
            # Decode once and reuse the result for both logging and the return value.
            certs = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching certificates: {e}")
        return None
    except ValueError as e:
        # Some requests versions raise a plain ValueError subclass for bad JSON.
        logger.error(f"Error fetching certificates: {e}")
        return None

    logger.info("Successfully retrieved certificates.")
    if debug_mode:
        logger.debug(f"Retrieved {len(certs)} certificates:")
        for cert in certs:
            logger.debug(f" - ID: {cert.get('id')}, Name: {cert.get('name')}")
    return certs
def list_certificates(host, api_key, debug_mode=False):
    """Print a table of all certificates returned by ``get_certificates``.

    Args:
        host: Host of the TrueNAS system, passed through to ``get_certificates``.
        api_key: API key, passed through to ``get_certificates``.
        debug_mode: Passed through to ``get_certificates``.

    Returns:
        bool: ``True`` when at least one certificate was printed, ``False``
        when ``get_certificates`` returned nothing.
    """
    logger.info("Listing all available certificates.")
    certs = get_certificates(host, api_key, debug_mode)
    if not certs:
        logger.error("No certificates retrieved.")
        return False

    def as_text(value):
        """Return a printable cell; missing or null values become 'N/A'."""
        return 'N/A' if value is None else str(value)

    print("\nCertificates:")
    print(f"{'ID':<5} {'Name':<30} {'Common Name':<40} {'Until':<20} {'Type':<15}")
    print("-" * 110)
    for cert in certs:
        # Convert every field to str first: slicing or width-formatting a
        # null/non-string JSON value would otherwise raise TypeError.
        cert_id = as_text(cert.get('id'))
        name = as_text(cert.get('name'))
        common = as_text(cert.get('common'))
        until = as_text(cert.get('until'))
        cert_type = as_text(cert.get('type'))
        print(f"{cert_id:<5} {name[:30]:<30} {common[:40]:<40} {until[:20]:<20} {cert_type:<15}")
    logger.info(f"Total certificates retrieved: {len(certs)}")
    return True
def validate_cert_name(name):
    """Check that a certificate name uses only letters, digits, ``_`` and ``-``.

    Args:
        name: Candidate certificate name.

    Returns:
        bool: ``True`` when ``name`` is a non-empty string made solely of
        ``[a-zA-Z0-9_-]`` characters, otherwise ``False``.
    """
    import re  # local import: the module's top-level imports do not include re

    logger.debug(f"Validating certificate name: {name}")
    # fullmatch is required here: match() with a trailing `$` would also
    # accept a name that ends in a newline.
    result = isinstance(name, str) and re.fullmatch(r'[a-zA-Z0-9_-]+', name) is not None
    logger.info(f"Validation result for certificate name '{name}': {result}")
    return result
def parse_cli_and_env_inputs():
    """Parse command-line arguments and resolve the API key.

    The ``API_KEY`` environment variable takes precedence over ``--api-key``.
    ``--silent`` is applied to the root logger immediately after parsing so
    that the messages logged by this function are suppressed as well.

    Returns:
        tuple: ``(api_key, args)`` where ``args`` is the parsed
        ``argparse.Namespace``.

    Raises:
        SystemExit: With status 1 when no API key is available; via
            ``parser.error`` when ``--cert-path`` is missing without ``--list``.
    """
    logger.debug("Parsing CLI arguments and environment variables.")
    parser = argparse.ArgumentParser(description='Import Let\'s Encrypt certificates into TrueNAS Core')
    parser.add_argument('--api-key', help='TrueNAS API key')
    parser.add_argument('--host', default='localhost', help='TrueNAS host address (default: localhost)')
    parser.add_argument('--cert-path', help='Path to certificate directory')
    parser.add_argument('--name', help='Certificate name (default: letsencrypt_YYYYMMDD)')
    parser.add_argument('-d', '--debug', action='store_true', help='Enable debug output including full HTTP request/response')
    parser.add_argument('-l', '--list', action='store_true', help='List all certificates')
    parser.add_argument('-f', '--force', action='store_true', help='Force overwrite if certificate already exists')
    parser.add_argument('-s', '--silent', action='store_true', help='Suppress all output, use exit code only')
    args = parser.parse_args()

    if args.silent:
        # Silence logging before the first INFO/ERROR line below; otherwise
        # --silent would still print the "Using API key ..." message.
        logging.getLogger().setLevel(logging.CRITICAL + 1)

    if not args.list and not args.cert_path:
        parser.error("--cert-path is required when not using --list")

    api_key = os.environ.get('API_KEY')
    if api_key:
        logger.info("Using API key from environment variable")
        return api_key, args
    if args.api_key:
        logger.info("Using API key from command line argument")
        return args.api_key, args

    logger.error("API key must be provided via API_KEY environment variable or --api-key argument")
    sys.exit(1)
def read_certificate_files(cert_path, domain):
    """Read the certificate chain and private key from a certificate directory.

    Args:
        cert_path: Directory containing ``fullchain.cer`` and ``<domain>.key``.
        domain: Domain name used to build the key file name.

    Returns:
        tuple: ``(cert, priv_key)`` with the text content of both files.

    Raises:
        SystemExit: With status 1 when either file is missing, unreadable, or
            not decodable as text.
    """
    logger.info(f"Reading certificate files from path: {cert_path}")
    cert_file = os.path.join(cert_path, "fullchain.cer")
    key_file = os.path.join(cert_path, f"{domain}.key")
    try:
        # An explicit encoding keeps decoding independent of the locale the
        # script happens to run under (e.g. a minimal cron environment).
        with open(cert_file, "r", encoding="utf-8") as f:
            cert = f.read()
        logger.debug(f"Certificate content from {cert_file} successfully read.")
        with open(key_file, "r", encoding="utf-8") as f:
            priv_key = f.read()
        logger.debug(f"Private key content from {key_file} successfully read.")
    except FileNotFoundError as e:
        logger.error(f"File not found: {e}")
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError: the file is not text (e.g. a binary-encoded key).
        logger.error(f"Error reading files: {e}")
        sys.exit(1)
    return cert, priv_key
def set_ui_certificate(host, api_key, cert_id, debug_mode=False):
    """Set the system UI certificate for HTTPS and restart the UI.

    Sends ``PUT /api/v2.0/system/general`` with ``ui_certificate`` set to
    ``cert_id`` and, on success, ``GET /api/v2.0/system/general/ui_restart``.

    Args:
        host: Host of the TrueNAS system.
        api_key: API key sent as a ``Bearer`` token.
        cert_id: ID of the certificate to use for the UI.
        debug_mode: When true, log redacted dumps of each request/response.

    Returns:
        bool: ``True`` when the PUT returned 200/201 and the restart request
        returned 200; ``False`` otherwise, including on request errors.
    """
    url = f'http://{host}/api/v2.0/system/general'
    restart_url = f'http://{host}/api/v2.0/system/general/ui_restart'
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}
    payload = {
        "ui_certificate": cert_id
    }
    logger.info(f"Setting UI certificate to ID: {cert_id}")
    try:
        # One session for both calls; the context manager closes it on every
        # exit path.
        with requests.Session() as session:

            def send(request):
                """Prepare and send *request*, logging both sides in debug mode."""
                prepared = request.prepare()
                if debug_mode:
                    logger.debug(debug_http(prepared, is_request=True))
                reply = session.send(prepared, verify=False)
                if debug_mode:
                    logger.debug(debug_http(reply, is_request=False))
                return reply

            # First set the certificate
            response = send(requests.Request('PUT', url, headers=headers, json=payload))
            if response.status_code not in [200, 201]:
                logger.error(f"Failed to set UI certificate. Status code: {response.status_code}")
                if response.text:
                    logger.error(f"Error response: {response.text}")
                return False

            # If certificate was set successfully, restart the UI
            logger.info("Successfully set new UI certificate. Restarting UI...")
            response = send(requests.Request('GET', restart_url, headers=headers))
            if response.status_code == 200:
                logger.info("UI restart initiated successfully")
                return True
            logger.error(f"Failed to restart UI. Status code: {response.status_code}")
            if response.text:
                logger.error(f"Error response: {response.text}")
            return False
    except requests.exceptions.RequestException as e:
        logger.error(f"Error during certificate setup or UI restart: {e}")
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            logger.error(f"Status code: {error_response.status_code}")
            if error_response.text:
                logger.error(f"Error response: {error_response.text}")
        return False
def get_cert_by_name(certs, cert_name):
    """Find a certificate by name with detailed logging.

    Args:
        certs: Iterable of certificate dicts (``None`` is treated as empty).
        cert_name: Name to look for.

    Returns:
        The first dict whose ``name`` equals ``cert_name``, or ``None`` when
        there is no match.
    """
    # Materialise once: the entries are walked twice (logging, then lookup),
    # which would exhaust a one-shot iterable.
    certs = list(certs) if certs else []
    logger.debug(f"Looking for certificate with name: {cert_name}")
    logger.debug("Available certificates:")
    for cert in certs:
        logger.debug(f" - {cert.get('id')}: {cert.get('name')}")
    # .get() matches the logging loop above and tolerates entries without a
    # 'name' key instead of raising KeyError.
    match = next((cert for cert in certs if cert.get('name') == cert_name), None)
    if match is not None:
        logger.debug(f"Found matching certificate: {match.get('id')}")
    return match
def find_certificate_with_retry(host, api_key, cert_name, debug_mode=False, max_retries=5, delay=2):
    """Poll the certificate list until a certificate named ``cert_name`` appears.

    Sleeps ``delay`` seconds before the first lookup and again before every
    further attempt, for at most ``max_retries`` lookups. After a match it
    sleeps two more seconds before returning.

    Returns:
        The matching certificate dict, or ``None`` when no attempt found it.
    """
    # Initial pause so the certificate has time to be registered.
    logger.debug(f"Waiting {delay} seconds for certificate to be registered...")
    time.sleep(delay)

    for attempt_number in range(1, max_retries + 1):
        if attempt_number > 1:
            logger.debug(f"Waiting {delay} seconds before retry {attempt_number}/{max_retries}")
            time.sleep(delay)

        logger.debug(f"Attempting to find certificate (attempt {attempt_number}/{max_retries})")
        listing = get_certificates(host, api_key, debug_mode)
        if not listing:
            continue

        for candidate in listing:
            if candidate['name'] == cert_name:
                # Extra pause after the certificate shows up in the list.
                logger.debug("Certificate found, waiting 2 seconds before proceeding...")
                time.sleep(2)
                return candidate

    return None
def check_existing_certificate(host, api_key, cert_name, debug_mode=False):
    """
    Report whether a certificate named ``cert_name`` is already present.

    Returns (exists, cert_id):
        exists: bool - True if a certificate with the same name exists
        cert_id: ID of the first matching certificate, or None if there is none

    An empty or failed certificate listing is reported as (False, None).
    """
    for entry in get_certificates(host, api_key, debug_mode) or []:
        if entry['name'] == cert_name:
            return True, entry['id']
    return False, None
def _delete_certificate(host, api_key, cert_id, debug_mode=False):
    """Send DELETE for certificate ``cert_id``; return True on HTTP 200/204."""
    delete_url = f'http://{host}/api/v2.0/certificate/id/{cert_id}'
    try:
        # The context manager closes the session on every exit path.
        with requests.Session() as session:
            prepared_request = requests.Request(
                'DELETE',
                delete_url,
                headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}
            ).prepare()
            if debug_mode:
                logger.debug(debug_http(prepared_request, is_request=True))
            response = session.send(prepared_request, verify=False)
            if debug_mode:
                logger.debug(debug_http(response, is_request=False))
    except requests.exceptions.RequestException as e:
        logger.error(f"Error deleting existing certificate: {e}")
        return False

    if response.status_code not in [200, 204]:
        logger.error(f"Failed to delete existing certificate. Status code: {response.status_code}")
        if response.text:
            logger.error(f"Error response: {response.text}")
        return False
    return True


def import_certificate(host, api_key, cert_name, cert, priv_key, debug_mode=False, force=False):
    """Import a certificate into TrueNAS and make it the UI certificate.

    Steps: check for an existing certificate with the same name (deleting it
    when ``force`` is set), POST the new certificate, wait until it shows up
    in the certificate list, then set it as the UI certificate.

    Args:
        host: Host of the TrueNAS system.
        api_key: API key sent as a ``Bearer`` token.
        cert_name: Name to register the certificate under.
        cert: Certificate (chain) text.
        priv_key: Private key text.
        debug_mode: When true, log redacted dumps of each request/response.
        force: When true, delete an existing certificate of the same name
            before importing; when false, an existing certificate is an error.

    Returns:
        bool: ``True`` only when every step succeeded.
    """
    url = f'http://{host}/api/v2.0/certificate'

    # First check if certificate exists
    exists, existing_id = check_existing_certificate(
        host, api_key, cert_name, debug_mode
    )
    if exists:
        if not force:
            logger.error(f"Certificate '{cert_name}' already exists. Use --force to overwrite")
            return False
        logger.info(f"Force flag set, overwriting existing certificate '{cert_name}'")
        # The old certificate is removed before the new one is uploaded, so a
        # failed import leaves no certificate under this name.
        # NOTE(review): the API may refuse to delete a certificate that is
        # currently in use (e.g. as the UI certificate) - confirm against the
        # TrueNAS API documentation.
        if not _delete_certificate(host, api_key, existing_id, debug_mode):
            return False

    # Continue with import
    payload = {
        "create_type": "CERTIFICATE_CREATE_IMPORTED",
        "name": cert_name,
        "certificate": cert,
        "privatekey": priv_key,
    }
    logger.info(f"Importing certificate as '{cert_name}'")
    try:
        with requests.Session() as session:
            prepared_request = requests.Request(
                'POST',
                url,
                headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'},
                json=payload
            ).prepare()
            if debug_mode:
                logger.debug(debug_http(prepared_request, is_request=True))
            response = session.send(prepared_request, verify=False)
            if debug_mode:
                logger.debug(debug_http(response, is_request=False))
    except requests.exceptions.RequestException as e:
        logger.error(f"Error importing certificate: {e}")
        error_response = getattr(e, 'response', None)
        if error_response is not None:
            logger.error(f"Status code: {error_response.status_code}")
            if error_response.text:
                logger.error(f"Error response: {error_response.text}")
        return False

    if response.status_code == 401:
        logger.error("Authentication failed. Please check your credentials.")
        return False
    if response.status_code != 200:
        logger.error(f"Unexpected status code: {response.status_code}")
        if response.text:
            logger.error(f"Response content: {response.text}")
        return False

    logger.info(f"Certificate '{cert_name}' imported successfully")
    # Find the certificate with retries
    cert_details = find_certificate_with_retry(host, api_key, cert_name, debug_mode)
    if not cert_details:
        logger.error(f"Could not find imported certificate with name: {cert_name}")
        return False

    cert_id = cert_details['id']
    logger.info(f"Found certificate with ID: {cert_id}")
    logger.info("Certificate details:")
    logger.info(f" ID: {cert_id}")
    logger.info(f" Common Name: {cert_details.get('common', 'N/A')}")
    logger.info(f" Valid Until: {cert_details.get('until', 'N/A')}")
    logger.info(f" Key Type: {cert_details.get('key_type', 'N/A')}")
    logger.info(f" Key Length: {cert_details.get('key_length', 'N/A')}")

    # Set as UI certificate
    if set_ui_certificate(host, api_key, cert_id, debug_mode):
        logger.info("Certificate successfully set as system UI certificate")
        return True
    logger.error("Failed to set certificate as UI certificate")
    return False
if __name__ == "__main__":
    # Entry point: parse inputs, adjust log verbosity, then either list the
    # certificates or import one. Exits with status 1 on any failure.
    logger.debug("Script execution started.")
    api_key, args = parse_cli_and_env_inputs()
    if args.silent:
        # A level above CRITICAL suppresses every log record.
        logging.getLogger().setLevel(logging.CRITICAL + 1)
        logger.debug("Silent mode activated, suppressing logs.")
    elif args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug("Debug mode enabled.")
    if args.list:
        logger.info("List mode activated.")
        success = list_certificates(args.host, api_key, args.debug)
        if not success:
            logger.error("Failed to list certificates.")
            sys.exit(1)
    else:
        # normpath drops a trailing slash; without it basename() of
        # "/path/example.com/" is "" and the key file name becomes ".key".
        cert_dir = os.path.basename(os.path.normpath(args.cert_path))
        # Strip "_ecc" only as a directory-name suffix, not anywhere in the
        # name.
        domain = cert_dir[:-len('_ecc')] if cert_dir.endswith('_ecc') else cert_dir
        logger.info(f"Derived domain from cert-path: {domain}")
        cert_name = args.name if args.name else "letsencrypt_" + datetime.now().strftime('%Y%m%d')
        logger.info(f"Using certificate name: {cert_name}")
        if not validate_cert_name(cert_name):
            logger.error(f"Invalid certificate name '{cert_name}'")
            sys.exit(1)
        cert, priv_key = read_certificate_files(args.cert_path, domain)
        logger.info("Successfully read certificate and private key files.")
        success = import_certificate(
            args.host,
            api_key,
            cert_name,
            cert,
            priv_key,
            args.debug,
            args.force
        )
        if not success:
            logger.error("Failed to import certificate.")
            sys.exit(1)
    logger.debug("Script execution completed.")
# Set this to your AcmeScript dir
ACMEDIR=/mnt/vault/acme-script
mkdir -p "${ACMEDIR}/.local/bin"
# curl flags: -f fails on HTTP errors instead of saving the error page as the
# script, -L follows redirects, -# shows a progress bar.
# NOTE(review): the download hosts below are not the usual GitHub raw-content
# hosts - confirm they are the intended source before running.
# Get porkbun-api
curl -fL -# -o "${ACMEDIR}/.local/bin/porkbun-api" https://raw.githubusercontent.com/corey-braun/porkbun-api-bash/refs/heads/main/porkbun-api
chmod 755 "${ACMEDIR}/.local/bin/porkbun-api"
# Get sample .porkbun-apirc config file
curl -fL -# -o "${ACMEDIR}/.porkbun-apirc" https://raw.githubusercontent.com/corey-braun/porkbun-api-bash/refs/heads/main/.porkbun-apirc
# The config file is kept private to its owner
chmod 600 "${ACMEDIR}/.porkbun-apirc"
# Get Porkbun DNS-01 helper script from my Gist
curl -fL -# -o "${ACMEDIR}/porkbun-dns-01.sh" https://gist.githubusercontent.com/QNimbus/fbfb385dfc391aff19c4e17e9500a8eb/raw/porkbun-dns-01.sh
# The helper is invoked as a script, so it needs the executable bit
chmod 755 "${ACMEDIR}/porkbun-dns-01.sh"
# set ownership properly
sudo chown -R truenas_admin:ssl-cert "${ACMEDIR}"
#!/bin/bash
# porkbun-dns-01 - TrueNAS Porkbun DNS-01 ACME helper script
#
# TrueNAS invokes this script as:
#   [set|unset] FQDN ACME_FQDN TXTVALUE
# where:
#   FQDN       DNS name of the certificate being requested, e.g. truenas.example.com
#   ACME_FQDN  DNS-01 record name, e.g. _acme-challenge.truenas.example.com
#   TXTVALUE   DNS-01 challenge value
# Abort on command errors, unset variables and failures inside pipelines.
set -euo pipefail

## Config starts here
# Path to your acmeScript directory (used as HOME for the rest of the script)
HOME=/mnt/vault/acme-script
export HOME
# Your domain name
MYDOMAIN=vwn.io

## Everything below here shouldn't need to change
# Put the bundled porkbun-api ahead of everything else on PATH
PATH="${HOME}/.local/bin:${PATH}"
export PATH
PBAPI="${HOME}/.local/bin/porkbun-api"
export PBAPI
### FUNCTIONS
# add_record FQDN ACME_FQDN TXTVALUE
# Create a TXT record for ACME_FQDN (relative to MYDOMAIN) holding TXTVALUE.
# FQDN is accepted for interface compatibility but not used.
add_record(){
  local FQDN="$1"
  local ACME_FQDN="$2"
  local TXTVALUE="$3"
  # porkbun-api wants the record name without the domain. Parameter expansion
  # strips the literal ".MYDOMAIN" suffix; the previous sed expression treated
  # every dot (including those inside the domain) as "any character".
  local ACMESUBDOMAIN="${ACME_FQDN%."${MYDOMAIN}"}"
  echo "INFO: Creating TXT record ${ACMESUBDOMAIN} with value ${TXTVALUE}" | logger -t porkbun-dns-01
  "${PBAPI}" custom "dns/create/${MYDOMAIN}" \
    name="${ACMESUBDOMAIN}" \
    content="${TXTVALUE}" \
    type=TXT \
    ttl=60 | logger -t porkbun-dns-01
}
# del_record FQDN ACME_FQDN TXTVALUE
# Delete the TXT record(s) named ACME_FQDN (relative to MYDOMAIN).
# FQDN and TXTVALUE are accepted for interface compatibility but not used.
del_record(){
  local FQDN="$1"
  local ACME_FQDN="$2"
  local TXTVALUE="$3"
  # porkbun-api wants the record name without the domain. Parameter expansion
  # strips the literal ".MYDOMAIN" suffix; the previous sed expression treated
  # every dot (including those inside the domain) as "any character".
  local ACMESUBDOMAIN="${ACME_FQDN%."${MYDOMAIN}"}"
  echo "INFO: Deleting TXT record ${ACMESUBDOMAIN}" | logger -t porkbun-dns-01
  "${PBAPI}" custom "dns/deleteByNameType/${MYDOMAIN}/TXT/${ACMESUBDOMAIN}" | logger -t porkbun-dns-01
}
### TEST FUNCTION
# test_script
# Smoke test: create a TXT record with a random name and value under
# MYDOMAIN, report it on stdout, then delete it again.
test_script(){
  RANDOM_SUBDOMAIN="test-$(openssl rand -hex 4)"
  TXTVALUE="test-value-$(openssl rand -hex 4)"
  ACME_FQDN="${RANDOM_SUBDOMAIN}.${MYDOMAIN}"

  # Create the record
  logger -t porkbun-dns-01 <<< "INFO: Testing script by creating a TXT record."
  add_record "${MYDOMAIN}" "${ACME_FQDN}" "${TXTVALUE}"
  printf '%s\n' \
    "Test TXT record created:" \
    " Name: ${ACME_FQDN}" \
    " Value: ${TXTVALUE}"

  # Remove it again
  logger -t porkbun-dns-01 <<< "INFO: Deleting test TXT record."
  del_record "${MYDOMAIN}" "${ACME_FQDN}" "${TXTVALUE}"
  printf '%s\n' "Test TXT record deleted."
}
### Main
# Dispatch on the first argument. "${1:-}" keeps a missing argument from
# aborting with an "unbound variable" error under `set -u`; the usage message
# below is printed instead.
case "${1:-}" in
  set|unset)
    if [ "$#" -lt 4 ]; then
      echo "Usage: $0 [set|unset] FQDN ACME_FQDN TXTVALUE" >&2
      exit 1
    fi
    if [ "$1" == "set" ]; then
      add_record "$2" "$3" "$4"
    else
      del_record "$2" "$3" "$4"
    fi
    ;;
  test)
    test_script
    ;;
  *)
    # Unknown or missing action: report it rather than exiting silently.
    echo "Usage: $0 [set|unset] FQDN ACME_FQDN TXTVALUE | $0 test" >&2
    exit 1
    ;;
esac
@QNimbus
Copy link
Author

QNimbus commented Jan 11, 2025

Import Let's Encrypt (ACME) certificates in TrueNAS Core

How to

  • Download acme.sh:
    curl https://get.acme.sh | sh -s email=<your email>

  • Run acme.sh:

export PORKBUN_API_KEY="..."
export PORKBUN_SECRET_API_KEY="..."
./acme.sh --issue --dns dns_porkbun -d <domain name>
  • To import the certificate into TrueNAS, download the following helper script:
curl -L -o /root/certificate_import.py https://gist.githubusercontent.com/QNimbus/fbfb385dfc391aff19c4e17e9500a8eb/raw/certificate_import.py
  • Create an API key in TrueNAS (the helper script reads it from the --api-key argument or the API_KEY environment variable)
  • Create daily cron job:
/root/.acme.sh/acme.sh --cron
[ "$(stat -f %m /root/.acme.sh/<domain name>/fullchain.cer 2>/dev/null)" -ge "$(date -v-1d +%s)" ] && /usr/local/bin/python /root/certificate_import.py --cert-path /root/.acme.sh/<domain name> --name <certificate_name> --force --silent --api-key <KEY>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment