Skip to content

Instantly share code, notes, and snippets.

@QNimbus
Created January 10, 2025 23:44
Show Gist options
  • Save QNimbus/ff6f6b77982d32a343143427cb873f9c to your computer and use it in GitHub Desktop.
Save QNimbus/ff6f6b77982d32a343143427cb873f9c to your computer and use it in GitHub Desktop.
import requests
import json
import time
import os
import sys
import argparse
from datetime import datetime
import logging
from urllib.parse import urlparse
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Initialize logging at the very top
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def debug_request(request):
"""Format request details for debugging"""
parsed_url = urlparse(request.url)
output = [
"--- HTTP Request ---",
f"URL: {request.url}",
f"Method: {request.method}",
f"Scheme: {parsed_url.scheme}",
f"Host: {parsed_url.netloc}",
f"Path: {parsed_url.path}",
"Headers:",
]
# Add headers
for key, value in request.headers.items():
if key.lower() == 'authorization':
output.append(f" {key}: [REDACTED]")
else:
output.append(f" {key}: {value}")
# Add body if exists
if request.body:
try:
body_dict = json.loads(request.body)
# Redact sensitive information
if 'privatekey' in body_dict:
body_dict['privatekey'] = '[REDACTED]'
if 'certificate' in body_dict:
body_dict['certificate'] = '[REDACTED]'
if 'password' in body_dict:
body_dict['password'] = '[REDACTED]'
output.append("Body:")
output.append(json.dumps(body_dict, indent=2))
except json.JSONDecodeError:
output.append("Body: [Binary or non-JSON data]")
return "\n".join(output)
def debug_response(response):
"""Format response details for debugging"""
output = [
"--- HTTP Response ---",
f"Status Code: {response.status_code}",
f"Reason: {response.reason}",
"Headers:"
]
# Add headers
for key, value in response.headers.items():
output.append(f" {key}: {value}")
# Add body if exists
if response.text:
output.append("Body:")
try:
# Try to parse as JSON
if isinstance(response.json(), list):
# Handle list of certificates
certs = response.json()
redacted_certs = []
for cert in certs:
redacted_cert = cert.copy()
# Redact sensitive fields
sensitive_fields = ['certificate', 'privatekey', 'CSR', 'chain_list']
for field in sensitive_fields:
if field in redacted_cert:
redacted_cert[field] = '[REDACTED]'
redacted_certs.append(redacted_cert)
output.append(json.dumps(redacted_certs, indent=2))
else:
# Handle single object
body_dict = response.json()
# Redact sensitive fields if they exist
if isinstance(body_dict, dict):
sensitive_fields = ['certificate', 'privatekey', 'CSR', 'chain_list']
for field in sensitive_fields:
if field in body_dict:
body_dict[field] = '[REDACTED]'
output.append(json.dumps(body_dict, indent=2))
except json.JSONDecodeError:
# If not JSON, just show the raw text
output.append(response.text)
else:
output.append("Body: [Empty]")
return "\n".join(output)
"""Format response details for debugging"""
output = [
"--- HTTP Response ---",
f"Status Code: {response.status_code}",
f"Reason: {response.reason}",
"Headers:"
]
# Add headers
for key, value in response.headers.items():
output.append(f" {key}: {value}")
# Add body if exists
if response.text:
output.append("Body:")
try:
body_dict = json.loads(response.text)
output.append(json.dumps(body_dict, indent=2))
except json.JSONDecodeError:
output.append(response.text)
else:
output.append("Body: [Empty]")
return "\n".join(output)
def get_certificates(host, api_key, debug_mode=False):
"""Get list of certificates from TrueNAS API"""
url = f'http://{host}/api/v2.0/certificate'
try:
session = requests.Session()
headers = {
'Authorization': f'Bearer {api_key}'
}
request = requests.Request('GET', url, headers=headers)
prepared_request = request.prepare()
if debug_mode:
logger.debug(debug_request(prepared_request))
response = session.send(prepared_request, verify=False)
if debug_mode:
logger.debug(debug_response(response))
if response.status_code == 200:
return response.json()
else:
logger.error(f"Failed to fetch certificates. Status code: {response.status_code}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching certificates: {e}")
return None
def list_certificates(host, api_key, debug_mode=False):
"""List all certificates from TrueNAS API"""
logger.info("Fetching certificate list...")
certs = get_certificates(host, api_key, debug_mode)
if not certs:
return False
if not certs:
logger.info("No certificates found")
return True
# Print certificate information in a formatted table
print("\nCertificates:")
print(f"{'ID':<5} {'Name':<30} {'Common Name':<40} {'Until':<20} {'Type':<15}")
print("-" * 110)
for cert in certs:
cert_id = cert.get('id', 'N/A')
name = cert.get('name', 'N/A')
common = cert.get('common', 'N/A')
until = cert.get('until', 'N/A')
cert_type = cert.get('type', 'N/A')
print(f"{cert_id:<5} {name[:30]:<30} {common[:40]:<40} {until[:20]:<20} {cert_type:<15}")
print(f"\nTotal certificates: {len(certs)}")
return True
def set_ui_certificate(host, api_key, cert_id, debug_mode=False):
"""Set the system UI certificate for HTTPS and restart the UI"""
# First set the certificate
url = f'http://{host}/api/v2.0/system/general'
payload = {
"ui_certificate": cert_id
}
logger.info(f"Setting UI certificate to ID: {cert_id}")
try:
session = requests.Session()
request = requests.Request(
'PUT',
url,
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'},
json=payload
)
prepared_request = request.prepare()
if debug_mode:
logger.debug(debug_request(prepared_request))
response = session.send(prepared_request, verify=False)
if debug_mode:
logger.debug(debug_response(response))
if response.status_code not in [200, 201]:
logger.error(f"Failed to set UI certificate. Status code: {response.status_code}")
if response.text:
logger.error(f"Error response: {response.text}")
return False
# If certificate was set successfully, restart the UI
logger.info("Successfully set new UI certificate. Restarting UI...")
restart_url = f'http://{host}/api/v2.0/system/general/ui_restart'
restart_request = requests.Request('GET', restart_url, headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'})
prepared_restart_request = restart_request.prepare()
if debug_mode:
logger.debug(debug_request(prepared_restart_request))
restart_response = session.send(prepared_restart_request, verify=False)
if debug_mode:
logger.debug(debug_response(restart_response))
if restart_response.status_code == 200:
logger.info("UI restart initiated successfully")
return True
else:
logger.error(f"Failed to restart UI. Status code: {restart_response.status_code}")
if restart_response.text:
logger.error(f"Error response: {restart_response.text}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Error during certificate setup or UI restart: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Status code: {e.response.status_code}")
if e.response.text:
logger.error(f"Error response: {e.response.text}")
return False
def get_cert_by_name(certs, cert_name):
"""Find a certificate by name with detailed logging"""
logger.debug(f"Looking for certificate with name: {cert_name}")
logger.debug("Available certificates:")
for cert in certs:
logger.debug(f" - {cert.get('id')}: {cert.get('name')}")
matches = [cert for cert in certs if cert['name'] == cert_name]
if matches:
logger.debug(f"Found matching certificate: {matches[0]['id']}")
return matches[0]
return None
def find_certificate_with_retry(host, api_key, cert_name, debug_mode=False, max_retries=5, delay=2):
"""Find certificate in list with retries"""
# Add initial delay to allow the certificate to be fully registered
logger.debug(f"Waiting {delay} seconds for certificate to be registered...")
time.sleep(delay)
for attempt in range(max_retries):
if attempt > 0:
logger.debug(f"Waiting {delay} seconds before retry {attempt + 1}/{max_retries}")
time.sleep(delay)
logger.debug(f"Attempting to find certificate (attempt {attempt + 1}/{max_retries})")
updated_certs = get_certificates(host, api_key, debug_mode)
if updated_certs:
cert_details = next((cert for cert in updated_certs if cert['name'] == cert_name), None)
if cert_details:
# Add additional delay after finding the certificate
logger.debug("Certificate found, waiting 2 seconds before proceeding...")
time.sleep(2)
return cert_details
return None
def check_existing_certificate(host, api_key, cert_name, debug_mode=False):
"""
Check if a certificate with the same name already exists.
Returns (exists, cert_id):
exists: bool - True if a certificate with the same name exists
cert_id: int or None - ID of existing certificate if found
"""
certs = get_certificates(host, api_key, debug_mode)
if not certs:
return False, None
existing_cert = next((cert for cert in certs if cert['name'] == cert_name), None)
if not existing_cert:
return False, None
return True, existing_cert['id']
def import_certificate(host, api_key, cert_name, cert, priv_key, debug_mode=False, force=False):
"""Import certificate into TrueNAS API"""
url = f'http://{host}/api/v2.0/certificate'
# First check if certificate exists
exists, existing_id = check_existing_certificate(
host, api_key, cert_name, debug_mode
)
if exists:
if not force:
logger.error(f"Certificate '{cert_name}' already exists. Use --force to overwrite")
return False
else:
logger.info(f"Force flag set, overwriting existing certificate '{cert_name}'")
# Delete existing certificate
delete_url = f'http://{host}/api/v2.0/certificate/id/{existing_id}'
try:
session = requests.Session()
# Add delete payload
delete_request = requests.Request(
'DELETE',
delete_url,
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}
)
prepared_request = delete_request.prepare()
if debug_mode:
logger.debug(debug_request(prepared_request))
delete_response = session.send(prepared_request, verify=False)
if debug_mode:
logger.debug(debug_response(delete_response))
if delete_response.status_code not in [200, 204]:
logger.error(f"Failed to delete existing certificate. Status code: {delete_response.status_code}")
if delete_response.text:
logger.error(f"Error response: {delete_response.text}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Error deleting existing certificate: {e}")
return False
# Continue with import
payload = {
"create_type": "CERTIFICATE_CREATE_IMPORTED",
"name": cert_name,
"certificate": cert,
"privatekey": priv_key,
}
logger.info(f"Importing certificate as '{cert_name}'")
try:
session = requests.Session()
request = requests.Request(
'POST',
url,
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'},
json=payload
)
prepared_request = request.prepare()
if debug_mode:
logger.debug(debug_request(prepared_request))
response = session.send(prepared_request, verify=False)
if debug_mode:
logger.debug(debug_response(response))
if response.status_code == 200:
logger.info(f"Certificate '{cert_name}' imported successfully")
# Find the certificate with retries
cert_details = find_certificate_with_retry(host, api_key, cert_name, debug_mode)
if cert_details:
cert_id = cert_details['id']
logger.info(f"Found certificate with ID: {cert_id}")
logger.info("Certificate details:")
logger.info(f" ID: {cert_id}")
logger.info(f" Common Name: {cert_details.get('common', 'N/A')}")
logger.info(f" Valid Until: {cert_details.get('until', 'N/A')}")
logger.info(f" Key Type: {cert_details.get('key_type', 'N/A')}")
logger.info(f" Key Length: {cert_details.get('key_length', 'N/A')}")
# Set as UI certificate
if set_ui_certificate(host, api_key, cert_id, debug_mode):
logger.info("Certificate successfully set as system UI certificate")
return True
else:
logger.error("Failed to set certificate as UI certificate")
return False
else:
logger.error(f"Could not find imported certificate with name: {cert_name}")
return False
elif response.status_code == 401:
logger.error("Authentication failed. Please check your credentials.")
return False
else:
logger.error(f"Unexpected status code: {response.status_code}")
if response.text:
logger.error(f"Response content: {response.text}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Error importing certificate: {e}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Status code: {e.response.status_code}")
if e.response.text:
logger.error(f"Error response: {e.response.text}")
return False
def was_cert_renewed(cert_path, domain, max_age_hours=1):
"""
Check if certificate files were recently modified, indicating a renewal
Args:
cert_path: Path to certificate directory
domain: Domain name for key file
max_age_hours: Maximum age in hours to consider as "recently renewed"
Returns:
bool: True if certificate was renewed recently, False otherwise
"""
cert_file = os.path.join(cert_path, "fullchain.cer")
key_file = os.path.join(cert_path, f"{domain}.key")
try:
# Get the most recent modification time of both files
cert_mtime = os.path.getmtime(cert_file)
key_mtime = os.path.getmtime(key_file)
most_recent = max(cert_mtime, key_mtime)
# Calculate age in hours
age_hours = (time.time() - most_recent) / 3600
logger.debug(f"Certificate files age: {age_hours:.2f} hours")
return age_hours <= max_age_hours
except OSError as e:
logger.error(f"Error checking certificate files: {e}")
sys.exit(1)
def validate_cert_name(name):
"""
Validate certificate name to ensure it only contains allowed characters [a-zA-Z0-9_-]
Returns:
bool: True if valid, False otherwise
"""
import re
pattern = r'^[a-zA-Z0-9_-]+$'
return bool(re.match(pattern, name))
def get_api_key():
"""Get TrueNAS API key from environment variable or command line argument"""
parser = argparse.ArgumentParser(description='Import Let\'s Encrypt certificates into TrueNAS Core')
parser.add_argument('--api-key', help='TrueNAS API key')
parser.add_argument('--host', default='localhost', help='TrueNAS host address (default: localhost)')
parser.add_argument('--cert-path', help='Path to certificate directory')
parser.add_argument('--name', help='Certificate name (default: letsencrypt_YYYYMMDD)')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug output including full HTTP request/response')
parser.add_argument('-l', '--list', action='store_true', help='List all certificates')
parser.add_argument('-f', '--force', action='store_true', help='Force overwrite if certificate already exists')
parser.add_argument('-s', '--silent', action='store_true', help='Suppress all output, use exit code only')
parser.add_argument('--max-age', type=float, default=1.0, help='Maximum age in hours for certificate files to be considered recently renewed (default: 1.0)')
args = parser.parse_args()
# Validate that cert-path is provided when not listing certificates
if not args.list and not args.cert_path:
parser.error("--cert-path is required when not using --list")
# First check environment variable
api_key = os.environ.get('API_KEY')
if api_key:
logger.info("Using API key from environment variable")
return api_key, args
if args.api_key:
logger.info("Using API key from command line argument")
return args.api_key, args
logger.error("API key must be provided via API_KEY environment variable or --api-key argument")
sys.exit(1)
def read_certificate_files(cert_path, domain):
"""Read certificate and private key files"""
logger.info(f"Reading certificate files from {cert_path}")
cert_file = os.path.join(cert_path, "fullchain.cer")
key_file = os.path.join(cert_path, f"{domain}.key")
try:
with open(cert_file, "r") as f:
cert = f.read()
logger.debug(f"Successfully read certificate from {cert_file}")
with open(key_file, "r") as f:
priv_key = f.read()
logger.debug(f"Successfully read private key from {key_file}")
return cert, priv_key
except FileNotFoundError as e:
logger.error(f"Certificate file not found: {e}")
sys.exit(1)
except IOError as e:
logger.error(f"Error reading certificate files: {e}")
sys.exit(1)
if __name__ == "__main__":
# Get configuration
api_key, args = get_api_key()
# Handle silent flag first
if args.silent:
logging.getLogger().setLevel(logging.CRITICAL + 1) # Suppress all logging
elif args.debug:
logger.setLevel(logging.DEBUG)
elif args.verbose:
logger.setLevel(logging.INFO)
if args.list:
# List certificates
success = list_certificates(args.host, api_key, args.debug)
if not success:
sys.exit(1)
else:
# Extract domain from cert path
domain = os.path.basename(args.cert_path).replace('_ecc', '')
# Check if certificates were renewed
if not was_cert_renewed(args.cert_path, domain, args.max_age):
logger.info("Certificate files not recently modified, skipping import")
sys.exit(0)
# Use provided name or generate default name with timestamp
cert_name = args.name if args.name else "letsencrypt_" + datetime.now().strftime('%Y%m%d')
# Validate certificate name
if not validate_cert_name(cert_name):
logger.error(f"Invalid certificate name '{cert_name}'. Name can only contain letters, numbers, underscores, and hyphens [a-zA-Z0-9_-]")
sys.exit(1)
# Read certificate files
cert, priv_key = read_certificate_files(args.cert_path, domain)
# Import certificate
success = import_certificate(
args.host,
api_key,
cert_name,
cert,
priv_key,
args.debug,
args.force
)
if not success:
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment