Created
January 10, 2025 23:44
-
-
Save QNimbus/ff6f6b77982d32a343143427cb873f9c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import time | |
import os | |
import sys | |
import argparse | |
from datetime import datetime | |
import logging | |
from urllib.parse import urlparse | |
import urllib3 | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
# Initialize logging at the very top | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
datefmt='%Y-%m-%d %H:%M:%S' | |
) | |
logger = logging.getLogger(__name__) | |
def debug_request(request): | |
"""Format request details for debugging""" | |
parsed_url = urlparse(request.url) | |
output = [ | |
"--- HTTP Request ---", | |
f"URL: {request.url}", | |
f"Method: {request.method}", | |
f"Scheme: {parsed_url.scheme}", | |
f"Host: {parsed_url.netloc}", | |
f"Path: {parsed_url.path}", | |
"Headers:", | |
] | |
# Add headers | |
for key, value in request.headers.items(): | |
if key.lower() == 'authorization': | |
output.append(f" {key}: [REDACTED]") | |
else: | |
output.append(f" {key}: {value}") | |
# Add body if exists | |
if request.body: | |
try: | |
body_dict = json.loads(request.body) | |
# Redact sensitive information | |
if 'privatekey' in body_dict: | |
body_dict['privatekey'] = '[REDACTED]' | |
if 'certificate' in body_dict: | |
body_dict['certificate'] = '[REDACTED]' | |
if 'password' in body_dict: | |
body_dict['password'] = '[REDACTED]' | |
output.append("Body:") | |
output.append(json.dumps(body_dict, indent=2)) | |
except json.JSONDecodeError: | |
output.append("Body: [Binary or non-JSON data]") | |
return "\n".join(output) | |
def debug_response(response): | |
"""Format response details for debugging""" | |
output = [ | |
"--- HTTP Response ---", | |
f"Status Code: {response.status_code}", | |
f"Reason: {response.reason}", | |
"Headers:" | |
] | |
# Add headers | |
for key, value in response.headers.items(): | |
output.append(f" {key}: {value}") | |
# Add body if exists | |
if response.text: | |
output.append("Body:") | |
try: | |
# Try to parse as JSON | |
if isinstance(response.json(), list): | |
# Handle list of certificates | |
certs = response.json() | |
redacted_certs = [] | |
for cert in certs: | |
redacted_cert = cert.copy() | |
# Redact sensitive fields | |
sensitive_fields = ['certificate', 'privatekey', 'CSR', 'chain_list'] | |
for field in sensitive_fields: | |
if field in redacted_cert: | |
redacted_cert[field] = '[REDACTED]' | |
redacted_certs.append(redacted_cert) | |
output.append(json.dumps(redacted_certs, indent=2)) | |
else: | |
# Handle single object | |
body_dict = response.json() | |
# Redact sensitive fields if they exist | |
if isinstance(body_dict, dict): | |
sensitive_fields = ['certificate', 'privatekey', 'CSR', 'chain_list'] | |
for field in sensitive_fields: | |
if field in body_dict: | |
body_dict[field] = '[REDACTED]' | |
output.append(json.dumps(body_dict, indent=2)) | |
except json.JSONDecodeError: | |
# If not JSON, just show the raw text | |
output.append(response.text) | |
else: | |
output.append("Body: [Empty]") | |
return "\n".join(output) | |
"""Format response details for debugging""" | |
output = [ | |
"--- HTTP Response ---", | |
f"Status Code: {response.status_code}", | |
f"Reason: {response.reason}", | |
"Headers:" | |
] | |
# Add headers | |
for key, value in response.headers.items(): | |
output.append(f" {key}: {value}") | |
# Add body if exists | |
if response.text: | |
output.append("Body:") | |
try: | |
body_dict = json.loads(response.text) | |
output.append(json.dumps(body_dict, indent=2)) | |
except json.JSONDecodeError: | |
output.append(response.text) | |
else: | |
output.append("Body: [Empty]") | |
return "\n".join(output) | |
def get_certificates(host, api_key, debug_mode=False): | |
"""Get list of certificates from TrueNAS API""" | |
url = f'http://{host}/api/v2.0/certificate' | |
try: | |
session = requests.Session() | |
headers = { | |
'Authorization': f'Bearer {api_key}' | |
} | |
request = requests.Request('GET', url, headers=headers) | |
prepared_request = request.prepare() | |
if debug_mode: | |
logger.debug(debug_request(prepared_request)) | |
response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_response(response)) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
logger.error(f"Failed to fetch certificates. Status code: {response.status_code}") | |
return None | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error fetching certificates: {e}") | |
return None | |
def list_certificates(host, api_key, debug_mode=False): | |
"""List all certificates from TrueNAS API""" | |
logger.info("Fetching certificate list...") | |
certs = get_certificates(host, api_key, debug_mode) | |
if not certs: | |
return False | |
if not certs: | |
logger.info("No certificates found") | |
return True | |
# Print certificate information in a formatted table | |
print("\nCertificates:") | |
print(f"{'ID':<5} {'Name':<30} {'Common Name':<40} {'Until':<20} {'Type':<15}") | |
print("-" * 110) | |
for cert in certs: | |
cert_id = cert.get('id', 'N/A') | |
name = cert.get('name', 'N/A') | |
common = cert.get('common', 'N/A') | |
until = cert.get('until', 'N/A') | |
cert_type = cert.get('type', 'N/A') | |
print(f"{cert_id:<5} {name[:30]:<30} {common[:40]:<40} {until[:20]:<20} {cert_type:<15}") | |
print(f"\nTotal certificates: {len(certs)}") | |
return True | |
def set_ui_certificate(host, api_key, cert_id, debug_mode=False): | |
"""Set the system UI certificate for HTTPS and restart the UI""" | |
# First set the certificate | |
url = f'http://{host}/api/v2.0/system/general' | |
payload = { | |
"ui_certificate": cert_id | |
} | |
logger.info(f"Setting UI certificate to ID: {cert_id}") | |
try: | |
session = requests.Session() | |
request = requests.Request( | |
'PUT', | |
url, | |
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}, | |
json=payload | |
) | |
prepared_request = request.prepare() | |
if debug_mode: | |
logger.debug(debug_request(prepared_request)) | |
response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_response(response)) | |
if response.status_code not in [200, 201]: | |
logger.error(f"Failed to set UI certificate. Status code: {response.status_code}") | |
if response.text: | |
logger.error(f"Error response: {response.text}") | |
return False | |
# If certificate was set successfully, restart the UI | |
logger.info("Successfully set new UI certificate. Restarting UI...") | |
restart_url = f'http://{host}/api/v2.0/system/general/ui_restart' | |
restart_request = requests.Request('GET', restart_url, headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}) | |
prepared_restart_request = restart_request.prepare() | |
if debug_mode: | |
logger.debug(debug_request(prepared_restart_request)) | |
restart_response = session.send(prepared_restart_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_response(restart_response)) | |
if restart_response.status_code == 200: | |
logger.info("UI restart initiated successfully") | |
return True | |
else: | |
logger.error(f"Failed to restart UI. Status code: {restart_response.status_code}") | |
if restart_response.text: | |
logger.error(f"Error response: {restart_response.text}") | |
return False | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error during certificate setup or UI restart: {e}") | |
if hasattr(e, 'response') and e.response is not None: | |
logger.error(f"Status code: {e.response.status_code}") | |
if e.response.text: | |
logger.error(f"Error response: {e.response.text}") | |
return False | |
def get_cert_by_name(certs, cert_name): | |
"""Find a certificate by name with detailed logging""" | |
logger.debug(f"Looking for certificate with name: {cert_name}") | |
logger.debug("Available certificates:") | |
for cert in certs: | |
logger.debug(f" - {cert.get('id')}: {cert.get('name')}") | |
matches = [cert for cert in certs if cert['name'] == cert_name] | |
if matches: | |
logger.debug(f"Found matching certificate: {matches[0]['id']}") | |
return matches[0] | |
return None | |
def find_certificate_with_retry(host, api_key, cert_name, debug_mode=False, max_retries=5, delay=2): | |
"""Find certificate in list with retries""" | |
# Add initial delay to allow the certificate to be fully registered | |
logger.debug(f"Waiting {delay} seconds for certificate to be registered...") | |
time.sleep(delay) | |
for attempt in range(max_retries): | |
if attempt > 0: | |
logger.debug(f"Waiting {delay} seconds before retry {attempt + 1}/{max_retries}") | |
time.sleep(delay) | |
logger.debug(f"Attempting to find certificate (attempt {attempt + 1}/{max_retries})") | |
updated_certs = get_certificates(host, api_key, debug_mode) | |
if updated_certs: | |
cert_details = next((cert for cert in updated_certs if cert['name'] == cert_name), None) | |
if cert_details: | |
# Add additional delay after finding the certificate | |
logger.debug("Certificate found, waiting 2 seconds before proceeding...") | |
time.sleep(2) | |
return cert_details | |
return None | |
def check_existing_certificate(host, api_key, cert_name, debug_mode=False): | |
""" | |
Check if a certificate with the same name already exists. | |
Returns (exists, cert_id): | |
exists: bool - True if a certificate with the same name exists | |
cert_id: int or None - ID of existing certificate if found | |
""" | |
certs = get_certificates(host, api_key, debug_mode) | |
if not certs: | |
return False, None | |
existing_cert = next((cert for cert in certs if cert['name'] == cert_name), None) | |
if not existing_cert: | |
return False, None | |
return True, existing_cert['id'] | |
def import_certificate(host, api_key, cert_name, cert, priv_key, debug_mode=False, force=False): | |
"""Import certificate into TrueNAS API""" | |
url = f'http://{host}/api/v2.0/certificate' | |
# First check if certificate exists | |
exists, existing_id = check_existing_certificate( | |
host, api_key, cert_name, debug_mode | |
) | |
if exists: | |
if not force: | |
logger.error(f"Certificate '{cert_name}' already exists. Use --force to overwrite") | |
return False | |
else: | |
logger.info(f"Force flag set, overwriting existing certificate '{cert_name}'") | |
# Delete existing certificate | |
delete_url = f'http://{host}/api/v2.0/certificate/id/{existing_id}' | |
try: | |
session = requests.Session() | |
# Add delete payload | |
delete_request = requests.Request( | |
'DELETE', | |
delete_url, | |
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'} | |
) | |
prepared_request = delete_request.prepare() | |
if debug_mode: | |
logger.debug(debug_request(prepared_request)) | |
delete_response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_response(delete_response)) | |
if delete_response.status_code not in [200, 204]: | |
logger.error(f"Failed to delete existing certificate. Status code: {delete_response.status_code}") | |
if delete_response.text: | |
logger.error(f"Error response: {delete_response.text}") | |
return False | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error deleting existing certificate: {e}") | |
return False | |
# Continue with import | |
payload = { | |
"create_type": "CERTIFICATE_CREATE_IMPORTED", | |
"name": cert_name, | |
"certificate": cert, | |
"privatekey": priv_key, | |
} | |
logger.info(f"Importing certificate as '{cert_name}'") | |
try: | |
session = requests.Session() | |
request = requests.Request( | |
'POST', | |
url, | |
headers={'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}, | |
json=payload | |
) | |
prepared_request = request.prepare() | |
if debug_mode: | |
logger.debug(debug_request(prepared_request)) | |
response = session.send(prepared_request, verify=False) | |
if debug_mode: | |
logger.debug(debug_response(response)) | |
if response.status_code == 200: | |
logger.info(f"Certificate '{cert_name}' imported successfully") | |
# Find the certificate with retries | |
cert_details = find_certificate_with_retry(host, api_key, cert_name, debug_mode) | |
if cert_details: | |
cert_id = cert_details['id'] | |
logger.info(f"Found certificate with ID: {cert_id}") | |
logger.info("Certificate details:") | |
logger.info(f" ID: {cert_id}") | |
logger.info(f" Common Name: {cert_details.get('common', 'N/A')}") | |
logger.info(f" Valid Until: {cert_details.get('until', 'N/A')}") | |
logger.info(f" Key Type: {cert_details.get('key_type', 'N/A')}") | |
logger.info(f" Key Length: {cert_details.get('key_length', 'N/A')}") | |
# Set as UI certificate | |
if set_ui_certificate(host, api_key, cert_id, debug_mode): | |
logger.info("Certificate successfully set as system UI certificate") | |
return True | |
else: | |
logger.error("Failed to set certificate as UI certificate") | |
return False | |
else: | |
logger.error(f"Could not find imported certificate with name: {cert_name}") | |
return False | |
elif response.status_code == 401: | |
logger.error("Authentication failed. Please check your credentials.") | |
return False | |
else: | |
logger.error(f"Unexpected status code: {response.status_code}") | |
if response.text: | |
logger.error(f"Response content: {response.text}") | |
return False | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Error importing certificate: {e}") | |
if hasattr(e, 'response') and e.response is not None: | |
logger.error(f"Status code: {e.response.status_code}") | |
if e.response.text: | |
logger.error(f"Error response: {e.response.text}") | |
return False | |
def was_cert_renewed(cert_path, domain, max_age_hours=1): | |
""" | |
Check if certificate files were recently modified, indicating a renewal | |
Args: | |
cert_path: Path to certificate directory | |
domain: Domain name for key file | |
max_age_hours: Maximum age in hours to consider as "recently renewed" | |
Returns: | |
bool: True if certificate was renewed recently, False otherwise | |
""" | |
cert_file = os.path.join(cert_path, "fullchain.cer") | |
key_file = os.path.join(cert_path, f"{domain}.key") | |
try: | |
# Get the most recent modification time of both files | |
cert_mtime = os.path.getmtime(cert_file) | |
key_mtime = os.path.getmtime(key_file) | |
most_recent = max(cert_mtime, key_mtime) | |
# Calculate age in hours | |
age_hours = (time.time() - most_recent) / 3600 | |
logger.debug(f"Certificate files age: {age_hours:.2f} hours") | |
return age_hours <= max_age_hours | |
except OSError as e: | |
logger.error(f"Error checking certificate files: {e}") | |
sys.exit(1) | |
def validate_cert_name(name): | |
""" | |
Validate certificate name to ensure it only contains allowed characters [a-zA-Z0-9_-] | |
Returns: | |
bool: True if valid, False otherwise | |
""" | |
import re | |
pattern = r'^[a-zA-Z0-9_-]+$' | |
return bool(re.match(pattern, name)) | |
def get_api_key(): | |
"""Get TrueNAS API key from environment variable or command line argument""" | |
parser = argparse.ArgumentParser(description='Import Let\'s Encrypt certificates into TrueNAS Core') | |
parser.add_argument('--api-key', help='TrueNAS API key') | |
parser.add_argument('--host', default='localhost', help='TrueNAS host address (default: localhost)') | |
parser.add_argument('--cert-path', help='Path to certificate directory') | |
parser.add_argument('--name', help='Certificate name (default: letsencrypt_YYYYMMDD)') | |
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') | |
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug output including full HTTP request/response') | |
parser.add_argument('-l', '--list', action='store_true', help='List all certificates') | |
parser.add_argument('-f', '--force', action='store_true', help='Force overwrite if certificate already exists') | |
parser.add_argument('-s', '--silent', action='store_true', help='Suppress all output, use exit code only') | |
parser.add_argument('--max-age', type=float, default=1.0, help='Maximum age in hours for certificate files to be considered recently renewed (default: 1.0)') | |
args = parser.parse_args() | |
# Validate that cert-path is provided when not listing certificates | |
if not args.list and not args.cert_path: | |
parser.error("--cert-path is required when not using --list") | |
# First check environment variable | |
api_key = os.environ.get('API_KEY') | |
if api_key: | |
logger.info("Using API key from environment variable") | |
return api_key, args | |
if args.api_key: | |
logger.info("Using API key from command line argument") | |
return args.api_key, args | |
logger.error("API key must be provided via API_KEY environment variable or --api-key argument") | |
sys.exit(1) | |
def read_certificate_files(cert_path, domain): | |
"""Read certificate and private key files""" | |
logger.info(f"Reading certificate files from {cert_path}") | |
cert_file = os.path.join(cert_path, "fullchain.cer") | |
key_file = os.path.join(cert_path, f"{domain}.key") | |
try: | |
with open(cert_file, "r") as f: | |
cert = f.read() | |
logger.debug(f"Successfully read certificate from {cert_file}") | |
with open(key_file, "r") as f: | |
priv_key = f.read() | |
logger.debug(f"Successfully read private key from {key_file}") | |
return cert, priv_key | |
except FileNotFoundError as e: | |
logger.error(f"Certificate file not found: {e}") | |
sys.exit(1) | |
except IOError as e: | |
logger.error(f"Error reading certificate files: {e}") | |
sys.exit(1) | |
if __name__ == "__main__": | |
# Get configuration | |
api_key, args = get_api_key() | |
# Handle silent flag first | |
if args.silent: | |
logging.getLogger().setLevel(logging.CRITICAL + 1) # Suppress all logging | |
elif args.debug: | |
logger.setLevel(logging.DEBUG) | |
elif args.verbose: | |
logger.setLevel(logging.INFO) | |
if args.list: | |
# List certificates | |
success = list_certificates(args.host, api_key, args.debug) | |
if not success: | |
sys.exit(1) | |
else: | |
# Extract domain from cert path | |
domain = os.path.basename(args.cert_path).replace('_ecc', '') | |
# Check if certificates were renewed | |
if not was_cert_renewed(args.cert_path, domain, args.max_age): | |
logger.info("Certificate files not recently modified, skipping import") | |
sys.exit(0) | |
# Use provided name or generate default name with timestamp | |
cert_name = args.name if args.name else "letsencrypt_" + datetime.now().strftime('%Y%m%d') | |
# Validate certificate name | |
if not validate_cert_name(cert_name): | |
logger.error(f"Invalid certificate name '{cert_name}'. Name can only contain letters, numbers, underscores, and hyphens [a-zA-Z0-9_-]") | |
sys.exit(1) | |
# Read certificate files | |
cert, priv_key = read_certificate_files(args.cert_path, domain) | |
# Import certificate | |
success = import_certificate( | |
args.host, | |
api_key, | |
cert_name, | |
cert, | |
priv_key, | |
args.debug, | |
args.force | |
) | |
if not success: | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment