Last active
September 15, 2022 08:01
-
-
Save anthonyprintup/e6be7e0eb8defc3437c1a8f8ae2f53e3 to your computer and use it in GitHub Desktop.
Script for managing a matrix-synapse server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from enum import IntEnum, auto as _enum_auto | |
# noinspection PyArgumentList | |
class ExitCode(IntEnum):
    """Process exit codes returned by the management script.

    Members are intentionally left unannotated: an explicit annotation on an
    enum member is misleading to readers and is rejected by type checkers,
    which infer the member type on their own.  ``auto()`` continues counting
    from the previous member, so the values are 0, 1 and 2.
    """

    # No errors occurred
    SUCCESS = 0
    # Certificate related errors
    CERTIFICATE_ERROR = _enum_auto()
    # Synapse server related errors
    SYNAPSE_ERROR = _enum_auto()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import logging | |
import asyncio | |
from asyncio import AbstractEventLoop | |
from subprocess import PIPE | |
# noinspection PyProtectedMember | |
from asyncio.subprocess import Process | |
from hashlib import md5 | |
from pathlib import Path | |
from datetime import datetime | |
import yaml | |
import httpx | |
import psutil | |
from cryptography import x509 | |
from cryptography.x509 import Certificate | |
from cryptography.x509.extensions import Extension, AuthorityInformationAccess, AccessDescription | |
from cryptography.hazmat.primitives.serialization import Encoding | |
from cli.exit_codes import ExitCode | |
# Synapse installation layout, rooted at ~/synapse
SYNAPSE_PATH: Path = Path.home().joinpath("synapse")
SYNAPSE_CONFIG_PATH: Path = SYNAPSE_PATH.joinpath("homeserver.yaml")
SYNAPSE_ACTIVATE_ENVIRONMENT_SCRIPT_PATH: Path = SYNAPSE_PATH.joinpath("env", "bin", "activate")

# Values taken from the homeserver config (the file is read and parsed at import time)
SYNAPSE_CONFIG: dict = yaml.safe_load(SYNAPSE_CONFIG_PATH.read_bytes())
SYNAPSE_HOMESERVER_PID_PATH: Path = Path(SYNAPSE_CONFIG["pid_file"])
SYNAPSE_CERTIFICATE_PATH: Path = Path(SYNAPSE_CONFIG["tls_certificate_path"])
SYNAPSE_CERTIFICATE_KEY_PATH: Path = Path(SYNAPSE_CONFIG["tls_private_key_path"])

# cPanel SSL storage: certificates and their private keys
SSL_CERTIFICATES_PATH: Path = Path.home().joinpath("ssl", "certs")
SSL_CERTIFICATES_KEYS_PATH: Path = Path.home().joinpath("ssl", "keys")

# Local cache directory, located two levels above this file; created on import if missing
SSL_CERTIFICATES_CACHE_PATH: Path = Path(__file__).parent.parent.absolute().joinpath("certificate_cache")
SSL_CERTIFICATES_CACHE_PATH.mkdir(exist_ok=True)
def find_current_certificate() -> Certificate:
    """Load the certificate stored at ``SYNAPSE_CERTIFICATE_PATH``.

    Returns:
        The certificate parsed from the PEM data read from that file.
    """
    pem_data: bytes = SYNAPSE_CERTIFICATE_PATH.read_bytes()
    return x509.load_pem_x509_certificate(pem_data)
def find_latest_valid_certificate() -> Certificate:
    """Return the currently valid Let's Encrypt certificate that expires last.

    Scans the ``.crt`` files in ``SSL_CERTIFICATES_PATH``, keeps those whose
    encoded issuer name contains ``Let's Encrypt`` and whose validity window
    contains the current time, and picks the one with the latest
    ``not_valid_after``.

    Returns:
        The matching certificate with the latest expiry.

    Raises:
        RuntimeError: If no certificate passes the filters.
    """
    # Local import: only this function needs ``timezone``.
    from datetime import timezone

    # ``not_valid_before`` / ``not_valid_after`` are naive datetimes expressed in UTC, so the
    # reference time must be naive UTC too; local time would shift the validity window by the
    # machine's UTC offset.
    now: datetime = datetime.now(tz=timezone.utc).replace(tzinfo=None)

    # Iterate over the certificates
    valid_certificates: list[Certificate] = []
    for certificate_file_path in SSL_CERTIFICATES_PATH.iterdir():
        if certificate_file_path.is_dir() or certificate_file_path.suffix != ".crt":
            continue

        # Parse the certificate
        certificate: Certificate = x509.load_pem_x509_certificate(certificate_file_path.read_bytes())

        # Filter the certificates by issuer
        if b"Let's Encrypt" not in certificate.issuer.public_bytes():
            continue

        # Filter certificates that are expired or not yet valid
        if not certificate.not_valid_before <= now <= certificate.not_valid_after:
            continue

        valid_certificates.append(certificate)

    # Check if any valid certificates were found
    if not valid_certificates:
        raise RuntimeError("Couldn't find any valid certificates.")

    # Return the latest certificate
    return max(valid_certificates, key=lambda cert: cert.not_valid_after)
def find_certificate_key(certificate: Certificate) -> bytes:
    """Return the contents of the key file that belongs to ``certificate``.

    The expected file-name prefix is built from the first five and last five
    hexadecimal digits of ``n`` from the certificate's public numbers
    (assumes the public key exposes ``n``, e.g. an RSA modulus).  The first
    ``.key`` file in ``SSL_CERTIFICATES_KEYS_PATH`` whose name starts with
    that prefix is read and returned.

    Args:
        certificate: The certificate whose private key should be located.

    Returns:
        The raw bytes of the matching key file.

    Raises:
        RuntimeError: If no key file name starts with the computed prefix.
    """
    modulus_hex: str = format(certificate.public_key().public_numbers().n, "x")
    expected_prefix: str = "_".join((modulus_hex[:5], modulus_hex[-5:]))

    for candidate in SSL_CERTIFICATES_KEYS_PATH.iterdir():
        # Only regular ".key" entries are considered
        if candidate.is_dir() or candidate.suffix != ".key":
            continue
        if candidate.name.startswith(expected_prefix):
            return candidate.read_bytes()

    raise RuntimeError(f"Failed to find a key file for {certificate}.")
def lookup_issuer_certificate(access_location: str) -> Certificate:
    """Fetch the issuer certificate published at ``access_location``.

    The certificate is served from ``SSL_CERTIFICATES_CACHE_PATH`` when a
    cache file for this location exists; otherwise it is downloaded, parsed
    as DER and written to the cache in PEM form.

    Args:
        access_location: URL of the issuer certificate.

    Returns:
        The issuer certificate.

    Raises:
        RuntimeError: If ``access_location`` is empty, the request fails, or
            the server answers with a non-success status.
    """
    if not access_location:
        raise RuntimeError("Invalid access location provided.")

    # MD5 only derives a stable, filesystem-safe cache file name; it is not used for security.
    access_location_hash: str = md5(access_location.encode()).hexdigest()
    cached_access_location_path: Path = SSL_CERTIFICATES_CACHE_PATH / access_location_hash

    # Check the cache directly instead of scanning the whole cache directory
    if cached_access_location_path.is_file():
        return x509.load_pem_x509_certificate(cached_access_location_path.read_bytes())

    # Download the certificate; surface transport failures as RuntimeError so that they are
    # reported the same way as a non-success status.
    try:
        response: httpx.Response = httpx.get(access_location)
    except httpx.HTTPError as error:
        raise RuntimeError(f"Failed to download the issuer certificate: {access_location!r}.") from error
    if not response.is_success:
        raise RuntimeError(f"Failed to download the issuer certificate: {access_location!r}.")

    # Parse the certificate
    issuer_certificate: Certificate = x509.load_der_x509_certificate(response.content)

    # Save the certificate to the cache
    cached_access_location_path.write_bytes(issuer_certificate.public_bytes(encoding=Encoding.PEM))

    # Return the issuer certificate
    return issuer_certificate
def generate_certificate_chain(certificate: Certificate, depth: int = 1) -> tuple[Certificate, ...]:
    """Build a certificate chain by following "CA Issuers" links.

    Starting from ``certificate``, the issuer certificate is looked up via the
    CA Issuers entry of the Authority Information Access extension, appended
    to the chain, and then used as the starting point for the next step.

    Args:
        certificate: The first certificate of the chain.
        depth: Number of issuer certificates to append.

    Returns:
        ``certificate`` followed by up to ``depth`` issuer certificates.

    Raises:
        RuntimeError: If a certificate has no Authority Information Access
            extension or that extension has no CA Issuers entry, or if the
            issuer certificate cannot be looked up.
    """
    # Local import: only this function needs the OID constant.
    from cryptography.x509.oid import AuthorityInformationAccessOID

    certificate_chain: list[Certificate] = [certificate]
    current_certificate = certificate
    for _ in range(depth):
        # A missing extension raises ExtensionNotFound; report it as RuntimeError
        try:
            authority_information_access: Extension = (
                current_certificate.extensions.get_extension_for_class(AuthorityInformationAccess))
        except x509.ExtensionNotFound as error:
            raise RuntimeError("The certificate has no Authority Information Access extension.") from error

        # Select the CA Issuers entry by its access method instead of by position: the
        # extension may list its entries in any order and may omit the OCSP entry.
        ca_issuers_description = next(
            (description for description in authority_information_access.value
             if description.access_method == AuthorityInformationAccessOID.CA_ISSUERS),
            None)
        if ca_issuers_description is None:
            raise RuntimeError("The certificate has no CA Issuers access location.")
        access_location: str = ca_issuers_description.access_location.value

        # Download the certificate or fetch it from cache
        issuer_certificate: Certificate = lookup_issuer_certificate(access_location)

        # Append it to the certificate chain
        certificate_chain.append(issuer_certificate)

        # Continue from the issuer on the next iteration
        current_certificate = issuer_certificate

    return tuple(certificate_chain)
def regenerate_certificate_data(regenerate_if_valid: bool = False) -> None:
    """Write the latest valid certificate chain and its key to the Synapse paths.

    The chain is written to ``SYNAPSE_CERTIFICATE_PATH`` and the key to
    ``SYNAPSE_CERTIFICATE_KEY_PATH``.  Nothing is written when the certificate
    currently stored there already equals the latest valid one, unless
    ``regenerate_if_valid`` is set.

    Args:
        regenerate_if_valid: Rewrite the files even if the current certificate
            already matches the latest valid certificate.

    Raises:
        RuntimeError: If no valid certificate or no matching key file is found.
    """
    # Find the current certificate.  A missing or unparsable file (for example on the very
    # first run) must not abort the run: it simply means the data has to be regenerated.
    try:
        current_certificate = find_current_certificate()
    except (FileNotFoundError, ValueError):
        logging.warning("No usable current certificate found, regenerating.")
        current_certificate = None

    # Find the latest valid certificate
    valid_certificate: Certificate = find_latest_valid_certificate()

    # Skip regenerating if the certificates match
    if not regenerate_if_valid and current_certificate == valid_certificate:
        logging.info("Current certificate matches latest valid certificate, skipping regeneration.")
        return

    # Resolve the key and the chain before touching any file, so a lookup failure leaves
    # the existing files untouched.
    valid_certificate_key: bytes = find_certificate_key(certificate=valid_certificate)
    certificate_chain: tuple[Certificate, ...] = generate_certificate_chain(certificate=valid_certificate)

    # Save the certificate chain to the proper location
    logging.info(f"Writing the certificate chain to {SYNAPSE_CERTIFICATE_PATH}.")
    SYNAPSE_CERTIFICATE_PATH.write_bytes(
        b"".join(certificate.public_bytes(encoding=Encoding.PEM) for certificate in certificate_chain))

    # Save the certificate key to the proper location
    logging.info(f"Writing the certificate key to {SYNAPSE_CERTIFICATE_KEY_PATH}.")
    SYNAPSE_CERTIFICATE_KEY_PATH.write_bytes(valid_certificate_key)
def is_synapse_running() -> bool:
    """Report whether the process named in the Synapse PID file exists.

    Returns:
        ``True`` if ``SYNAPSE_HOMESERVER_PID_PATH`` holds a PID for which
        ``psutil.pid_exists`` is true; ``False`` if the file is missing, does
        not contain an integer, or the PID does not exist.
    """
    # Read first and handle failure (EAFP): the PID file can disappear between an existence
    # check and the read, and an empty or corrupt file must not crash the caller.
    try:
        synapse_pid: int = int(SYNAPSE_HOMESERVER_PID_PATH.read_text())
    except (FileNotFoundError, ValueError):
        return False
    return psutil.pid_exists(synapse_pid)
async def start_synapse_server() -> None:
    """Start Synapse through ``synctl`` inside a bash shell.

    The shell changes into ``SYNAPSE_PATH``, sources the environment
    activation script and runs ``synctl start``.  Anything written to stdout
    is logged at INFO level, anything written to stderr at ERROR level, and a
    non-zero exit status of the shell is logged as an error.
    """
    # Local import: only this function needs shell quoting.
    from shlex import quote

    # Spawn a shell process
    shell_process: Process = await asyncio.create_subprocess_exec(
        "/bin/bash", stdin=PIPE, stdout=PIPE, stderr=PIPE)

    # Quote the paths so that spaces or shell metacharacters in them cannot break the commands
    script: bytes = (
        f"cd {quote(str(SYNAPSE_PATH))}\n"
        f"source {quote(str(SYNAPSE_ACTIVATE_ENVIRONMENT_SCRIPT_PATH))}\n"
        "synctl start && exit\n"
    ).encode()

    # Passing the script as ``input`` writes it and then closes stdin, so the shell ends at
    # end-of-input even when ``synctl start`` fails and the trailing ``exit`` is never run.
    stdout, stderr = await shell_process.communicate(input=script)

    # Fetch the output from synctl; undecodable bytes are replaced rather than raising
    if stdout:
        logging.info(f"Output from synctl: {stdout.rstrip().decode(errors='replace')}")
    if stderr:
        logging.error(f"Errors from synctl: {stderr.rstrip().decode(errors='replace')}")
    if shell_process.returncode:
        logging.error(f"The synctl shell exited with status {shell_process.returncode}.")
def main() -> ExitCode:
    """Refresh the Synapse certificate files and make sure the server is running.

    Returns:
        ``ExitCode.SUCCESS`` when everything succeeded,
        ``ExitCode.CERTIFICATE_ERROR`` when regenerating the certificate data
        failed, or ``ExitCode.SYNAPSE_ERROR`` when starting the Synapse server
        failed.
    """
    # Setup logging
    logging.basicConfig(format="[%(asctime)s, %(levelname)s] %(message)s",
                        datefmt="%H:%M", level=logging.DEBUG,
                        stream=sys.stdout)
    logging.getLogger("asyncio").setLevel(logging.WARNING)

    # Regenerate certificates.  File-system errors (OSError) and parse errors (ValueError)
    # are certificate failures too and are reported through the same exit code.
    logging.info("Attempting to regenerate certificate data.")
    try:
        regenerate_certificate_data()
    except (RuntimeError, TypeError, ValueError, OSError) as exception:
        logging.error("Exception thrown in regenerate_certificate_data.")
        logging.critical(exception, exc_info=True)
        return ExitCode.CERTIFICATE_ERROR

    # Manage synapse
    if is_synapse_running():
        logging.info("The Synapse server is already running.")
        return ExitCode.SUCCESS

    logging.info("Attempting to start the Synapse server.")
    try:
        # asyncio.run creates the event loop and always closes it, also when the coroutine raises
        asyncio.run(start_synapse_server())
    except OSError as exception:  # IOError is an alias of OSError
        logging.error("Exception thrown in start_synapse_server.")
        logging.critical(exception, exc_info=True)
        return ExitCode.SYNAPSE_ERROR

    # Success
    return ExitCode.SUCCESS
if __name__ == "__main__":
    # Use the value returned by main() as the process exit status
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The script is intended to be used as a cron job.
This is a proof of concept for generating certificate files and managing the Synapse server under the following configuration:
Automated steps:
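- Load the certificate Synapse currently uses (find_current_certificate).
- Find the latest valid Let's Encrypt certificate (find_latest_valid_certificate).
- Build the certificate chain for it (generate_certificate_chain).
- Find the matching private key (find_certificate_key).
- Write the chain and the key to the paths configured in homeserver.yaml.
- Check whether the Synapse server is running (is_synapse_running).
- Start the Synapse server if it is not running (start_synapse_server).

Dependencies: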