./README.txt.old:Django LDAP Certificate Authority + Control Plane System
./README.txt.old:========================================================
./README.txt.old:This system is an internal infrastructure platform built using Django that combines a private Certificate Authority (PKI), LDAP-based policy enforcement, Kerberos bootstrap enrollment, asynchronous certificate issuance via Celery, and a certificate-based control-plane messaging system over UDP.
./README.txt.old:It is designed for environments where machines are fully managed and centrally controlled. Identity is not user-based or credential-based — it is entirely certificate-based.
./README.txt.old:After a machine is enrolled, it is no longer identified by Kerberos or any API key system. Instead, its identity becomes a set of X.509 certificates issued by the internal CA, with a mandatory control-plane certificate acting as the primary identity for all future communication.
./README.txt.old:LDAP defines what a machine is allowed to become. The CA enforces those rules at issuance time. Clients are responsible for generating their own private keys locally and submitting CSRs for approval.
./README.txt.old:------------------------------------------------------------
./README.txt.old:1. REPOSITORY FILE TREE
./README.txt.old:------------------------------------------------------------
./README.txt.old:ca_project/
./README.txt.old:|
./README.txt.old:|-- pyproject.toml
./README.txt.old:| - Python packaging configuration
./README.txt.old:| - defines dependencies and entry points for client/server tools
./README.txt.old:|
./README.txt.old:|-- README.md
./README.txt.old:| - system documentation and operational overview
./README.txt.old:|
./README.txt.old:|-- manage.py
./README.txt.old:| - Django management entrypoint (admin, migrations, shell, etc.)
./README.txt.old:|
./README.txt.old:|-- config.py
./README.txt.old:| - global configuration layer outside Django settings
./README.txt.old:| - used for CA behavior, notification routing, and runtime flags
./README.txt.old:|
./README.txt.old:|-- django_ca/
./README.txt.old:| |-- settings.py
./README.txt.old:| | - Django configuration (installed apps, database, auth backends)
./README.txt.old:| |
./README.txt.old:| |-- urls.py
./README.txt.old:| | - HTTP API routing
./README.txt.old:| | - includes enrollment, issuance, and control-plane endpoints
./README.txt.old:| |
./README.txt.old:| |-- wsgi.py
./README.txt.old:| |-- asgi.py
./README.txt.old:| | - deployment entrypoints for WSGI/ASGI servers
./README.txt.old:| |
./README.txt.old:| |-- celery.py
./README.txt.old:| - Celery application configuration
./README.txt.old:| - defines broker, backend, and task discovery
./README.txt.old:|
./README.txt.old:|-- ca/
./README.txt.old:| |-- apps.py
./README.txt.old:| | - Django app registration
./README.txt.old:| |
./README.txt.old:| |-- admin.py
./README.txt.old:| | - admin interface for CA management objects
./README.txt.old:| |
./README.txt.old:| |-- models.py
./README.txt.old:| | - Host:
./README.txt.old:| | represents an enrolled machine in the system
./README.txt.old:| | stores identity bindings, status, and metadata
./README.txt.old:| | |
./README.txt.old:| | - CertificateProfile:
./README.txt.old:| | defines allowed certificate types
./README.txt.old:| | maps LDAP filters → allowed SAN/CN constraints
./README.txt.old:| | |
./README.txt.old:| | - IssuedCertificate:
./README.txt.old:| | stores issued certificates and lifecycle state
./README.txt.old:| |
./README.txt.old:| |-- auth.py
./README.txt.old:| | - authentication backend for Django
./README.txt.old:| | - validates client identity using control-plane certificates
./README.txt.old:| | - replaces all API-key or password-based authentication
./README.txt.old:| |
./README.txt.old:| |-- ldap.py
./README.txt.old:| | - LDAP client layer
./README.txt.old:| | - used exclusively for querying policy and host eligibility
./README.txt.old:| |
./README.txt.old:| |-- policy.py
./README.txt.old:| | - CSR validation engine
./README.txt.old:| | - enforces LDAP-derived rules on:
./README.txt.old:| | - CN (Common Name)
./README.txt.old:| | - SAN (Subject Alternative Names)
./README.txt.old:| | - certificate profiles
./README.txt.old:| |
./README.txt.old:| |-- registry.py
./README.txt.old:| | - host registry and identity binding system
./README.txt.old:| | - maps certificate identity → network identity (IP/port)
./README.txt.old:| | - tracks enrollment state and last-known connectivity
./README.txt.old:| |
./README.txt.old:| |-- ca_engine.py
./README.txt.old:| | - core Certificate Authority logic
./README.txt.old:| | - loads CA private key and CA certificate
./README.txt.old:| | - performs X.509 signing operations
./README.txt.old:| | - enforces cryptographic constraints (EKU, KU, validity)
./README.txt.old:| |
./README.txt.old:| |-- issuance.py
./README.txt.old:| | - primary certificate issuance workflow
./README.txt.old:| | - validates CSR against policy
./README.txt.old:| | - checks LDAP authorization rules
./README.txt.old:| | - calls CA engine to sign certificates
./README.txt.old:| | - writes IssuedCertificate records
./README.txt.old:| |
./README.txt.old:| |-- tasks.py
./README.txt.old:| | - Celery async task wrapper layer
./README.txt.old:| | - provides issue_certificate_task()
./README.txt.old:| | - delegates actual logic to issuance.py
./README.txt.old:| | - enables retries and background processing
./README.txt.old:| |
./README.txt.old:| |-- renew.py
./README.txt.old:| | - certificate lifecycle management
./README.txt.old:| | - handles renewal and re-issuance before expiration
./README.txt.old:| |
./README.txt.old:| |-- message_signing.py
./README.txt.old:| | - cryptographic signing of control-plane messages
./README.txt.old:| | - ensures integrity + authenticity of server commands
./README.txt.old:| | - includes timestamp and nonce-based replay protection
./README.txt.old:| |
./README.txt.old:| |-- notification_channel.py
./README.txt.old:| | - UDP transport layer for control-plane messaging
./README.txt.old:| | - resolves host identity via registry
./README.txt.old:| | - sends signed operational messages to clients
./README.txt.old:| |
./README.txt.old:| |-- enrollment/
./README.txt.old:| | |-- views.py
./README.txt.old:| | | - POST /enroll endpoint
./README.txt.old:| | | - entrypoint for machine bootstrap
./README.txt.old:| | | - returns certificate plan (not certificates)
./README.txt.old:| | |
./README.txt.old:| | |-- kerberos.py
./README.txt.old:| | | - Kerberos authentication validation layer
./README.txt.old:| | | - used only during initial enrollment bootstrap
./README.txt.old:| | |
./README.txt.old:| | |-- profiles.py
./README.txt.old:| | - builds certificate plans from LDAP policy
./README.txt.old:| | - determines required certificates per host
./README.txt.old:| | - injects mandatory control-plane certificate requirement
./README.txt.old:|
./README.txt.old:|-- client/
./README.txt.old:| |-- csr.py
./README.txt.old:| | - local key generation (always client-side)
./README.txt.old:| | - CSR construction based on server-issued plan
./README.txt.old:| | - ensures private keys never leave machine
./README.txt.old:| |
./README.txt.old:| |-- agent.py
./README.txt.old:| | - long-running UDP listener daemon
./README.txt.old:| | - receives control-plane messages from server
./README.txt.old:| | - verifies signature, timestamp, nonce (replay protection)
./README.txt.old:| | - dispatches system actions safely
./README.txt.old:| |
./README.txt.old:| |-- install.py
./README.txt.old:| - installs certificates into /etc/ssl directories
./README.txt.old:| - configures system trust and identity usage
./README.txt.old:|
./README.txt.old:|-- client_notification_agent.py
./README.txt.old:| - legacy client runtime entrypoint
./README.txt.old:| - overlaps with client/agent.py
./README.txt.old:| - should be migrated into client/ directory for consistency
./README.txt.old:------------------------------------------------------------
./README.txt.old:2. ARCHITECTURE OVERVIEW
./README.txt.old:------------------------------------------------------------
./README.txt.old:The system is composed of four tightly connected layers:
./README.txt.old:Identity Bootstrap Layer
./README.txt.old:- Responsible for initial machine onboarding
./README.txt.old:- Uses Kerberos authentication as the first trust mechanism
./README.txt.old:- Ensures only authorized machines can request enrollment
./README.txt.old:Certificate Authority Layer
./README.txt.old:- Responsible for issuing all X.509 certificates
./README.txt.old:- Enforces LDAP-based policy rules during issuance
./README.txt.old:- Validates and signs CSRs using CA private key
./README.txt.old:- Maintains full certificate lifecycle tracking
./README.txt.old:Identity and Registry Layer
./README.txt.old:- Maintains persistent mapping of machines
./README.txt.old:- Links certificate identity to network identity (IP/port)
./README.txt.old:- Tracks enrollment state and operational status
./README.txt.old:Control-Plane Layer
./README.txt.old:- Provides secure server-to-client messaging system
./README.txt.old:- Uses UDP as transport (no persistent connection required)
./README.txt.old:- Ensures authenticity via cryptographic signing
./README.txt.old:------------------------------------------------------------
./README.txt.old:3. TRUST MODEL
./README.txt.old:------------------------------------------------------------
./README.txt.old:LDAP (Policy Authority)
./README.txt.old:- Defines what machines are allowed to exist
./README.txt.old:- Defines which certificate profiles can be issued
./README.txt.old:- Controls allowed CN/SAN structures
./README.txt.old:- Does not store certificates or runtime state
./README.txt.old:Kerberos (Bootstrap Only)
./README.txt.old:- Used only for initial enrollment authentication
./README.txt.old:- Provides first proof of machine identity
./README.txt.old:- Completely replaced after certificate issuance begins
./README.txt.old:X.509 Certificates (Primary Identity System)
./README.txt.old:- Every machine is identified through certificates
./README.txt.old:- Control-plane certificate becomes the primary identity credential
./README.txt.old:- Used for authentication, authorization, and messaging
./README.txt.old:CA Private Key (Root Trust Anchor)
./README.txt.old:- Used only within CA engine
./README.txt.old:- Signs all certificates
./README.txt.old:- Never leaves the CA system boundary
./README.txt.old:Client Private Keys
./README.txt.old:- Generated locally on each machine
./README.txt.old:- Never transmitted or exposed to server
./README.txt.old:- Stored under secure system directories
./README.txt.old:------------------------------------------------------------
./README.txt.old:4. ENROLLMENT FLOW
./README.txt.old:------------------------------------------------------------
./README.txt.old:Step 1: Kerberos Bootstrap Authentication
./README.txt.old:- Machine authenticates using Kerberos credentials
./README.txt.old:- Establishes initial trust relationship with CA system
./README.txt.old:Step 2: Enrollment Request
./README.txt.old:- Client calls POST /enroll endpoint
./README.txt.old:- Request includes Kerberos identity context
./README.txt.old:Step 3: Server Identity Validation
./README.txt.old:- Server validates Kerberos identity via kerberos.py
./README.txt.old:- Extracts machine identity and hostname
./README.txt.old:Step 4: Host Registration
./README.txt.old:- Host is created in registry
./README.txt.old:- IP address and metadata recorded
./README.txt.old:- Machine marked as “known but not provisioned”
./README.txt.old:Step 5: LDAP Policy Evaluation
./README.txt.old:- LDAP queried for machine eligibility
./README.txt.old:- Certificate profile selection is determined
./README.txt.old:Step 6: Certificate Plan Generation
./README.txt.old:- profiles.py generates full certificate plan
./README.txt.old:- Includes:
./README.txt.old: - required control-plane certificate (mandatory)
./README.txt.old: - optional service certificates (policy-based)
./README.txt.old: - SAN/CN constraints
./README.txt.old: - validity rules
./README.txt.old:Step 7: Response to Client
./README.txt.old:- Client receives certificate plan only
./README.txt.old:- No certificates are issued at this stage
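To make Steps 6 and 7 concrete, here is a minimal sketch of how a certificate plan might be modeled and serialized for the enrollment response. The source does not show the real plan format, so the names `PlannedCertificate`, `CertificatePlan`, and `build_plan`, the field layout, and the 90-day default validity are all assumptions made for illustration.

```python
from dataclasses import dataclass, field, asdict
import json


@dataclass(frozen=True)
class PlannedCertificate:
    """One certificate the host is expected to request (hypothetical shape)."""
    profile: str                 # e.g. "control-plane" or a service profile name
    common_name: str             # CN the CSR must carry
    sans: tuple[str, ...] = ()   # SANs the CSR may carry
    validity_days: int = 90      # assumed default, not taken from the source
    mandatory: bool = False


@dataclass(frozen=True)
class CertificatePlan:
    hostname: str
    certificates: tuple[PlannedCertificate, ...] = field(default_factory=tuple)


def build_plan(hostname: str, service_profiles: list[str]) -> CertificatePlan:
    """Always include the control-plane certificate, then policy-selected extras."""
    certs = [PlannedCertificate("control-plane", hostname, (hostname,), mandatory=True)]
    certs += [PlannedCertificate(p, hostname, (hostname,)) for p in service_profiles]
    return CertificatePlan(hostname, tuple(certs))


plan = build_plan("web01.example.com", ["https-server"])
plan_json = json.dumps(asdict(plan))  # what POST /enroll could return as its body
```

The plan carries constraints only; no key material or signed certificate appears in it, which matches the rule that enrollment returns a plan and not certificates.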
./README.txt.old:------------------------------------------------------------
./README.txt.old:5. CERTIFICATE ISSUANCE FLOW
./README.txt.old:------------------------------------------------------------
./README.txt.old:Step 1: Client Key Generation
./README.txt.old:- Private key is generated locally on client machine
./README.txt.old:- Key never leaves system boundary
./README.txt.old:Step 2: CSR Generation
./README.txt.old:- CSR is built using server-provided certificate plan
./README.txt.old:- Includes CN, SAN, and profile metadata
./README.txt.old:Step 3: CSR Submission
./README.txt.old:- CSR submitted to issuance endpoint
./README.txt.old:- Authentication occurs via certificate identity after bootstrap
./README.txt.old:Step 4: Celery Task Dispatch
./README.txt.old:- tasks.py queues issuance asynchronously
./README.txt.old:- allows retry and non-blocking processing
./README.txt.old:Step 5: Issuance Validation
./README.txt.old:- issuance.py validates CSR
./README.txt.old:- applies LDAP policy constraints
./README.txt.old:- ensures CSR matches certificate plan
./README.txt.old:Step 6: Certificate Signing
./README.txt.old:- ca_engine.py signs CSR using CA private key
./README.txt.old:- produces valid X.509 certificate
./README.txt.old:Step 7: Persistence
./README.txt.old:- IssuedCertificate stored in database
./README.txt.old:Step 8: Client Installation
./README.txt.old:- client/install.py installs the issued certificate under /etc/ssl and configures system trust
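The validation in Step 5 (the CSR must match the certificate plan) could look roughly like the sketch below. It works on fields already extracted from a CSR and leaves CSR parsing out. `CsrRequest` and `check_csr_against_plan` are hypothetical names, and the real checks in ca/policy.py and ca/issuance.py may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CsrRequest:
    """Fields already parsed out of a CSR (parsing itself is out of scope here)."""
    common_name: str
    sans: frozenset[str]


def check_csr_against_plan(csr: CsrRequest, allowed_cn: str, allowed_sans: set[str]) -> list[str]:
    """Return a list of violations; an empty list means the CSR may be signed."""
    errors = []
    if csr.common_name != allowed_cn:
        errors.append(f"CN {csr.common_name!r} is not the planned CN {allowed_cn!r}")
    extra = sorted(csr.sans - allowed_sans)  # SANs requested but not in the plan
    if extra:
        errors.append(f"SANs not permitted by plan: {', '.join(extra)}")
    return errors


ok = check_csr_against_plan(
    CsrRequest("web01.example.com", frozenset({"web01.example.com"})),
    "web01.example.com", {"web01.example.com", "web01"},
)
bad = check_csr_against_plan(
    CsrRequest("web01.example.com", frozenset({"evil.example.com"})),
    "web01.example.com", {"web01.example.com"},
)
```

Returning every violation, instead of stopping at the first, gives the client one complete rejection message per CSR.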
./README.txt.old:------------------------------------------------------------
./README.txt.old:6. CELERY ISSUANCE LAYER
./README.txt.old:------------------------------------------------------------
./README.txt.old:Celery is used to decouple certificate issuance from HTTP request handling.
./README.txt.old:ca/tasks.py:
./README.txt.old:- provides issue_certificate_task()
./README.txt.old:- wraps issuance.issue_certificate()
./README.txt.old:- enables retries and background execution
./README.txt.old:- prevents CA bottlenecks under load
./README.txt.old:------------------------------------------------------------
./README.txt.old:7. CONTROL-PLANE SYSTEM
./README.txt.old:------------------------------------------------------------
./README.txt.old:The control-plane enables server-to-client operational messaging.
./README.txt.old:Server Side:
./README.txt.old:- registry resolves host → network address
./README.txt.old:- notification_channel.py sends UDP packets
./README.txt.old:- message_signing.py ensures message integrity and authenticity
./README.txt.old:Client Side:
./README.txt.old:- agent.py listens on UDP socket
./README.txt.old:- verifies signature, nonce, timestamp
./README.txt.old:- rejects replayed or invalid messages
./README.txt.old:- executes authorized system actions
./README.txt.old:No persistent connection is required. Communication is event-based.
./README.txt.old:------------------------------------------------------------
./README.txt.old:8. MESSAGE FORMAT
./README.txt.old:------------------------------------------------------------
./README.txt.old:Control-plane messages are structured as:
./README.txt.old:{
./README.txt.old: "event": "reboot",
./README.txt.old: "payload": {
./README.txt.old: "reason": "kernel update required"
./README.txt.old: },
./README.txt.old: "timestamp": 1710000000,
./README.txt.old: "nonce": "uuid",
./README.txt.old: "ttl": 30,
./README.txt.old: "signature": "hex-encoded-signature"
./README.txt.old:}
./README.txt.old:------------------------------------------------------------
./README.txt.old:9. LDAP POLICY SYSTEM
./README.txt.old:------------------------------------------------------------
./README.txt.old:LDAP acts as the authoritative policy layer.
./README.txt.old:It defines:
./README.txt.old:- which machines are allowed to enroll
./README.txt.old:- which certificate profiles are valid
./README.txt.old:- which SAN and CN values are permitted
./README.txt.old:LDAP is not used for:
./README.txt.old:- certificate storage
./README.txt.old:- runtime communication
./README.txt.old:- identity authentication after enrollment
./README.txt.old:------------------------------------------------------------
./README.txt.old:10. HOST REGISTRY
./README.txt.old:------------------------------------------------------------
./README.txt.old:The registry tracks machine identity and network state.
./README.txt.old:It stores:
./README.txt.old:- hostname
./README.txt.old:- certificate identity binding
./README.txt.old:- IP address
./README.txt.old:- UDP port
./README.txt.old:- enrollment status
./README.txt.old:It is used for:
./README.txt.old:- routing control-plane messages
./README.txt.old:- resolving machine identity at runtime
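A minimal in-memory sketch of the registry's role in routing follows: it resolves a certificate identity to an address, then sends a datagram there. The real registry is backed by the `Host` model; `HostEntry`, `Registry`, `send_message`, and the use of a fingerprint string as the identity key are hypothetical. Message signing is left out here.

```python
import json
import socket
from dataclasses import dataclass


@dataclass
class HostEntry:
    hostname: str
    cert_fingerprint: str   # certificate identity binding (representation assumed)
    ip_address: str
    udp_port: int
    enrolled: bool = False


class Registry:
    def __init__(self):
        self._by_fingerprint: dict[str, HostEntry] = {}

    def register(self, entry: HostEntry) -> None:
        self._by_fingerprint[entry.cert_fingerprint] = entry

    def resolve(self, cert_fingerprint: str) -> tuple[str, int]:
        """Map a certificate identity to the (ip, port) to send datagrams to."""
        entry = self._by_fingerprint[cert_fingerprint]
        if not entry.enrolled:
            raise LookupError(f"{entry.hostname} is known but not provisioned")
        return entry.ip_address, entry.udp_port


def send_message(registry: Registry, cert_fingerprint: str, message: dict) -> int:
    """Fire-and-forget UDP send; returns the number of bytes handed to the OS."""
    addr = registry.resolve(cert_fingerprint)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(json.dumps(message).encode("utf-8"), addr)


registry = Registry()
registry.register(HostEntry("web01", "ab:cd", "10.0.0.5", 9999, enrolled=True))
registry.register(HostEntry("web02", "ef:01", "10.0.0.6", 9999))
```

Refusing to resolve hosts that are not yet enrolled keeps control-plane traffic away from machines that have not received a control-plane certificate.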
./README.txt.old:------------------------------------------------------------
./README.txt.old:11. SECURITY MODEL
./README.txt.old:------------------------------------------------------------
./README.txt.old:- private keys never leave client machines
./README.txt.old:- CA signs only validated CSRs
./README.txt.old:- LDAP defines issuance policy
./README.txt.old:- certificates are the only identity system after enrollment
./README.txt.old:- all control-plane messages are signed
./README.txt.old:- replay protection enforced via nonce + timestamp validation
./README.txt.old:- no API keys exist anywhere in system
./README.txt.old:------------------------------------------------------------
./README.txt.old:12. LIMITATIONS
./README.txt.old:------------------------------------------------------------
./README.txt.old:- UDP delivery is best-effort (no guaranteed delivery)
./README.txt.old:- DTLS not fully implemented (UDP-only transport)
./README.txt.old:- no HA registry or clustering layer yet
./README.txt.old:- no OCSP/CRL revocation system yet
./README.txt.old:- ACME implementation is simplified relative to RFC 8555
./README.txt.old:------------------------------------------------------------
./README.txt.old:13. SYSTEM SUMMARY
./README.txt.old:------------------------------------------------------------
./README.txt.old:This system functions as:
./README.txt.old:- a private internal PKI (Certificate Authority)
./README.txt.old:- an LDAP-driven policy enforcement engine
./README.txt.old:- a Kerberos bootstrap enrollment system
./README.txt.old:- a Celery-based asynchronous issuance pipeline
./README.txt.old:- a certificate lifecycle management system
./README.txt.old:- a certificate-only identity system for machines
./README.txt.old:- a secure UDP control-plane orchestration bus
[sq@msi ~/ca]$ grep -r "." | grep -v README
./pyproject.toml:[build-system]
./pyproject.toml:requires = ["setuptools>=68", "wheel"]
./pyproject.toml:build-backend = "setuptools.build_meta"
./pyproject.toml:[project]
./pyproject.toml:name = "django-ca"
./pyproject.toml:version = "0.1.0"
./pyproject.toml:description = "Django LDAP Certificate Authority with certificate-based control-plane system"
./pyproject.toml:requires-python = ">=3.11"
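./pyproject.toml:dependencies = [
./pyproject.toml: "Django>=4.2",
./pyproject.toml: # listed in settings.py (INSTALLED_APPS / MIDDLEWARE), so they must be installed
./pyproject.toml: "djangorestframework",
./pyproject.toml: "whitenoise",
./pyproject.toml: "celery>=5.3",
./pyproject.toml: "pyzmq>=25.0",
./pyproject.toml: "python-ldap>=3.4",
./pyproject.toml: "django-ldapdb>=2.0",
./pyproject.toml: "cryptography>=42.0",
./pyproject.toml: "pytz>=2024.1",
./pyproject.toml:]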
./pyproject.toml:[project.optional-dependencies]
./pyproject.toml:dev = [
./pyproject.toml: "ipython",
./pyproject.toml: "black",
./pyproject.toml: "ruff",
./pyproject.toml:]
./pyproject.toml:[project.scripts]
./pyproject.toml:ca-server = "django_ca.cli:main"
./pyproject.toml:ca-enroll = "client.enroll:main"
./pyproject.toml:ca-renew = "ca.renew:main"
./pyproject.toml:ca-agent = "client.agent:main"
./pyproject.toml:[tool.setuptools]
./pyproject.toml:packages = [
./pyproject.toml: "django_ca",
./pyproject.toml: "ca",
./pyproject.toml: "ca.enrollment",
./pyproject.toml: # imported by django_ca/urls.py (ca.acme.views)
./pyproject.toml: "ca.acme",
./pyproject.toml: "client",
./pyproject.toml:]
./pyproject.toml:[tool.setuptools.package-data]
./pyproject.toml:"django_ca" = ["*.py"]
./pyproject.toml:"ca" = ["*.py"]
./pyproject.toml:"ca.enrollment" = ["*.py"]
./pyproject.toml:"client" = ["*.py"]
./manage.py:#!/usr/bin/env python3
./manage.py:# manage.py
./manage.py:import os
./manage.py:import sys
./manage.py:def main():
./manage.py: os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ca.settings")
./manage.py: from django.core.management import execute_from_command_line
./manage.py: execute_from_command_line(sys.argv)
./manage.py:if __name__ == "__main__":
./manage.py: main()
./django_ca/__init__.py:# django_ca/__init__.py
./django_ca/__init__.py:from .celery import app as celery_app
./django_ca/__init__.py:__all__ = ("celery_app",)
./django_ca/celery.py:# django_ca/celery.py
./django_ca/celery.py:"""
./django_ca/celery.py:Celery application configuration for CA system.
./django_ca/celery.py:This worker is responsible for:
./django_ca/celery.py:- asynchronous certificate issuance
./django_ca/celery.py:- background renewal workflows (future)
./django_ca/celery.py:- policy evaluation tasks (future)
./django_ca/celery.py:Transport:
./django_ca/celery.py:- broker URL is taken from CELERY_BROKER_URL
./django_ca/celery.py:- ZeroMQ is an explicit opt-in (CELERY_BROKER=zeromq), not the default
./django_ca/celery.py:"""
./django_ca/celery.py:import os
./django_ca/celery.py:from celery import Celery
./django_ca/celery.py:os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ca.settings")
./django_ca/celery.py:app = Celery("django_ca")
./django_ca/celery.py:# Load every CELERY_*-prefixed option from django_ca/settings.py so that
./django_ca/celery.py:# broker, result backend, and serializer settings are defined in one place.
./django_ca/celery.py:app.config_from_object("django.conf:settings", namespace="CELERY")
./django_ca/celery.py:# Optional ZeroMQ override (explicit opt-in)
./django_ca/celery.py:if os.environ.get("CELERY_BROKER", "").lower() == "zeromq":
./django_ca/celery.py: app.conf.update(
./django_ca/celery.py: broker_url="zmq://",
./django_ca/celery.py: )
./django_ca/celery.py:app.autodiscover_tasks()
./django_ca/settings.py:from pathlib import Path
./django_ca/settings.py:import os
./django_ca/settings.py:BASE_DIR = Path(__file__).resolve().parent.parent
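./django_ca/settings.py:# Development-only values: set a real SECRET_KEY, DEBUG = False, and an
./django_ca/settings.py:# explicit ALLOWED_HOSTS list before exposing the CA to any network.
./django_ca/settings.py:SECRET_KEY = "CHANGE_ME"
./django_ca/settings.py:DEBUG = True
./django_ca/settings.py:ALLOWED_HOSTS = ["*"]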
./django_ca/settings.py:INSTALLED_APPS = [
./django_ca/settings.py: "django.contrib.admin",
./django_ca/settings.py: "django.contrib.auth",
./django_ca/settings.py: "django.contrib.contenttypes",
./django_ca/settings.py: "django.contrib.sessions",
./django_ca/settings.py: "django.contrib.messages",
./django_ca/settings.py: "django.contrib.staticfiles",
./django_ca/settings.py: "rest_framework",
./django_ca/settings.py: "django_ldapdb",
./django_ca/settings.py: "ca",
./django_ca/settings.py:]
./django_ca/settings.py:MIDDLEWARE = [
./django_ca/settings.py: "django.middleware.security.SecurityMiddleware",
./django_ca/settings.py: "whitenoise.middleware.WhiteNoiseMiddleware",
./django_ca/settings.py: "django.contrib.sessions.middleware.SessionMiddleware",
./django_ca/settings.py: "django.middleware.common.CommonMiddleware",
./django_ca/settings.py: "django.middleware.csrf.CsrfViewMiddleware",
./django_ca/settings.py: "django.contrib.auth.middleware.AuthenticationMiddleware",
./django_ca/settings.py: "django.contrib.messages.middleware.MessageMiddleware",
./django_ca/settings.py:]
./django_ca/settings.py:ROOT_URLCONF = "django_ca.urls"
./django_ca/settings.py:TEMPLATES = [
./django_ca/settings.py: {
./django_ca/settings.py: "BACKEND": "django.template.backends.django.DjangoTemplates",
./django_ca/settings.py: "DIRS": [],
./django_ca/settings.py: "APP_DIRS": True,
./django_ca/settings.py: "OPTIONS": {
./django_ca/settings.py: "context_processors": [
./django_ca/settings.py: "django.template.context_processors.request",
./django_ca/settings.py: "django.contrib.auth.context_processors.auth",
./django_ca/settings.py: "django.contrib.messages.context_processors.messages",
./django_ca/settings.py: ],
./django_ca/settings.py: },
./django_ca/settings.py: },
./django_ca/settings.py:]
./django_ca/settings.py:WSGI_APPLICATION = "django_ca.wsgi.application"
./django_ca/settings.py:ASGI_APPLICATION = "django_ca.asgi.application"
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:# DATABASE
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:DATABASES = {
./django_ca/settings.py: "default": {
./django_ca/settings.py: "ENGINE": "django.db.backends.sqlite3",
./django_ca/settings.py: "NAME": BASE_DIR / "db.sqlite3",
./django_ca/settings.py: }
./django_ca/settings.py:}
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:# INTERNATIONALIZATION
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:LANGUAGE_CODE = "en-us"
./django_ca/settings.py:TIME_ZONE = "UTC"
./django_ca/settings.py:USE_I18N = True
./django_ca/settings.py:USE_TZ = True
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:# STATIC FILES
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:STATIC_URL = "/static/"
./django_ca/settings.py:STATIC_ROOT = BASE_DIR / "static"
./django_ca/settings.py:DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
./django_ca/settings.py:# =========================================================
./django_ca/settings.py:# LDAP CONFIGURATION (NOW SINGLE SOURCE OF TRUTH)
./django_ca/settings.py:# =========================================================
./django_ca/settings.py:LDAP_SERVER_URI = os.environ.get("LDAP_URI", "ldap://ldap.example.com")
./django_ca/settings.py:LDAP_BIND_DN = os.environ.get("LDAP_BIND_DN", "cn=service,dc=example,dc=com")
./django_ca/settings.py:LDAP_BIND_PASSWORD = os.environ.get("LDAP_BIND_PASSWORD", "CHANGE_ME")
./django_ca/settings.py:LDAP_BASE_DN = os.environ.get("LDAP_BASE_DN", "dc=example,dc=com")
./django_ca/settings.py:# LDAP STRUCTURE (previously ldapdb_config.py)
./django_ca/settings.py:LDAP_HOSTS_OU = os.environ.get("LDAP_HOSTS_OU", "ou=hosts")
./django_ca/settings.py:LDAP_PROFILES_OU = os.environ.get("LDAP_PROFILES_OU", "ou=cert-profiles")
./django_ca/settings.py:LDAP_CERTS_OU = os.environ.get("LDAP_CERTS_OU", "ou=issued-certs")
./django_ca/settings.py:LDAP_ENROLLMENT_OU = os.environ.get("LDAP_ENROLLMENT_OU", "ou=enrollment-events")
./django_ca/settings.py:LDAP_USE_TLS = os.environ.get("LDAP_USE_TLS", "false").lower() == "true"
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:# CA CONFIG
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:CA_CERTIFICATE_PATH = "/etc/myca/ca.crt"
./django_ca/settings.py:CA_PRIVATE_KEY_PATH = "/etc/myca/intermediate.key"
./django_ca/settings.py:CA_PRIVATE_KEY_PASSWORD = None
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:# CELERY
./django_ca/settings.py:# -----------------------------
./django_ca/settings.py:CELERY_BROKER_URL = os.environ.get(
./django_ca/settings.py: "CELERY_BROKER_URL",
./django_ca/settings.py: "redis://127.0.0.1:6379/0",
./django_ca/settings.py:)
./django_ca/settings.py:CELERY_RESULT_BACKEND = os.environ.get(
./django_ca/settings.py: "CELERY_RESULT_BACKEND",
./django_ca/settings.py: "redis://127.0.0.1:6379/1",
./django_ca/settings.py:)
./django_ca/settings.py:CELERY_ACCEPT_CONTENT = ["json"]
./django_ca/settings.py:CELERY_TASK_SERIALIZER = "json"
./django_ca/settings.py:CELERY_RESULT_SERIALIZER = "json"
./django_ca/settings.py:CELERY_TIMEZONE = "UTC"
./django_ca/urls.py:# django_ca/urls.py
./django_ca/urls.py:"""
./django_ca/urls.py:Root URL routing for CA system.
./django_ca/urls.py:This file defines the public control-plane API surface:
./django_ca/urls.py:- Enrollment (Kerberos-gated bootstrap)
./django_ca/urls.py:- Certificate issuance
./django_ca/urls.py:- ACME-like compatibility endpoints
./django_ca/urls.py:No business logic should exist here.
./django_ca/urls.py:"""
./django_ca/urls.py:from django.contrib import admin
./django_ca/urls.py:from django.urls import path
./django_ca/urls.py:from ca.enrollment.views import enroll, issue
./django_ca/urls.py:from ca.acme.views import new_order, finalize
./django_ca/urls.py:urlpatterns = [
./django_ca/urls.py: path("admin/", admin.site.urls),
./django_ca/urls.py: # -----------------------------
./django_ca/urls.py: # Enrollment API (Kerberos gate + CSR lifecycle)
./django_ca/urls.py: # -----------------------------
./django_ca/urls.py: path("enroll/", enroll),
./django_ca/urls.py: path("issue/", issue),
./django_ca/urls.py: # -----------------------------
./django_ca/urls.py: # ACME-like surface (simplified relative to RFC 8555)
./django_ca/urls.py: # -----------------------------
./django_ca/urls.py: path("acme/new-order/", new_order),
./django_ca/urls.py: path("acme/finalize/", finalize),
./django_ca/urls.py:]
./django_ca/wsgi.py:# django_ca/wsgi.py
./django_ca/wsgi.py:import os
./django_ca/wsgi.py:from django.core.wsgi import get_wsgi_application
./django_ca/wsgi.py:os.environ.setdefault(
./django_ca/wsgi.py: "DJANGO_SETTINGS_MODULE",
./django_ca/wsgi.py: "django_ca.settings",
./django_ca/wsgi.py:)
./django_ca/wsgi.py:application = get_wsgi_application()
./django_ca/asgi.py:# django_ca/asgi.py
./django_ca/asgi.py:import os
./django_ca/asgi.py:from django.core.asgi import get_asgi_application
./django_ca/asgi.py:os.environ.setdefault(
./django_ca/asgi.py: "DJANGO_SETTINGS_MODULE",
./django_ca/asgi.py: "django_ca.settings",
./django_ca/asgi.py:)
./django_ca/asgi.py:application = get_asgi_application()
./django_ca/cli.py:# django_ca/cli.py
./django_ca/cli.py:import os
./django_ca/cli.py:import sys
./django_ca/cli.py:import argparse
./django_ca/cli.py:import django
./django_ca/cli.py:def setup_django():
./django_ca/cli.py: """
./django_ca/cli.py: Bootstraps Django environment for CLI execution.
./django_ca/cli.py: """
./django_ca/cli.py: os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ca.settings")
./django_ca/cli.py: django.setup()
./django_ca/cli.py:def run_server(host="0.0.0.0", port=8000):
./django_ca/cli.py: """
./django_ca/cli.py: Runs Django development server (CA API + enrollment + ACME endpoints).
./django_ca/cli.py: """
./django_ca/cli.py: from django.core.management import execute_from_command_line
./django_ca/cli.py: sys.argv = ["manage.py", "runserver", f"{host}:{port}"]
./django_ca/cli.py: execute_from_command_line(sys.argv)
./django_ca/cli.py:def run_migrations():
./django_ca/cli.py: """
./django_ca/cli.py: Applies database migrations for CA models.
./django_ca/cli.py: """
./django_ca/cli.py: from django.core.management import execute_from_command_line
./django_ca/cli.py: sys.argv = ["manage.py", "migrate"]
./django_ca/cli.py: execute_from_command_line(sys.argv)
./django_ca/cli.py:def create_superuser():
./django_ca/cli.py: """
./django_ca/cli.py: Creates Django admin superuser (interactive).
./django_ca/cli.py: """
./django_ca/cli.py: from django.core.management import execute_from_command_line
./django_ca/cli.py: sys.argv = ["manage.py", "createsuperuser"]
./django_ca/cli.py: execute_from_command_line(sys.argv)
./django_ca/cli.py:def main():
./django_ca/cli.py:    setup_django()
./django_ca/cli.py:    parser = argparse.ArgumentParser(description="Django CA Control Plane CLI")
./django_ca/cli.py:    sub = parser.add_subparsers(dest="command")
./django_ca/cli.py:    server = sub.add_parser("server", help="Run CA server")
./django_ca/cli.py:    server.add_argument("--host", default="0.0.0.0")
./django_ca/cli.py:    server.add_argument("--port", default=8000, type=int)
./django_ca/cli.py:    sub.add_parser("migrate", help="Run migrations")
./django_ca/cli.py:    sub.add_parser("createsuperuser", help="Create admin user")
./django_ca/cli.py:    args = parser.parse_args()
./django_ca/cli.py:    if args.command == "server":
./django_ca/cli.py:        run_server(args.host, args.port)
./django_ca/cli.py:    elif args.command == "migrate":
./django_ca/cli.py:        run_migrations()
./django_ca/cli.py:    elif args.command == "createsuperuser":
./django_ca/cli.py:        create_superuser()
./django_ca/cli.py:    else:
./django_ca/cli.py:        parser.print_help()
./django_ca/cli.py:if __name__ == "__main__":
./django_ca/cli.py:    main()
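The subcommand dispatch in `main()` can be exercised without a Django settings module. The following is a minimal sketch, assuming only the parser layout shown in `django_ca/cli.py`; `build_parser` is a hypothetical helper name and is not part of the repository.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Mirrors the parser assembled in main(); build_parser is a hypothetical name.
    parser = argparse.ArgumentParser(description="Django CA Control Plane CLI")
    sub = parser.add_subparsers(dest="command")
    server = sub.add_parser("server", help="Run CA server")
    server.add_argument("--host", default="0.0.0.0")
    server.add_argument("--port", default=8000, type=int)
    sub.add_parser("migrate", help="Run migrations")
    sub.add_parser("createsuperuser", help="Create admin user")
    return parser


# A "server" invocation with an explicit port; --host keeps its default.
args = build_parser().parse_args(["server", "--port", "9000"])
print(args.command, args.host, args.port)

# With no subcommand, args.command is None, which is the case
# main() handles by printing the help text.
empty = build_parser().parse_args([])
print(empty.command)
```

Keeping parser construction in its own function would also let `--help` run before `setup_django()` is called, should that ordering ever matter.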
./ca/apps.py:# ca/apps.py
./ca/apps.py:from django.apps import AppConfig
./ca/apps.py:class CAConfig(AppConfig):
./ca/apps.py: default_auto_field = "django.db.models.BigAutoField"
./ca/apps.py: name = "ca"
./ca/models.py:# ca/models.py
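./ca/models.py:# django-ldapdb installs its code as the top-level package "ldapdb".
./ca/models.py:# NOTE: BooleanField and TextField are assumed to exist in the installed release; verify before use.
./ca/models.py:from ldapdb.models import Model
./ca/models.py:from ldapdb.models.fields import CharField, IntegerField, BooleanField, TextField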
./ca/models.py:class Host(Model):
./ca/models.py:    """
./ca/models.py:    Logical host identity.
./ca/models.py:    LDAP is the backing store, but structure is NOT defined here.
./ca/models.py:    This model represents identity semantics only.
./ca/models.py:    """
./ca/models.py:    cn = CharField(max_length=255, primary_key=True)
./ca/models.py:    ip_address = CharField(max_length=255, null=True, blank=True)
./ca/models.py:    is_enrolled = BooleanField(default=False)
./ca/models.py:    def __str__(self):
./ca/models.py:        return self.cn
./ca/models.py:class CertificateProfile(Model):
./ca/models.py:    """
./ca/models.py:    Certificate issuance policy object.
./ca/models.py:    Defines rules for:
./ca/models.py:    - SAN/CN constraints
./ca/models.py:    - validity
./ca/models.py:    - control-plane requirements
./ca/models.py:    """
./ca/models.py:    name = CharField(max_length=255, primary_key=True)
./ca/models.py:    ldap_filter = TextField()
./ca/models.py:    allowed_san_regex = TextField(null=True, blank=True)
./ca/models.py:    allowed_cn_regex = TextField(null=True, blank=True)
./ca/models.py:    requires_control_plane_cert = BooleanField(default=True)
./ca/models.py:    validity_days = IntegerField(default=365)
./ca/models.py:    def __str__(self):
./ca/models.py:        return self.name
./ca/models.py:class IssuedCertificate(Model):
./ca/models.py:    """
./ca/models.py:    Certificate lifecycle record.
./ca/models.py:    This is the CA’s authoritative issuance ledger.
./ca/models.py:    """
./ca/models.py:    serial_number = CharField(max_length=255, primary_key=True)
./ca/models.py:    hostname = CharField(max_length=255)
./ca/models.py:    profile_name = CharField(max_length=255)
./ca/models.py:    certificate_pem = TextField()
./ca/models.py:    ca_bundle_pem = TextField()
./ca/models.py:    issued_at = CharField(max_length=64)
./ca/models.py:    not_after = CharField(max_length=64, null=True, blank=True)
./ca/models.py:    last_csr = TextField(null=True, blank=True)
./ca/models.py:    revoked = BooleanField(default=False)
./ca/models.py:    def __str__(self):
./ca/models.py:        return f"{self.hostname}:{self.profile_name}"
./ca/admin.py:# ca/admin.py
./ca/admin.py:from django.contrib import admin
./ca/admin.py:# ca/models.py defines Host; there is no EnrolledHost model.
./ca/admin.py:from .models import (
./ca/admin.py:    CertificateProfile,
./ca/admin.py:    Host,
./ca/admin.py:    IssuedCertificate,
./ca/admin.py:)
./ca/admin.py:admin.site.register(CertificateProfile)
./ca/admin.py:admin.site.register(Host)
./ca/admin.py:admin.site.register(IssuedCertificate)
./ca/auth.py:# ca/auth.py
./ca/auth.py:"""
./ca/auth.py:Certificate-based authentication backend.
./ca/auth.py:Runtime identity is derived from client certificates
./ca/auth.py:provided by TLS terminator / reverse proxy.
./ca/auth.py:"""
./ca/auth.py:from django.contrib.auth.backends import BaseBackend
./ca/auth.py:from django.contrib.auth.models import User
./ca/auth.py:class CertificateAuthBackend(BaseBackend):
./ca/auth.py:    """
./ca/auth.py:    Authenticates requests using client certificate headers.
./ca/auth.py:    Expected headers (from proxy):
./ca/auth.py:        X-Client-CN
./ca/auth.py:        X-Client-Serial
./ca/auth.py:    The proxy must overwrite any client-supplied copies of these headers;
./ca/auth.py:    otherwise a caller can claim an arbitrary identity.
./ca/auth.py:    """
./ca/auth.py:    def authenticate(self, request, username=None, password=None, **kwargs):
./ca/auth.py:        if not request:
./ca/auth.py:            return None
./ca/auth.py:        cn = request.META.get("HTTP_X_CLIENT_CN")
./ca/auth.py:        serial = request.META.get("HTTP_X_CLIENT_SERIAL")
./ca/auth.py:        if not cn or not serial:
./ca/auth.py:            return None
./ca/auth.py:        # Identity is certificate-based.
./ca/auth.py:        # Django user is OPTIONAL mapping layer, not authority.
./ca/auth.py:        try:
./ca/auth.py:            return User.objects.filter(username=cn).first()
./ca/auth.py:        except Exception:
./ca/auth.py:            return None
./ca/auth.py:    def get_user(self, user_id):
./ca/auth.py:        try:
./ca/auth.py:            return User.objects.get(pk=user_id)
./ca/auth.py:        except User.DoesNotExist:
./ca/auth.py:            return None
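The backend reads `HTTP_X_CLIENT_CN` and `HTTP_X_CLIENT_SERIAL` while its docstring names the headers `X-Client-CN` and `X-Client-Serial`. The sketch below shows the naming rule Django applies when it copies request headers into `request.META`; `meta_key` and `extract_identity` are hypothetical helpers for illustration, and a plain dict stands in for `request.META`.

```python
from typing import Optional, Tuple


def meta_key(header_name: str) -> str:
    # Request headers appear in request.META upper-cased, with hyphens
    # replaced by underscores and an HTTP_ prefix.
    return "HTTP_" + header_name.upper().replace("-", "_")


def extract_identity(meta: dict) -> Optional[Tuple[str, str]]:
    # Hypothetical helper mirroring the lookups in authenticate():
    # both values must be present, otherwise there is no identity.
    cn = meta.get(meta_key("X-Client-CN"))
    serial = meta.get(meta_key("X-Client-Serial"))
    if not cn or not serial:
        return None
    return cn, serial


meta = {"HTTP_X_CLIENT_CN": "host01.example", "HTTP_X_CLIENT_SERIAL": "0a1b2c"}
print(extract_identity(meta))
print(extract_identity({"HTTP_X_CLIENT_CN": "host01.example"}))
```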
./ca/tasks.py:# ca/tasks.py
./ca/tasks.py:"""
./ca/tasks.py:Celery task layer for asynchronous certificate issuance.
./ca/tasks.py:This is the boundary between:
./ca/tasks.py:- HTTP/API requests
./ca/tasks.py:- CA issuance engine
./ca/tasks.py:- background worker execution
./ca/tasks.py:Execution model:
./ca/tasks.py:- API triggers task
./ca/tasks.py:- Celery worker executes issuance
./ca/tasks.py:- result is returned or stored in LDAP-backed IssuedCertificate
./ca/tasks.py:"""
./ca/tasks.py:from celery import shared_task
./ca/tasks.py:from ca.issuance import issue_certificate
./ca/tasks.py:@shared_task
./ca/tasks.py:def issue_certificate_task(hostname: str, csr_pem: str, profile_name: str):
./ca/tasks.py: """
./ca/tasks.py: Background certificate issuance task.
./ca/tasks.py: """
./ca/tasks.py: return issue_certificate(
./ca/tasks.py: hostname=hostname,
./ca/tasks.py: csr_pem=csr_pem,
./ca/tasks.py: profile_name=profile_name,
./ca/tasks.py: )
./ca/issuance.py:# ca/issuance.py
./ca/issuance.py:"""
./ca/issuance.py:Issuance layer (service boundary).
./ca/issuance.py:This module sits between:
./ca/issuance.py:- API / enrollment layer
./ca/issuance.py:- CA engine (cryptographic core)
./ca/issuance.py:- LDAP-backed persistence models
./ca/issuance.py:It enforces policy flow and keeps CAEngine isolated from request logic.
./ca/issuance.py:"""
./ca/issuance.py:from typing import Dict, Any
./ca/issuance.py:from ca.ca_engine import CAEngine
./ca/issuance.py:from ca.models import CertificateProfile
./ca/issuance.py:class IssuanceService:
./ca/issuance.py:    """
./ca/issuance.py:    High-level certificate issuance orchestration layer.
./ca/issuance.py:    """
./ca/issuance.py:    def __init__(self, engine: CAEngine):
./ca/issuance.py:        self.engine = engine
./ca/issuance.py:    def get_profile(self, profile_name: str) -> CertificateProfile:
./ca/issuance.py:        # .get() raises DoesNotExist instead of returning None,
./ca/issuance.py:        # so translate it into the error callers expect.
./ca/issuance.py:        try:
./ca/issuance.py:            return CertificateProfile.objects.get(name=profile_name)
./ca/issuance.py:        except CertificateProfile.DoesNotExist:
./ca/issuance.py:            raise ValueError(f"Unknown profile: {profile_name}") from None
./ca/issuance.py:    def issue(
./ca/issuance.py:        self,
./ca/issuance.py:        hostname: str,
./ca/issuance.py:        csr_pem: str,
./ca/issuance.py:        profile_name: str,
./ca/issuance.py:    ) -> Dict[str, Any]:
./ca/issuance.py:        """
./ca/issuance.py:        Main entrypoint for certificate issuance.
./ca/issuance.py:        Flow:
./ca/issuance.py:        1. Validate profile exists
./ca/issuance.py:        2. Delegate CSR validation + signing to CA engine
./ca/issuance.py:        3. Return signed certificate bundle
./ca/issuance.py:        """
./ca/issuance.py:        # Minimal sanity gate before engine: raises ValueError for unknown profiles
./ca/issuance.py:        profile = self.get_profile(profile_name)
./ca/issuance.py:        return self.engine.issue_certificate(
./ca/issuance.py:            hostname=hostname,
./ca/issuance.py:            csr_pem=csr_pem,
./ca/issuance.py:            profile_name=profile.name,
./ca/issuance.py:        )
./ca/issuance.py:def issue_certificate(hostname: str, csr_pem: str, profile_name: str) -> Dict[str, Any]:
./ca/issuance.py:    """
./ca/issuance.py:    Functional wrapper used by Celery/tasks and API endpoints.
./ca/issuance.py:    """
./ca/issuance.py:    # NOTE: In real deployment, keys would be loaded from secure storage (HSM/filesystem vault)
./ca/issuance.py:    with open("/etc/ca/ca.key", "rb") as f:
./ca/issuance.py:        ca_key = f.read()
./ca/issuance.py:    with open("/etc/ca/ca.crt", "rb") as f:
./ca/issuance.py:        ca_cert = f.read()
./ca/issuance.py:    engine = CAEngine(
./ca/issuance.py:        ca_private_key_pem=ca_key,
./ca/issuance.py:        ca_cert_pem=ca_cert,
./ca/issuance.py:    )
./ca/issuance.py:    service = IssuanceService(engine)
./ca/issuance.py:    return service.issue(
./ca/issuance.py:        hostname=hostname,
./ca/issuance.py:        csr_pem=csr_pem,
./ca/issuance.py:        profile_name=profile_name,
./ca/issuance.py:    )
./ca/enrollment/kerberos.py:# ca/enrollment/kerberos.py
./ca/enrollment/kerberos.py:"""
./ca/enrollment/kerberos.py:Kerberos enrollment gate.
./ca/enrollment/kerberos.py:This is the ONLY place Kerberos is used.
./ca/enrollment/kerberos.py:Responsibilities:
./ca/enrollment/kerberos.py:- validate Kerberos identity (external system / ticket already obtained)
./ca/enrollment/kerberos.py:- authorize enrollment request
./ca/enrollment/kerberos.py:- produce a trusted enrollment context for CA pipeline
./ca/enrollment/kerberos.py:NOT used for:
./ca/enrollment/kerberos.py:- runtime authentication
./ca/enrollment/kerberos.py:- certificate validation
./ca/enrollment/kerberos.py:- LDAP policy decisions
./ca/enrollment/kerberos.py:"""
./ca/enrollment/kerberos.py:from dataclasses import dataclass
./ca/enrollment/kerberos.py:@dataclass
./ca/enrollment/kerberos.py:class KerberosContext:
./ca/enrollment/kerberos.py: principal: str
./ca/enrollment/kerberos.py: realm: str
./ca/enrollment/kerberos.py: authorized: bool
./ca/enrollment/kerberos.py:class KerberosGate:
./ca/enrollment/kerberos.py:    """
./ca/enrollment/kerberos.py:    Manual + external Kerberos verification boundary.
./ca/enrollment/kerberos.py:    Assumption:
./ca/enrollment/kerberos.py:    - Kerberos authentication already happened outside this system
./ca/enrollment/kerberos.py:    - We only receive validated identity assertions
./ca/enrollment/kerberos.py:    """
./ca/enrollment/kerberos.py:    def validate(self, principal: str) -> KerberosContext:
./ca/enrollment/kerberos.py:        """
./ca/enrollment/kerberos.py:        Validate a Kerberos principal (placeholder for integration).
./ca/enrollment/kerberos.py:        In real deployment this could be:
./ca/enrollment/kerberos.py:        - kinit-provided assertion
./ca/enrollment/kerberos.py:        - reverse proxy header (SPNEGO)
./ca/enrollment/kerberos.py:        - signed enrollment token from Kerberos-aware gateway
./ca/enrollment/kerberos.py:        """
./ca/enrollment/kerberos.py:        if not principal or "@" not in principal:
./ca/enrollment/kerberos.py:            raise ValueError("Invalid Kerberos principal format")
./ca/enrollment/kerberos.py:        # Split on the last "@" so the realm is always the final component,
./ca/enrollment/kerberos.py:        # and reject inputs such as "user@" or "@REALM".
./ca/enrollment/kerberos.py:        name, _, realm = principal.rpartition("@")
./ca/enrollment/kerberos.py:        if not name or not realm:
./ca/enrollment/kerberos.py:            raise ValueError("Invalid Kerberos principal format")
./ca/enrollment/kerberos.py:        # NOTE: replace with real policy check later
./ca/enrollment/kerberos.py:        authorized = True
./ca/enrollment/kerberos.py:        return KerberosContext(
./ca/enrollment/kerberos.py:            principal=principal,
./ca/enrollment/kerberos.py:            realm=realm,
./ca/enrollment/kerberos.py:            authorized=authorized,
./ca/enrollment/kerberos.py:        )
./ca/enrollment/views.py:# ca/enrollment/views.py
./ca/enrollment/views.py:"""
./ca/enrollment/views.py:Enrollment API layer with explicit Kerberos gate.
./ca/enrollment/views.py:This is the ONLY place where Kerberos is enforced in request flow.
./ca/enrollment/views.py:Flow:
./ca/enrollment/views.py:1. Client posts hostname + Kerberos principal to enroll()
./ca/enrollment/views.py:2. KerberosGate validates identity (external trust boundary)
./ca/enrollment/views.py:3. Host is looked up in the registry; unknown hosts are rejected
./ca/enrollment/views.py:4. Client posts hostname + CSR + profile to issue(); issuance is delegated to the CA engine via the issuance layer
./ca/enrollment/views.py:"""
./ca/enrollment/views.py:import json
./ca/enrollment/views.py:from django.http import JsonResponse, HttpResponseBadRequest
./ca/enrollment/views.py:from django.views.decorators.csrf import csrf_exempt
./ca/enrollment/views.py:from ca.issuance import issue_certificate
./ca/enrollment/views.py:from ca.registry import HostRegistry
./ca/enrollment/views.py:from ca.enrollment.kerberos import KerberosGate
./ca/enrollment/views.py:registry = HostRegistry()
./ca/enrollment/views.py:kerberos_gate = KerberosGate()
./ca/enrollment/views.py:def _parse(request):
./ca/enrollment/views.py:    try:
./ca/enrollment/views.py:        return json.loads(request.body.decode("utf-8"))
./ca/enrollment/views.py:    except Exception:
./ca/enrollment/views.py:        return None
./ca/enrollment/views.py:@csrf_exempt
./ca/enrollment/views.py:def enroll(request):
./ca/enrollment/views.py:    """
./ca/enrollment/views.py:    Step 1: Kerberos-authenticated enrollment initiation.
./ca/enrollment/views.py:    Requires:
./ca/enrollment/views.py:    - hostname
./ca/enrollment/views.py:    - kerberos_principal
./ca/enrollment/views.py:    """
./ca/enrollment/views.py:    if request.method != "POST":
./ca/enrollment/views.py:        return HttpResponseBadRequest("POST required")
./ca/enrollment/views.py:    data = _parse(request)
./ca/enrollment/views.py:    if not data:
./ca/enrollment/views.py:        return HttpResponseBadRequest("invalid json")
./ca/enrollment/views.py:    required = ["hostname", "kerberos_principal"]
./ca/enrollment/views.py:    if any(k not in data for k in required):
./ca/enrollment/views.py:        return HttpResponseBadRequest("missing fields")
./ca/enrollment/views.py:    hostname = data["hostname"]
./ca/enrollment/views.py:    principal = data["kerberos_principal"]
./ca/enrollment/views.py:    # Kerberos is enforced ONLY here
./ca/enrollment/views.py:    # validate() raises ValueError on a malformed principal; report it as a client error.
./ca/enrollment/views.py:    try:
./ca/enrollment/views.py:        ctx = kerberos_gate.validate(principal)
./ca/enrollment/views.py:    except ValueError as exc:
./ca/enrollment/views.py:        return HttpResponseBadRequest(str(exc))
./ca/enrollment/views.py:    if not ctx.authorized:
./ca/enrollment/views.py:        return JsonResponse({"error": "kerberos unauthorized"}, status=403)
./ca/enrollment/views.py:    host = registry.get_host(hostname)
./ca/enrollment/views.py:    if not host:
./ca/enrollment/views.py:        return JsonResponse({"error": "unknown host"}, status=403)
./ca/enrollment/views.py:    return JsonResponse(
./ca/enrollment/views.py:        {
./ca/enrollment/views.py:            "hostname": hostname,
./ca/enrollment/views.py:            "realm": ctx.realm,
./ca/enrollment/views.py:            "profiles": ["control-plane"],
./ca/enrollment/views.py:        }
./ca/enrollment/views.py:    )
./ca/enrollment/views.py:@csrf_exempt
./ca/enrollment/views.py:def issue(request):
./ca/enrollment/views.py:    """
./ca/enrollment/views.py:    Step 2: Certificate issuance (NO Kerberos here).
./ca/enrollment/views.py:    Requires:
./ca/enrollment/views.py:    - hostname
./ca/enrollment/views.py:    - csr
./ca/enrollment/views.py:    - profile
./ca/enrollment/views.py:    """
./ca/enrollment/views.py:    if request.method != "POST":
./ca/enrollment/views.py:        return HttpResponseBadRequest("POST required")
./ca/enrollment/views.py:    data = _parse(request)
./ca/enrollment/views.py:    required = ["hostname", "csr", "profile"]
./ca/enrollment/views.py:    if not data or any(k not in data for k in required):
./ca/enrollment/views.py:        return HttpResponseBadRequest("missing fields")
./ca/enrollment/views.py:    try:
./ca/enrollment/views.py:        result = issue_certificate(
./ca/enrollment/views.py:            hostname=data["hostname"],
./ca/enrollment/views.py:            csr_pem=data["csr"],
./ca/enrollment/views.py:            profile_name=data["profile"],
./ca/enrollment/views.py:        )
./ca/enrollment/views.py:    except ValueError as exc:
./ca/enrollment/views.py:        # Raised for malformed CSRs and for policy violations.
./ca/enrollment/views.py:        return JsonResponse({"error": str(exc)}, status=400)
./ca/enrollment/views.py:    return JsonResponse(result)
./ca/acme/views.py:# ca/acme/views.py
./ca/acme/views.py:import json
./ca/acme/views.py:import uuid
./ca/acme/views.py:import datetime
./ca/acme/views.py:from django.http import JsonResponse, HttpResponse
./ca/acme/views.py:from django.views.decorators.csrf import csrf_exempt
./ca/acme/views.py:from ca.issuance import issue_certificate
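./ca/acme/views.py:# NOTE: authenticate_api_key is not defined in ca/auth.py as listed above;
./ca/acme/views.py:# it must be provided before this module can be imported.
./ca/acme/views.py:from ca.auth import authenticate_api_key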
./ca/acme/views.py:# Very small in-memory ACME state (you can later move to DB or Redis)
./ca/acme/views.py:_NONCES = set()
./ca/acme/views.py:_ORDERS = {}
./ca/acme/views.py:def _require_api_key(request):
./ca/acme/views.py:    api_key = request.headers.get("X-API-Key")
./ca/acme/views.py:    if not api_key:
./ca/acme/views.py:        return None
./ca/acme/views.py:    return authenticate_api_key(api_key)
./ca/acme/views.py:@csrf_exempt
./ca/acme/views.py:def directory(request):
./ca/acme/views.py: return JsonResponse({
./ca/acme/views.py: "newNonce": "/acme/new-nonce",
./ca/acme/views.py: "newAccount": "/acme/new-account",
./ca/acme/views.py: "newOrder": "/acme/new-order",
./ca/acme/views.py: "finalize": "/acme/finalize",
./ca/acme/views.py: "certificate": "/acme/certificate",
./ca/acme/views.py: })
./ca/acme/views.py:@csrf_exempt
./ca/acme/views.py:def new_nonce(request):
./ca/acme/views.py: nonce = uuid.uuid4().hex
./ca/acme/views.py: _NONCES.add(nonce)
./ca/acme/views.py: resp = HttpResponse(status=204)
./ca/acme/views.py: resp["Replay-Nonce"] = nonce
./ca/acme/views.py: return resp
./ca/acme/views.py:@csrf_exempt
./ca/acme/views.py:def new_account(request):
./ca/acme/views.py:    host = _require_api_key(request)
./ca/acme/views.py:    if not host:
./ca/acme/views.py:        return JsonResponse({"error": "unauthorized"}, status=403)
./ca/acme/views.py:    return JsonResponse({
./ca/acme/views.py:        "status": "valid",
./ca/acme/views.py:        "account": host.hostname,
./ca/acme/views.py:    })
./ca/acme/views.py:@csrf_exempt
./ca/acme/views.py:def new_order(request):
./ca/acme/views.py:    host = _require_api_key(request)
./ca/acme/views.py:    if not host:
./ca/acme/views.py:        return JsonResponse({"error": "unauthorized"}, status=403)
./ca/acme/views.py:    try:
./ca/acme/views.py:        payload = json.loads(request.body.decode())
./ca/acme/views.py:    except Exception:
./ca/acme/views.py:        return JsonResponse({"error": "invalid json"}, status=400)
./ca/acme/views.py:    identifiers = payload.get("identifiers", [])
./ca/acme/views.py:    if not identifiers:
./ca/acme/views.py:        return JsonResponse({"error": "no identifiers"}, status=400)
./ca/acme/views.py:    order_id = uuid.uuid4().hex
./ca/acme/views.py:    _ORDERS[order_id] = {
./ca/acme/views.py:        "hostname": host.hostname,
./ca/acme/views.py:        "identifiers": identifiers,
./ca/acme/views.py:        "status": "ready",
./ca/acme/views.py:        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12.
./ca/acme/views.py:        "created_at": datetime.datetime.now(datetime.timezone.utc),
./ca/acme/views.py:    }
./ca/acme/views.py:    return JsonResponse({
./ca/acme/views.py:        "order_id": order_id,
./ca/acme/views.py:        "status": "ready",
./ca/acme/views.py:        "identifiers": identifiers,
./ca/acme/views.py:        "finalize": f"/acme/finalize/{order_id}",
./ca/acme/views.py:    })
./ca/acme/views.py:@csrf_exempt
./ca/acme/views.py:def finalize_order(request, order_id):
./ca/acme/views.py:    host = _require_api_key(request)
./ca/acme/views.py:    if not host:
./ca/acme/views.py:        return JsonResponse({"error": "unauthorized"}, status=403)
./ca/acme/views.py:    order = _ORDERS.get(order_id)
./ca/acme/views.py:    if not order:
./ca/acme/views.py:        return JsonResponse({"error": "order not found"}, status=404)
./ca/acme/views.py:    try:
./ca/acme/views.py:        payload = json.loads(request.body.decode())
./ca/acme/views.py:    except Exception:
./ca/acme/views.py:        return JsonResponse({"error": "invalid json"}, status=400)
./ca/acme/views.py:    csr_pem = payload.get("csr")
./ca/acme/views.py:    if not csr_pem:
./ca/acme/views.py:        return JsonResponse({"error": "missing csr"}, status=400)
./ca/acme/views.py:    # issue_certificate() takes no san_dns argument: SANs are read from the CSR
./ca/acme/views.py:    # and checked against the profile by the CA engine.
./ca/acme/views.py:    cert_result = issue_certificate(
./ca/acme/views.py:        hostname=host.hostname,
./ca/acme/views.py:        csr_pem=csr_pem,
./ca/acme/views.py:        profile_name="acme",
./ca/acme/views.py:    )
./ca/acme/views.py:    order["status"] = "valid"
./ca/acme/views.py:    order["certificate_serial"] = cert_result["serial_number"]
./ca/acme/views.py:    return JsonResponse({
./ca/acme/views.py:        "status": "valid",
./ca/acme/views.py:        "certificate": cert_result["certificate"],
./ca/acme/views.py:        "serial_number": cert_result["serial_number"],
./ca/acme/views.py:    })
./ca/acme/views.py:@csrf_exempt
./ca/acme/views.py:def certificate(request, serial_number):
./ca/acme/views.py:    host = _require_api_key(request)
./ca/acme/views.py:    if not host:
./ca/acme/views.py:        return JsonResponse({"error": "unauthorized"}, status=403)
./ca/acme/views.py:    return JsonResponse({
./ca/acme/views.py:        "error": "use issuance record lookup (not implemented here yet)",
./ca/acme/views.py:        "serial_number": serial_number,
./ca/acme/views.py:    })
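`new_nonce()` adds each nonce to `_NONCES`, but none of the views listed here removes one again, so a nonce is never actually spent. Below is a minimal sketch of single-use consumption, assuming the client echoes the nonce back with its next request (how it is transported is not specified in the source). `issue_nonce` and `consume_nonce` are hypothetical names, and the state is process-local just like `_NONCES`.

```python
import uuid

_NONCES = set()


def issue_nonce() -> str:
    # Same generation scheme as new_nonce(): a random hex token remembered in memory.
    nonce = uuid.uuid4().hex
    _NONCES.add(nonce)
    return nonce


def consume_nonce(nonce: str) -> bool:
    # Hypothetical helper: accept a nonce exactly once, then forget it,
    # so that a replayed request carrying the same nonce is rejected.
    if nonce in _NONCES:
        _NONCES.remove(nonce)
        return True
    return False


fresh = issue_nonce()
first_use = consume_nonce(fresh)
second_use = consume_nonce(fresh)
print(first_use, second_use)
```

With several worker processes each one would hold its own set, which is the reason the source comment suggests moving this state to a database or Redis.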
./ca/ca_engine.py:# ca/ca_engine.py
./ca/ca_engine.py:import datetime
./ca/ca_engine.py:import re
./ca/ca_engine.py:from dataclasses import dataclass
./ca/ca_engine.py:from typing import Any, Dict
./ca/ca_engine.py:from cryptography import x509
./ca/ca_engine.py:from cryptography.hazmat.primitives import hashes, serialization
./ca/ca_engine.py:from cryptography.x509.oid import NameOID
./ca/ca_engine.py:from ca.models import CertificateProfile, IssuedCertificate
./ca/ca_engine.py:@dataclass
./ca/ca_engine.py:class IssuedBundle:
./ca/ca_engine.py:    certificate_pem: str
./ca/ca_engine.py:    ca_bundle_pem: str
./ca/ca_engine.py:    serial_number: str
./ca/ca_engine.py:    not_after: datetime.datetime
./ca/ca_engine.py:class CAEngine:
./ca/ca_engine.py:    """
./ca/ca_engine.py:    Core Certificate Authority engine.
./ca/ca_engine.py:    Responsibilities:
./ca/ca_engine.py:    - validate CSR against policy
./ca/ca_engine.py:    - enforce profile constraints
./ca/ca_engine.py:    - issue X.509 certificates
./ca/ca_engine.py:    - record issuance in LDAP-backed store
./ca/ca_engine.py:    """
./ca/ca_engine.py:    def __init__(self, ca_private_key_pem: bytes, ca_cert_pem: bytes):
./ca/ca_engine.py:        self.ca_private_key = serialization.load_pem_private_key(
./ca/ca_engine.py:            ca_private_key_pem,
./ca/ca_engine.py:            password=None,
./ca/ca_engine.py:        )
./ca/ca_engine.py:        self.ca_cert = x509.load_pem_x509_certificate(ca_cert_pem)
./ca/ca_engine.py:    def _load_profile(self, profile_name: str) -> CertificateProfile:
./ca/ca_engine.py:        return CertificateProfile.objects.get(name=profile_name)
./ca/ca_engine.py:    def _validate_csr(self, csr: x509.CertificateSigningRequest, profile: CertificateProfile):
./ca/ca_engine.py:        # Basic CSR validation
./ca/ca_engine.py:        if not csr.is_signature_valid:
./ca/ca_engine.py:            raise ValueError("Invalid CSR signature")
./ca/ca_engine.py:        cn = None
./ca/ca_engine.py:        for attr in csr.subject:
./ca/ca_engine.py:            if attr.oid == NameOID.COMMON_NAME:
./ca/ca_engine.py:                cn = attr.value
./ca/ca_engine.py:        if profile.allowed_cn_regex:
./ca/ca_engine.py:            # A CSR without a CN cannot satisfy a CN policy. fullmatch() requires the
./ca/ca_engine.py:            # whole value to match, so an allowed prefix cannot carry an arbitrary suffix.
./ca/ca_engine.py:            if cn is None or not re.fullmatch(profile.allowed_cn_regex, cn):
./ca/ca_engine.py:                raise ValueError("CN does not match profile policy")
./ca/ca_engine.py:        # SAN validation (if present)
./ca/ca_engine.py:        try:
./ca/ca_engine.py:            san = csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
./ca/ca_engine.py:        except x509.ExtensionNotFound:
./ca/ca_engine.py:            return
./ca/ca_engine.py:        if profile.allowed_san_regex:
./ca/ca_engine.py:            for value in san.value.get_values_for_type(x509.DNSName):
./ca/ca_engine.py:                if not re.fullmatch(profile.allowed_san_regex, value):
./ca/ca_engine.py:                    raise ValueError(f"SAN {value} not allowed by policy")
./ca/ca_engine.py:    def _build_certificate(
./ca/ca_engine.py:        self,
./ca/ca_engine.py:        csr: x509.CertificateSigningRequest,
./ca/ca_engine.py:        profile: CertificateProfile,
./ca/ca_engine.py:    ) -> IssuedBundle:
./ca/ca_engine.py:        # Use one serial for both the certificate and the ledger record,
./ca/ca_engine.py:        # so a stored serial_number always identifies the issued certificate.
./ca/ca_engine.py:        serial = x509.random_serial_number()
./ca/ca_engine.py:        serial_number = format(serial, "x")
./ca/ca_engine.py:        not_before = datetime.datetime.now(datetime.timezone.utc)
./ca/ca_engine.py:        not_after = not_before + datetime.timedelta(days=profile.validity_days)
./ca/ca_engine.py:        builder = (
./ca/ca_engine.py:            x509.CertificateBuilder()
./ca/ca_engine.py:            .subject_name(csr.subject)
./ca/ca_engine.py:            .issuer_name(self.ca_cert.subject)
./ca/ca_engine.py:            .public_key(csr.public_key())
./ca/ca_engine.py:            .serial_number(serial)
./ca/ca_engine.py:            .not_valid_before(not_before)
./ca/ca_engine.py:            .not_valid_after(not_after)
./ca/ca_engine.py:        )
./ca/ca_engine.py:        # Copy extensions from the CSR, except BasicConstraints:
./ca/ca_engine.py:        # the requester must never choose its own CA status.
./ca/ca_engine.py:        for ext in csr.extensions:
./ca/ca_engine.py:            if isinstance(ext.value, x509.BasicConstraints):
./ca/ca_engine.py:                continue
./ca/ca_engine.py:            builder = builder.add_extension(ext.value, ext.critical)
./ca/ca_engine.py:        builder = builder.add_extension(
./ca/ca_engine.py:            x509.BasicConstraints(ca=False, path_length=None),
./ca/ca_engine.py:            critical=True,
./ca/ca_engine.py:        )
./ca/ca_engine.py:        cert = builder.sign(
./ca/ca_engine.py:            private_key=self.ca_private_key,
./ca/ca_engine.py:            algorithm=hashes.SHA256(),
./ca/ca_engine.py:        )
./ca/ca_engine.py:        cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode()
./ca/ca_engine.py:        ca_bundle_pem = self.ca_cert.public_bytes(
./ca/ca_engine.py:            serialization.Encoding.PEM
./ca/ca_engine.py:        ).decode()
./ca/ca_engine.py:        return IssuedBundle(
./ca/ca_engine.py:            certificate_pem=cert_pem,
./ca/ca_engine.py:            ca_bundle_pem=ca_bundle_pem,
./ca/ca_engine.py:            serial_number=serial_number,
./ca/ca_engine.py:            not_after=not_after,
./ca/ca_engine.py:        )
./ca/ca_engine.py:    def issue_certificate(
./ca/ca_engine.py:        self,
./ca/ca_engine.py:        hostname: str,
./ca/ca_engine.py:        csr_pem: str,
./ca/ca_engine.py:        profile_name: str,
./ca/ca_engine.py:    ) -> Dict[str, Any]:
./ca/ca_engine.py:        profile = self._load_profile(profile_name)
./ca/ca_engine.py:        csr = x509.load_pem_x509_csr(csr_pem.encode())
./ca/ca_engine.py:        self._validate_csr(csr, profile)
./ca/ca_engine.py:        bundle = self._build_certificate(csr, profile)
./ca/ca_engine.py:        IssuedCertificate.objects.create(
./ca/ca_engine.py:            serial_number=bundle.serial_number,
./ca/ca_engine.py:            hostname=hostname,
./ca/ca_engine.py:            profile_name=profile_name,
./ca/ca_engine.py:            certificate_pem=bundle.certificate_pem,
./ca/ca_engine.py:            ca_bundle_pem=bundle.ca_bundle_pem,
./ca/ca_engine.py:            issued_at=str(datetime.datetime.now(datetime.timezone.utc)),
./ca/ca_engine.py:            not_after=str(bundle.not_after),
./ca/ca_engine.py:            last_csr=csr_pem,
./ca/ca_engine.py:            revoked=False,
./ca/ca_engine.py:        )
./ca/ca_engine.py:        return {
./ca/ca_engine.py:            "serial_number": bundle.serial_number,
./ca/ca_engine.py:            "certificate": bundle.certificate_pem,
./ca/ca_engine.py:            "ca_bundle": bundle.ca_bundle_pem,
./ca/ca_engine.py:            "not_after": bundle.not_after.isoformat(),
./ca/ca_engine.py:        }
./ca/policy.py:# ca/policy.py
./ca/policy.py:"""
./ca/policy.py:Policy engine for certificate issuance.
./ca/policy.py:This module is responsible for:
./ca/policy.py:- deciding whether a host may request a certificate
./ca/policy.py:- selecting valid certificate profiles
./ca/policy.py:- enforcing LDAP-driven constraints (via registry layer)
./ca/policy.py:It does NOT perform issuance or cryptography.
./ca/policy.py:"""
./ca/policy.py:import re
./ca/policy.py:from typing import List
./ca/policy.py:from ca.models import Host, CertificateProfile
./ca/policy.py:class PolicyEngine:
./ca/policy.py:    """
./ca/policy.py:    Central policy evaluation layer.
./ca/policy.py:    """
./ca/policy.py:    def get_host_profiles(self, host: Host) -> List[CertificateProfile]:
./ca/policy.py:        """
./ca/policy.py:        Returns certificate profiles allowed for a given host.
./ca/policy.py:        NOTE: currently returns every profile; ldap_filter is not evaluated yet.
./ca/policy.py:        """
./ca/policy.py:        return list(CertificateProfile.objects.all())
./ca/policy.py:    def is_host_allowed(self, host: Host) -> bool:
./ca/policy.py:        """
./ca/policy.py:        Basic allow/deny gate for enrollment eligibility.
./ca/policy.py:        """
./ca/policy.py:        return host is not None
./ca/policy.py:    def validate_hostname(self, hostname: str) -> bool:
./ca/policy.py:        """
./ca/policy.py:        Minimal hostname sanity check (not DNS validation).
./ca/policy.py:        """
./ca/policy.py:        # fullmatch() also rejects a trailing newline, which "$" would let through.
./ca/policy.py:        return bool(re.fullmatch(r"[a-zA-Z0-9.-]+", hostname))
./ca/policy.py:    def filter_profiles_for_hostname(
./ca/policy.py:        self,
./ca/policy.py:        hostname: str,
./ca/policy.py:        profiles: List[CertificateProfile],
./ca/policy.py:    ) -> List[CertificateProfile]:
./ca/policy.py:        """
./ca/policy.py:        Filters profiles based on hostname policy constraints.
./ca/policy.py:        """
./ca/policy.py:        valid = []
./ca/policy.py:        for profile in profiles:
./ca/policy.py:            # Profiles without a CN pattern accept any hostname.
./ca/policy.py:            if profile.allowed_cn_regex and not re.fullmatch(profile.allowed_cn_regex, hostname):
./ca/policy.py:                continue
./ca/policy.py:            valid.append(profile)
./ca/policy.py:        return valid
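The profile constraints (`allowed_cn_regex`, `allowed_san_regex`) are regular expressions, so how they are anchored decides what they admit. A short sketch of the difference between `re.match`, which anchors only at the start, and `re.fullmatch`, which requires the entire value to match. The pattern and hostnames are made-up examples, not values from the source.

```python
import re

# Hypothetical policy value: any host directly under internal.example.
pattern = r"[a-z0-9-]+\.internal\.example"

good = "web-1.internal.example"
evil = "web-1.internal.example.attacker.test"

# re.match succeeds as soon as a prefix of the string matches the pattern.
prefix_good = bool(re.match(pattern, good))
prefix_evil = bool(re.match(pattern, evil))

# re.fullmatch succeeds only if the pattern consumes the whole string.
full_good = bool(re.fullmatch(pattern, good))
full_evil = bool(re.fullmatch(pattern, evil))

print(prefix_good, prefix_evil)  # True True
print(full_good, full_evil)      # True False
```

A pattern that already ends in `$` behaves almost the same under both calls, but relying on every policy author to remember the anchor is fragile for an allowlist.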
./ca/notification_channel.py:# ca/notification_channel.py
./ca/notification_channel.py:"""
./ca/notification_channel.py:Notification channel for control-plane messages.
./ca/notification_channel.py:This is used to send signed UDP/DTLS-compatible payloads
./ca/notification_channel.py:to enrolled clients.
./ca/notification_channel.py:The CA does NOT maintain persistent connections.
./ca/notification_channel.py:It only emits stateless messages.
./ca/notification_channel.py:"""
./ca/notification_channel.py:import json
./ca/notification_channel.py:import socket
./ca/notification_channel.py:import time
./ca/notification_channel.py:import uuid
./ca/notification_channel.py:import hashlib
./ca/notification_channel.py:from typing import Dict, Any
./ca/notification_channel.py:class NotificationChannel:
./ca/notification_channel.py:    """
./ca/notification_channel.py:    Stateless UDP notification emitter.
./ca/notification_channel.py:    """
./ca/notification_channel.py:    def __init__(self, ca_private_key_pem: bytes):
./ca/notification_channel.py:        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
./ca/notification_channel.py:        self.ca_private_key_pem = ca_private_key_pem  # reserved for future signing implementation
./ca/notification_channel.py:    def _sign_payload(self, payload: Dict[str, Any]) -> str:
./ca/notification_channel.py:        """
./ca/notification_channel.py:        Lightweight placeholder signature mechanism.
./ca/notification_channel.py:        NOTE:
./ca/notification_channel.py:        This should be replaced with real asymmetric signing
./ca/notification_channel.py:        (e.g. RSA/ECDSA using CA private key).
./ca/notification_channel.py:        """
./ca/notification_channel.py:        raw = json.dumps(payload, sort_keys=True).encode()
./ca/notification_channel.py:        return hashlib.sha256(raw + self.ca_private_key_pem).hexdigest()
./ca/notification_channel.py:    def send(self, host: str, port: int, event: str, data: Dict[str, Any]):
./ca/notification_channel.py:        """
./ca/notification_channel.py:        Send a signed control-plane event to a client.
./ca/notification_channel.py:        """
./ca/notification_channel.py:        payload = {
./ca/notification_channel.py:            "id": str(uuid.uuid4()),
./ca/notification_channel.py:            "timestamp": int(time.time()),
./ca/notification_channel.py:            "event": event,
./ca/notification_channel.py:            "data": data,
./ca/notification_channel.py:        }
./ca/notification_channel.py:        payload["signature"] = self._sign_payload(payload)
./ca/notification_channel.py:        self.socket.sendto(
./ca/notification_channel.py:            json.dumps(payload).encode(),
./ca/notification_channel.py:            (host, port),
./ca/notification_channel.py:        )
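`send()` computes the digest over the payload before the `signature` field is added, so a receiver has to remove that field before recomputing. The sketch below shows the receiving side for the placeholder scheme only. It assumes the receiver holds the same key bytes, which is exactly why the source says the placeholder must be replaced by asymmetric signing; `check_message` and the `max_age` freshness window are hypothetical.

```python
import hashlib
import hmac
import json
import time


def placeholder_digest(payload: dict, key: bytes) -> str:
    # Same construction as _sign_payload(): SHA-256 over sorted-key JSON plus key bytes.
    raw = json.dumps(payload, sort_keys=True).encode()
    return hashlib.sha256(raw + key).hexdigest()


def check_message(datagram: bytes, key: bytes, max_age: int = 30) -> bool:
    # Hypothetical receiver-side check; max_age (seconds) is an assumed window.
    message = json.loads(datagram.decode())
    claimed = str(message.pop("signature", ""))
    expected = placeholder_digest(message, key)
    # Constant-time comparison of the two hex digests.
    if not hmac.compare_digest(claimed.encode(), expected.encode()):
        return False
    # Reject stale or far-future messages to limit replay.
    return abs(time.time() - message.get("timestamp", 0)) <= max_age


# Build a datagram the same way send() does.
key = b"demo-key-bytes"  # stand-in for the key material, for illustration only
payload = {"id": "demo", "timestamp": int(time.time()), "event": "renew", "data": {"n": 1}}
payload["signature"] = placeholder_digest(payload, key)
datagram = json.dumps(payload).encode()

tampered = json.loads(datagram.decode())
tampered["event"] = "revoke"

print(check_message(datagram, key))
print(check_message(json.dumps(tampered).encode(), key))
```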
./ca/message_signing.py:# ca/message_signing.py
./ca/message_signing.py:"""
./ca/message_signing.py:Cryptographic message signing utilities for CA control-plane traffic.
./ca/message_signing.py:This module is the *real* replacement for any placeholder hashing
./ca/message_signing.py:done in notification_channel.py.
./ca/message_signing.py:It provides deterministic, verifiable signatures for:
./ca/message_signing.py:- UDP control-plane messages
./ca/message_signing.py:- internal CA events
./ca/message_signing.py:- audit messages (optional)
./ca/message_signing.py:"""
./ca/message_signing.py:import json
./ca/message_signing.py:from typing import Dict, Any
./ca/message_signing.py:from cryptography.hazmat.primitives import hashes, serialization
./ca/message_signing.py:from cryptography.hazmat.primitives.asymmetric import padding, rsa
./ca/message_signing.py:class MessageSigner:
./ca/message_signing.py: """
./ca/message_signing.py: RSA-based signing for CA-originated messages.
./ca/message_signing.py: """
./ca/message_signing.py: def __init__(self, private_key_pem: bytes):
./ca/message_signing.py: self.private_key = serialization.load_pem_private_key(
./ca/message_signing.py: private_key_pem,
./ca/message_signing.py: password=None,
./ca/message_signing.py: )
./ca/message_signing.py: def sign(self, payload: Dict[str, Any]) -> str:
./ca/message_signing.py: """
./ca/message_signing.py: Sign a JSON-serializable payload.
./ca/message_signing.py: """
./ca/message_signing.py: raw = json.dumps(payload, sort_keys=True).encode()
./ca/message_signing.py: signature = self.private_key.sign(
./ca/message_signing.py: raw,
./ca/message_signing.py: padding.PKCS1v15(),
./ca/message_signing.py: hashes.SHA256(),
./ca/message_signing.py: )
./ca/message_signing.py: return signature.hex()
./ca/message_signing.py: def verify(self, payload: Dict[str, Any], signature_hex: str) -> bool:
./ca/message_signing.py: """
./ca/message_signing.py: Verify a signature (useful for testing or internal validation).
./ca/message_signing.py: """
./ca/message_signing.py: raw = json.dumps(payload, sort_keys=True).encode()
./ca/message_signing.py: signature = bytes.fromhex(signature_hex)
./ca/message_signing.py: public_key = self.private_key.public_key()
./ca/message_signing.py: try:
./ca/message_signing.py: public_key.verify(
./ca/message_signing.py: signature,
./ca/message_signing.py: raw,
./ca/message_signing.py: padding.PKCS1v15(),
./ca/message_signing.py: hashes.SHA256(),
./ca/message_signing.py: )
./ca/message_signing.py: return True
./ca/message_signing.py: except Exception:
./ca/message_signing.py: return False
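Signatures only verify if signer and verifier serialise the payload to identical bytes. The signer hashes the payload before the "signature" field is attached, and the agent strips that field again before verifying. A small sketch of that canonical form follows; `canonical_bytes` is a hypothetical helper name, not something the project defines.

```python
import json
from typing import Any, Dict


def canonical_bytes(payload: Dict[str, Any]) -> bytes:
    """Serialise every field except "signature" with sorted keys."""
    unsigned = {k: v for k, v in payload.items() if k != "signature"}
    return json.dumps(unsigned, sort_keys=True).encode()


# Key order and the presence of a signature do not change the signed bytes.
before_signing = {"event": "noop", "id": "1", "timestamp": 100}
as_received = {"timestamp": 100, "id": "1", "event": "noop", "signature": "abcd"}
same = canonical_bytes(before_signing) == canonical_bytes(as_received)
```

Sharing one such helper between server and client would keep the two serialisations from drifting apart.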
./ca/registry.py:# ca/registry.py
./ca/registry.py:"""
./ca/registry.py:Registry layer for CA system.
./ca/registry.py:Now fully aligned with django_ca.settings as the single
./ca/registry.py:source of truth for LDAP configuration.
./ca/registry.py:This module:
./ca/registry.py:- does NOT define LDAP structure
./ca/registry.py:- does NOT hardcode DN layout
./ca/registry.py:- only performs object access via django-ldapdb models
./ca/registry.py:"""
./ca/registry.py:from typing import List, Optional
./ca/registry.py:from django.conf import settings
./ca/registry.py:from ca.models import Host, IssuedCertificate, CertificateProfile
./ca/registry.py:class HostRegistry:
./ca/registry.py: def get_host(self, hostname: str) -> Optional[Host]:
./ca/registry.py: try:
./ca/registry.py: return Host.objects.get(cn=hostname)
./ca/registry.py: except Host.DoesNotExist:
./ca/registry.py: return None
./ca/registry.py: def list_hosts(self) -> List[Host]:
./ca/registry.py: return list(Host.objects.all())
./ca/registry.py: def enrolled(self) -> List[Host]:
./ca/registry.py: return list(Host.objects.filter(is_enrolled=True))
./ca/registry.py: def mark_enrolled(self, hostname: str) -> Host:
./ca/registry.py: host = self.get_host(hostname)
./ca/registry.py: if not host:
./ca/registry.py: raise ValueError(f"Unknown host: {hostname}")
./ca/registry.py: host.is_enrolled = True
./ca/registry.py: host.save()
./ca/registry.py: return host
./ca/registry.py:class CertificateRegistry:
./ca/registry.py: def get(self, serial_number: str) -> Optional[IssuedCertificate]:
./ca/registry.py: try:
./ca/registry.py: return IssuedCertificate.objects.get(serial_number=serial_number)
./ca/registry.py: except IssuedCertificate.DoesNotExist:
./ca/registry.py: return None
./ca/registry.py: def for_host(self, hostname: str) -> List[IssuedCertificate]:
./ca/registry.py: return list(IssuedCertificate.objects.filter(hostname=hostname))
./ca/registry.py: def active(self) -> List[IssuedCertificate]:
./ca/registry.py: return list(IssuedCertificate.objects.filter(revoked=False))
./ca/registry.py: def revoked(self) -> List[IssuedCertificate]:
./ca/registry.py: return list(IssuedCertificate.objects.filter(revoked=True))
./ca/registry.py: def revoke(self, serial_number: str) -> IssuedCertificate:
./ca/registry.py: cert = self.get(serial_number)
./ca/registry.py: if not cert:
./ca/registry.py: raise ValueError(f"Unknown certificate: {serial_number}")
./ca/registry.py: cert.revoked = True
./ca/registry.py: cert.save()
./ca/registry.py: return cert
./ca/registry.py:class ProfileRegistry:
./ca/registry.py: def get(self, name: str) -> Optional[CertificateProfile]:
./ca/registry.py: try:
./ca/registry.py: return CertificateProfile.objects.get(name=name)
./ca/registry.py: except CertificateProfile.DoesNotExist:
./ca/registry.py: return None
./ca/registry.py: def list(self) -> List[CertificateProfile]:
./ca/registry.py: return list(CertificateProfile.objects.all())
./ca/registry.py: def list_all(self) -> List[CertificateProfile]:
./ca/registry.py: """
./ca/registry.py: Alias for compatibility with policy/ACME layers.
./ca/registry.py: """
./ca/registry.py: return self.list()
./ca/renew.py:# ca/renew.py
./ca/renew.py:import time
./ca/renew.py:import json
./ca/renew.py:from datetime import datetime, timedelta
./ca/renew.py:from ca.models import IssuedCertificate
./ca/renew.py:from ca.issuance import issue_certificate
./ca/renew.py:RENEWAL_THRESHOLD_DAYS = 10
./ca/renew.py:def needs_renewal(cert: IssuedCertificate) -> bool:
./ca/renew.py: """
./ca/renew.py: Determines whether a certificate is close enough to expiry to require renewal.
./ca/renew.py: """
./ca/renew.py: if not cert.not_after:
./ca/renew.py: return False
./ca/renew.py: return cert.not_after <= datetime.utcnow() + timedelta(days=RENEWAL_THRESHOLD_DAYS)
./ca/renew.py:def build_renewal_csr(cert: IssuedCertificate) -> str:
./ca/renew.py: """
./ca/renew.py: Placeholder: regenerate CSR based on stored metadata.
./ca/renew.py: In a real implementation this would:
./ca/renew.py: - fetch host key material (or request regeneration)
./ca/renew.py: - rebuild CSR using same subject + SAN profile
./ca/renew.py: """
./ca/renew.py: return cert.last_csr
./ca/renew.py:def renew_certificate(cert: IssuedCertificate):
./ca/renew.py: """
./ca/renew.py: Performs certificate renewal using existing issuance pipeline.
./ca/renew.py: """
./ca/renew.py: csr = build_renewal_csr(cert)
./ca/renew.py: return issue_certificate(
./ca/renew.py: hostname=cert.hostname,
./ca/renew.py: csr_pem=csr,
./ca/renew.py: profile_name=cert.profile_name,
./ca/renew.py: )
./ca/renew.py:def run_renewal_cycle():
./ca/renew.py: """
./ca/renew.py: Scans all issued certificates and renews those nearing expiry.
./ca/renew.py: """
./ca/renew.py: certs = IssuedCertificate.objects.all()
./ca/renew.py: renewed = 0
./ca/renew.py: for cert in certs:
./ca/renew.py: # Reuse needs_renewal() so the threshold logic lives in one place.
./ca/renew.py: if needs_renewal(cert):
./ca/renew.py: try:
./ca/renew.py: renew_certificate(cert)
./ca/renew.py: renewed += 1
./ca/renew.py: except Exception:
./ca/renew.py: continue
./ca/renew.py: return {"renewed": renewed, "timestamp": time.time()}
./ca/renew.py:def main():
./ca/renew.py: result = run_renewal_cycle()
./ca/renew.py: print(json.dumps(result, indent=2))
./ca/renew.py:if __name__ == "__main__":
./ca/renew.py: main()
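The renewal decision above is a single date comparison. It is easier to test when the current time is passed in rather than read from the clock. The sketch below assumes timezone-aware datetimes, and `within_renewal_window` is an illustrative name rather than a function in ca/renew.py.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RENEWAL_THRESHOLD_DAYS = 10


def within_renewal_window(
    not_after: Optional[datetime],
    now: datetime,
    threshold_days: int = RENEWAL_THRESHOLD_DAYS,
) -> bool:
    """True when the certificate expires within `threshold_days` of `now`.

    Certificates without an expiry date are never selected, and certificates
    that have already expired are always selected.
    """
    if not_after is None:
        return False
    return not_after <= now + timedelta(days=threshold_days)


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
expires_soon = within_renewal_window(datetime(2024, 1, 8, tzinfo=timezone.utc), now)
expires_later = within_renewal_window(datetime(2024, 1, 12, tzinfo=timezone.utc), now)
```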
./ca/__init__.py:# ca/__init__.py
./ca/__init__.py:"""
./ca/__init__.py:CA application package initializer.
./ca/__init__.py:This module exposes a minimal public API for the CA system
./ca/__init__.py:while keeping internal modules isolated.
./ca/__init__.py:Public surface:
./ca/__init__.py:- CAEngine (core cryptographic engine)
./ca/__init__.py:- issue_certificate (issuance entrypoint)
./ca/__init__.py:- HostRegistry, CertificateRegistry (registry access helpers)
./ca/__init__.py:"""
./ca/__init__.py:from ca.ca_engine import CAEngine
./ca/__init__.py:from ca.issuance import issue_certificate
./ca/__init__.py:from ca.registry import HostRegistry, CertificateRegistry
./ca/__init__.py:__all__ = [
./ca/__init__.py: "CAEngine",
./ca/__init__.py: "issue_certificate",
./ca/__init__.py: "HostRegistry",
./ca/__init__.py: "CertificateRegistry",
./ca/__init__.py:]
./client/csr.py:# client/csr.py
./client/csr.py:"""
./client/csr.py:Client-side cryptographic utilities.
./client/csr.py:Responsible for:
./client/csr.py:- generating private keys
./client/csr.py:- building X.509 CSRs
./client/csr.py:- embedding hostname + SANs in a policy-compatible way
./client/csr.py:This module must remain CA-agnostic:
./client/csr.py:it does NOT know about LDAP, Django, or issuance logic.
./client/csr.py:"""
./client/csr.py:import socket
./client/csr.py:from typing import Optional, List
./client/csr.py:from cryptography import x509
./client/csr.py:from cryptography.x509.oid import NameOID
./client/csr.py:from cryptography.hazmat.primitives import hashes, serialization
./client/csr.py:from cryptography.hazmat.primitives.asymmetric import rsa
./client/csr.py:def generate_private_key(key_size: int = 2048) -> rsa.RSAPrivateKey:
./client/csr.py: """
./client/csr.py: Generate RSA private key locally on the client.
./client/csr.py: """
./client/csr.py: return rsa.generate_private_key(public_exponent=65537, key_size=key_size)
./client/csr.py:def _default_hostname() -> str:
./client/csr.py: return socket.getfqdn()
./client/csr.py:def generate_csr(
./client/csr.py: private_key: rsa.RSAPrivateKey,
./client/csr.py: hostname: Optional[str] = None,
./client/csr.py: san_dns_names: Optional[List[str]] = None,
./client/csr.py:) -> bytes:
./client/csr.py: """
./client/csr.py: Generate a PEM-encoded CSR.
./client/csr.py: The CSR includes:
./client/csr.py: - Common Name (CN)
./client/csr.py: - Subject Alternative Names (SAN)
./client/csr.py: """
./client/csr.py: hostname = hostname or _default_hostname()
./client/csr.py: san_dns_names = san_dns_names or [hostname]
./client/csr.py: subject = x509.Name(
./client/csr.py: [
./client/csr.py: x509.NameAttribute(NameOID.COMMON_NAME, hostname),
./client/csr.py: ]
./client/csr.py: )
./client/csr.py: csr_builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
./client/csr.py: san = x509.SubjectAlternativeName(
./client/csr.py: [x509.DNSName(name) for name in san_dns_names]
./client/csr.py: )
./client/csr.py: csr_builder = csr_builder.add_extension(san, critical=False)
./client/csr.py: csr = csr_builder.sign(
./client/csr.py: private_key,
./client/csr.py: hashes.SHA256(),
./client/csr.py: )
./client/csr.py: return csr.public_bytes(serialization.Encoding.PEM)
./client/csr.py:def serialize_private_key(private_key: rsa.RSAPrivateKey) -> bytes:
./client/csr.py: """
./client/csr.py: Serialize private key in PEM format (unencrypted).
./client/csr.py: """
./client/csr.py: return private_key.private_bytes(
./client/csr.py: encoding=serialization.Encoding.PEM,
./client/csr.py: format=serialization.PrivateFormat.TraditionalOpenSSL,
./client/csr.py: encryption_algorithm=serialization.NoEncryption(),
./client/csr.py: )
./client/agent.py:# client/agent.py
./client/agent.py:import socket
./client/agent.py:import json
./client/agent.py:import time
./client/agent.py:import os
./client/agent.py:import ssl
./client/agent.py:from pathlib import Path
./client/agent.py:from typing import Dict, Any
./client/agent.py:from cryptography.hazmat.primitives import hashes
./client/agent.py:from cryptography.hazmat.primitives.asymmetric import padding
./client/agent.py:from cryptography.x509 import load_pem_x509_certificate
./client/agent.py:DEFAULT_BIND = ("0.0.0.0", 9999)
./client/agent.py:DEFAULT_CA_CERT_PATH = Path("/etc/ssl/ca/ca.pem")
./client/agent.py:DEFAULT_CLIENT_CERT_PATH = Path("/etc/ssl/client/client.pem")
./client/agent.py:DEFAULT_CLIENT_KEY_PATH = Path("/etc/ssl/client/client.key")
./client/agent.py:class ControlPlaneAgent:
./client/agent.py: """
./client/agent.py: UDP-based control-plane agent.
./client/agent.py: Responsibilities:
./client/agent.py: - Receive signed messages from CA server
./client/agent.py: - Validate signature using CA certificate
./client/agent.py: - Enforce replay protection (nonce + timestamp)
./client/agent.py: - Execute allowed event handlers
./client/agent.py: """
./client/agent.py: def __init__(self, bind_addr=DEFAULT_BIND):
./client/agent.py: self.bind_addr = bind_addr
./client/agent.py: self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
./client/agent.py: self.socket.bind(self.bind_addr)
./client/agent.py: self.ca_cert = self._load_ca_cert()
./client/agent.py: self._seen_nonces = set()
./client/agent.py: def _load_ca_cert(self):
./client/agent.py: if not DEFAULT_CA_CERT_PATH.exists():
./client/agent.py: raise RuntimeError(f"CA certificate not found at {DEFAULT_CA_CERT_PATH}")
./client/agent.py: return load_pem_x509_certificate(DEFAULT_CA_CERT_PATH.read_bytes())
./client/agent.py: def _verify_signature(self, payload: Dict[str, Any]) -> bool:
./client/agent.py: # Parse inside the try block so a missing or malformed signature
./client/agent.py: # is rejected instead of crashing the receive loop.
./client/agent.py: try:
./client/agent.py: signature = bytes.fromhex(payload["signature"])
./client/agent.py: unsigned = json.dumps(
./client/agent.py: {k: payload[k] for k in payload if k != "signature"},
./client/agent.py: sort_keys=True
./client/agent.py: ).encode()
./client/agent.py: self.ca_cert.public_key().verify(
./client/agent.py: signature,
./client/agent.py: unsigned,
./client/agent.py: padding.PKCS1v15(),
./client/agent.py: hashes.SHA256(),
./client/agent.py: )
./client/agent.py: return True
./client/agent.py: except Exception:
./client/agent.py: return False
./client/agent.py: def _check_replay(self, payload: Dict[str, Any]) -> bool:
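./client/agent.py: # The server's send() emits "id"; accept it when no explicit "nonce" is present.
./client/agent.py: nonce = payload.get("nonce") or payload.get("id")
./client/agent.py: ts = payload.get("timestamp", 0)
./client/agent.py: # Reject messages without a usable nonce or timestamp.
./client/agent.py: if not nonce or not isinstance(ts, (int, float)):
./client/agent.py: return False
./client/agent.py: if nonce in self._seen_nonces:
./client/agent.py: return False
./client/agent.py: now = int(time.time())
./client/agent.py: if abs(now - ts) > payload.get("ttl", 30):
./client/agent.py: return False
./client/agent.py: self._seen_nonces.add(nonce)
./client/agent.py: return True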
./client/agent.py: def _handle_event(self, payload: Dict[str, Any]):
./client/agent.py: event = payload.get("event")
./client/agent.py: if event == "reboot":
./client/agent.py: os.system("reboot")
./client/agent.py: elif event == "shutdown":
./client/agent.py: os.system("shutdown now")
./client/agent.py: elif event == "noop":
./client/agent.py: pass
./client/agent.py: else:
./client/agent.py: # Unknown events are ignored for safety
./client/agent.py: pass
./client/agent.py: def run(self):
./client/agent.py: print(f"Agent listening on {self.bind_addr}")
./client/agent.py: while True:
./client/agent.py: data, addr = self.socket.recvfrom(65535)
./client/agent.py: try:
./client/agent.py: payload = json.loads(data.decode())
./client/agent.py: except Exception:
./client/agent.py: continue
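./client/agent.py: # Authenticate first so unsigned datagrams cannot pollute the nonce cache.
./client/agent.py: if not isinstance(payload, dict) or not self._verify_signature(payload):
./client/agent.py: continue
./client/agent.py: if not self._check_replay(payload):
./client/agent.py: continue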
./client/agent.py: self._handle_event(payload)
./client/agent.py:def main():
./client/agent.py: agent = ControlPlaneAgent()
./client/agent.py: agent.run()
./client/agent.py:if __name__ == "__main__":
./client/agent.py: main()
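The agent's replay check combines a freshness window with a set of seen nonces, and that set grows for as long as the process runs. One possible refinement is to forget a nonce once its freshness window has closed, since the timestamp test rejects it from then on anyway. The sketch below assumes a fixed TTL instead of a per-message "ttl" field, and `ReplayGuard` is a hypothetical class, not part of client/agent.py.

```python
from typing import Dict


class ReplayGuard:
    """Reject stale or repeated messages; forget nonces once they expire."""

    def __init__(self, ttl_seconds: int = 30):
        self.ttl = ttl_seconds
        # nonce -> last instant at which the message would still look fresh
        self._expiry: Dict[str, float] = {}

    def check(self, nonce: str, timestamp: float, now: float) -> bool:
        # Keep only nonces that could still pass the freshness test.
        self._expiry = {n: t for n, t in self._expiry.items() if t >= now}
        if not nonce or abs(now - timestamp) > self.ttl:
            return False  # missing nonce, or outside the freshness window
        if nonce in self._expiry:
            return False  # already seen: replay
        self._expiry[nonce] = timestamp + self.ttl
        return True


guard = ReplayGuard(ttl_seconds=30)
first = guard.check("a", timestamp=1000, now=1000)
replayed = guard.check("a", timestamp=1000, now=1010)
```

Passing `now` in explicitly keeps the guard deterministic under test; the agent would supply `time.time()`.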
./client/enroll.py:# client/enroll.py
./client/enroll.py:"""
./client/enroll.py:Client enrollment bootstrap agent.
./client/enroll.py:This script runs a single privileged bootstrap sequence:
./client/enroll.py:- initiate Kerberos-backed enrollment
./client/enroll.py:- obtain allowed certificate profiles from CA
./client/enroll.py:- generate private key locally
./client/enroll.py:- generate CSR
./client/enroll.py:- request certificate issuance
./client/enroll.py:- install certificates into /etc/ssl
./client/enroll.py:After this runs successfully:
./client/enroll.py:- Kerberos is no longer used
./client/enroll.py:- all further auth is certificate-based
./client/enroll.py:"""
./client/enroll.py:import json
./client/enroll.py:import socket
./client/enroll.py:from pathlib import Path
./client/enroll.py:import requests
./client/enroll.py:from client.csr import generate_csr, generate_private_key
./client/enroll.py:CA_URL = "https://ca.local:8443"
./client/enroll.py:def auto_hostname() -> str:
./client/enroll.py: """
./client/enroll.py: Best-effort system hostname detection.
./client/enroll.py: """
./client/enroll.py: return socket.getfqdn()
./client/enroll.py:def kerberos_principal() -> str:
./client/enroll.py: """
./client/enroll.py: In real deployment this comes from:
./client/enroll.py: - kinit session cache
./client/enroll.py: - or external SSO tool
./client/enroll.py: """
./client/enroll.py: raise NotImplementedError("Kerberos integration required")
./client/enroll.py:def enroll():
./client/enroll.py: hostname = auto_hostname()
./client/enroll.py: principal = kerberos_principal()
./client/enroll.py: # Step 1: initiate enrollment (Kerberos gate)
./client/enroll.py: resp = requests.post(
./client/enroll.py: f"{CA_URL}/enroll",
./client/enroll.py: json={
./client/enroll.py: "hostname": hostname,
./client/enroll.py: "kerberos_principal": principal,
./client/enroll.py: },
./client/enroll.py: timeout=10,
./client/enroll.py: )
./client/enroll.py: resp.raise_for_status()
./client/enroll.py: data = resp.json()
./client/enroll.py: profiles = data["profiles"]
./client/enroll.py: if not profiles:
./client/enroll.py: raise RuntimeError("No certificate profiles available")
./client/enroll.py: # Prefer the control-plane profile; otherwise fall back to the first allowed profile
./client/enroll.py: profile = "control-plane" if "control-plane" in profiles else profiles[0]
./client/enroll.py: # Step 2: generate key + CSR locally
./client/enroll.py: key = generate_private_key()
./client/enroll.py: csr = generate_csr(key, hostname)
./client/enroll.py: # Serialize to unencrypted PEM with the helper defined in client/csr.py.
./client/enroll.py: from client.csr import serialize_private_key
./client/enroll.py: key_pem = serialize_private_key(key)
./client/enroll.py: # Step 3: request certificate
./client/enroll.py: resp = requests.post(
./client/enroll.py: f"{CA_URL}/issue",
./client/enroll.py: json={
./client/enroll.py: "hostname": hostname,
./client/enroll.py: "csr": csr.decode(),
./client/enroll.py: "profile": profile,
./client/enroll.py: },
./client/enroll.py: timeout=20,
./client/enroll.py: )
./client/enroll.py: resp.raise_for_status()
./client/enroll.py: result = resp.json()
./client/enroll.py: # Step 4: install certs
./client/enroll.py: ssl_dir = Path("/etc/ssl/ca")
./client/enroll.py: ssl_dir.mkdir(parents=True, exist_ok=True)
./client/enroll.py: key_path = ssl_dir / "client.key"
./client/enroll.py: key_path.write_bytes(key_pem)
./client/enroll.py: key_path.chmod(0o600)  # private key: owner read/write only
./client/enroll.py: (ssl_dir / "client.crt").write_text(result["certificate"])
./client/enroll.py: (ssl_dir / "ca.crt").write_text(result["ca_bundle"])
./client/enroll.py: print("Enrollment complete:", hostname)
./client/enroll.py:if __name__ == "__main__":
./client/enroll.py: enroll()
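The install step writes an unencrypted private key to disk, so file permissions matter from the moment the file exists. The sketch below creates the file with mode 0600 up front instead of tightening it afterwards. It assumes a POSIX system, and `write_private_file` is an illustrative helper, not part of client/enroll.py.

```python
import os
from pathlib import Path


def write_private_file(path: Path, data: bytes) -> None:
    """Create `path` with mode 0600 and write `data`; refuse to overwrite."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # O_EXCL makes creation fail if the file already exists, so an existing
    # key is never silently replaced.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
    fd = os.open(path, flags, 0o600)
    with os.fdopen(fd, "wb") as handle:
        handle.write(data)
```

Refusing to overwrite is a deliberate choice for first-time enrollment. A re-enrollment flow would need to move or remove the old key explicitly.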