Last active
May 25, 2026 20:20
-
-
Save paigeadelethompson/ce7b4101a8f54ed0466f4cf638707dd7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ./README.txt.old:Django LDAP Certificate Authority + Control Plane System | |
| ./README.txt.old:======================================================== | |
| ./README.txt.old:This system is an internal infrastructure platform built using Django that combines a private Certificate Authority (PKI), LDAP-based policy enforcement, Kerberos bootstrap enrollment, asynchronous certificate issuance via Celery, and a certificate-based control-plane messaging system over UDP. | |
| ./README.txt.old:It is designed for environments where machines are fully managed and centrally controlled. Identity is not user-based or credential-based — it is entirely certificate-based. | |
| ./README.txt.old:After a machine is enrolled, it is no longer identified by Kerberos or any API key system. Instead, its identity becomes a set of X.509 certificates issued by the internal CA, with a mandatory control-plane certificate acting as the primary identity for all future communication. | |
| ./README.txt.old:LDAP defines what a machine is allowed to become. The CA enforces those rules at issuance time. Clients are responsible for generating their own private keys locally and submitting CSRs for approval. | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:1. REPOSITORY FILE TREE | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:ca_project/ | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- pyproject.toml | |
| ./README.txt.old:| - Python packaging configuration | |
| ./README.txt.old:| - defines dependencies and entry points for client/server tools | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- README.md | |
| ./README.txt.old:| - system documentation and operational overview | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- manage.py | |
| ./README.txt.old:| - Django management entrypoint (admin, migrations, shell, etc.) | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- config.py | |
| ./README.txt.old:| - global configuration layer outside Django settings | |
| ./README.txt.old:| - used for CA behavior, notification routing, and runtime flags | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- django_ca/ | |
| ./README.txt.old:| |-- settings.py | |
| ./README.txt.old:| | - Django configuration (installed apps, database, auth backends) | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- urls.py | |
| ./README.txt.old:| | - HTTP API routing | |
| ./README.txt.old:| | - includes enrollment, issuance, and control-plane endpoints | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- wsgi.py | |
| ./README.txt.old:| |-- asgi.py | |
| ./README.txt.old:| | - deployment entrypoints for WSGI/ASGI servers | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- celery.py | |
| ./README.txt.old:| - Celery application configuration | |
| ./README.txt.old:| - defines broker, backend, and task discovery | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- ca/ | |
| ./README.txt.old:| |-- apps.py | |
| ./README.txt.old:| | - Django app registration | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- admin.py | |
| ./README.txt.old:| | - admin interface for CA management objects | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- models.py | |
| ./README.txt.old:| | - Host: | |
| ./README.txt.old:| | represents an enrolled machine in the system | |
| ./README.txt.old:| | stores identity bindings, status, and metadata | |
| ./README.txt.old:| | | | |
| ./README.txt.old:| | - CertificateProfile: | |
| ./README.txt.old:| | defines allowed certificate types | |
| ./README.txt.old:| | maps LDAP filters → allowed SAN/CN constraints | |
| ./README.txt.old:| | | | |
| ./README.txt.old:| | - IssuedCertificate: | |
| ./README.txt.old:| | stores issued certificates and lifecycle state | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- auth.py | |
| ./README.txt.old:| | - authentication backend for Django | |
| ./README.txt.old:| | - validates client identity using control-plane certificates | |
| ./README.txt.old:| | - replaces all API-key or password-based authentication | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- ldap.py | |
| ./README.txt.old:| | - LDAP client layer | |
| ./README.txt.old:| | - used exclusively for querying policy and host eligibility | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- policy.py | |
| ./README.txt.old:| | - CSR validation engine | |
| ./README.txt.old:| | - enforces LDAP-derived rules on: | |
| ./README.txt.old:| | - CN (Common Name) | |
| ./README.txt.old:| | - SAN (Subject Alternative Names) | |
| ./README.txt.old:| | - certificate profiles | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- registry.py | |
| ./README.txt.old:| | - host registry and identity binding system | |
| ./README.txt.old:| | - maps certificate identity → network identity (IP/port) | |
| ./README.txt.old:| | - tracks enrollment state and last-known connectivity | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- ca_engine.py | |
| ./README.txt.old:| | - core Certificate Authority logic | |
| ./README.txt.old:| | - loads CA private key and CA certificate | |
| ./README.txt.old:| | - performs X.509 signing operations | |
| ./README.txt.old:| | - enforces cryptographic constraints (EKU, KU, validity) | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- issuance.py | |
| ./README.txt.old:| | - primary certificate issuance workflow | |
| ./README.txt.old:| | - validates CSR against policy | |
| ./README.txt.old:| | - checks LDAP authorization rules | |
| ./README.txt.old:| | - calls CA engine to sign certificates | |
| ./README.txt.old:| | - writes IssuedCertificate records | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- tasks.py | |
| ./README.txt.old:| | - Celery async task wrapper layer | |
| ./README.txt.old:| | - provides issue_certificate_task() | |
| ./README.txt.old:| | - delegates actual logic to issuance.py | |
| ./README.txt.old:| | - enables retries and background processing | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- renew.py | |
| ./README.txt.old:| | - certificate lifecycle management | |
| ./README.txt.old:| | - handles renewal and re-issuance before expiration | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- message_signing.py | |
| ./README.txt.old:| | - cryptographic signing of control-plane messages | |
| ./README.txt.old:| | - ensures integrity + authenticity of server commands | |
| ./README.txt.old:| | - includes timestamp and nonce-based replay protection | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- notification_channel.py | |
| ./README.txt.old:| | - UDP transport layer for control-plane messaging | |
| ./README.txt.old:| | - resolves host identity via registry | |
| ./README.txt.old:| | - sends signed operational messages to clients | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- enrollment/ | |
| ./README.txt.old:| | |-- views.py | |
| ./README.txt.old:| | | - POST /enroll endpoint | |
| ./README.txt.old:| | | - entrypoint for machine bootstrap | |
| ./README.txt.old:| | | - returns certificate plan (not certificates) | |
| ./README.txt.old:| | | | |
| ./README.txt.old:| | |-- kerberos.py | |
| ./README.txt.old:| | | - Kerberos authentication validation layer | |
| ./README.txt.old:| | | - used only during initial enrollment bootstrap | |
| ./README.txt.old:| | | | |
| ./README.txt.old:| | |-- profiles.py | |
| ./README.txt.old:| | - builds certificate plans from LDAP policy | |
| ./README.txt.old:| | - determines required certificates per host | |
| ./README.txt.old:| | - injects mandatory control-plane certificate requirement | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- client/ | |
| ./README.txt.old:| |-- csr.py | |
| ./README.txt.old:| | - local key generation (always client-side) | |
| ./README.txt.old:| | - CSR construction based on server-issued plan | |
| ./README.txt.old:| | - ensures private keys never leave machine | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- agent.py | |
| ./README.txt.old:| | - long-running UDP listener daemon | |
| ./README.txt.old:| | - receives control-plane messages from server | |
| ./README.txt.old:| | - verifies signature, timestamp, nonce (replay protection) | |
| ./README.txt.old:| | - dispatches system actions safely | |
| ./README.txt.old:| | | |
| ./README.txt.old:| |-- install.py | |
| ./README.txt.old:| - installs certificates into /etc/ssl directories | |
| ./README.txt.old:| - configures system trust and identity usage | |
| ./README.txt.old:| | |
| ./README.txt.old:|-- client_notification_agent.py | |
| ./README.txt.old:| - legacy client runtime entrypoint | |
| ./README.txt.old:| - overlaps with client/agent.py | |
| ./README.txt.old:| - should be migrated into client/ directory for consistency | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:2. ARCHITECTURE OVERVIEW | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:The system is composed of four tightly connected layers: | |
| ./README.txt.old:Identity Bootstrap Layer | |
| ./README.txt.old:- Responsible for initial machine onboarding | |
| ./README.txt.old:- Uses Kerberos authentication as the first trust mechanism | |
| ./README.txt.old:- Ensures only authorized machines can request enrollment | |
| ./README.txt.old:Certificate Authority Layer | |
| ./README.txt.old:- Responsible for issuing all X.509 certificates | |
| ./README.txt.old:- Enforces LDAP-based policy rules during issuance | |
| ./README.txt.old:- Validates and signs CSRs using CA private key | |
| ./README.txt.old:- Maintains full certificate lifecycle tracking | |
| ./README.txt.old:Identity and Registry Layer | |
| ./README.txt.old:- Maintains persistent mapping of machines | |
| ./README.txt.old:- Links certificate identity to network identity (IP/port) | |
| ./README.txt.old:- Tracks enrollment state and operational status | |
| ./README.txt.old:Control-Plane Layer | |
| ./README.txt.old:- Provides secure server-to-client messaging system | |
| ./README.txt.old:- Uses UDP as transport (no persistent connection required) | |
| ./README.txt.old:- Ensures authenticity via cryptographic signing | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:3. TRUST MODEL | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:LDAP (Policy Authority) | |
| ./README.txt.old:- Defines what machines are allowed to exist | |
| ./README.txt.old:- Defines which certificate profiles can be issued | |
| ./README.txt.old:- Controls allowed CN/SAN structures | |
| ./README.txt.old:- Does not store certificates or runtime state | |
| ./README.txt.old:Kerberos (Bootstrap Only) | |
| ./README.txt.old:- Used only for initial enrollment authentication | |
| ./README.txt.old:- Provides first proof of machine identity | |
| ./README.txt.old:- Completely replaced after certificate issuance begins | |
| ./README.txt.old:X.509 Certificates (Primary Identity System) | |
| ./README.txt.old:- Every machine is identified through certificates | |
| ./README.txt.old:- Control-plane certificate becomes the primary identity credential | |
| ./README.txt.old:- Used for authentication, authorization, and messaging | |
| ./README.txt.old:CA Private Key (Root Trust Anchor) | |
| ./README.txt.old:- Used only within CA engine | |
| ./README.txt.old:- Signs all certificates | |
| ./README.txt.old:- Never leaves the CA system boundary | |
| ./README.txt.old:Client Private Keys | |
| ./README.txt.old:- Generated locally on each machine | |
| ./README.txt.old:- Never transmitted or exposed to server | |
| ./README.txt.old:- Stored under secure system directories | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:4. ENROLLMENT FLOW | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:Step 1: Kerberos Bootstrap Authentication | |
| ./README.txt.old:- Machine authenticates using Kerberos credentials | |
| ./README.txt.old:- Establishes initial trust relationship with CA system | |
| ./README.txt.old:Step 2: Enrollment Request | |
| ./README.txt.old:- Client calls POST /enroll endpoint | |
| ./README.txt.old:- Request includes Kerberos identity context | |
| ./README.txt.old:Step 3: Server Identity Validation | |
| ./README.txt.old:- Server validates Kerberos identity via kerberos.py | |
| ./README.txt.old:- Extracts machine identity and hostname | |
| ./README.txt.old:Step 4: Host Registration | |
| ./README.txt.old:- Host is created in registry | |
| ./README.txt.old:- IP address and metadata recorded | |
| ./README.txt.old:- Machine marked as “known but not provisioned” | |
| ./README.txt.old:Step 5: LDAP Policy Evaluation | |
| ./README.txt.old:- LDAP queried for machine eligibility | |
| ./README.txt.old:- Certificate profile selection is determined | |
| ./README.txt.old:Step 6: Certificate Plan Generation | |
| ./README.txt.old:- profiles.py generates full certificate plan | |
| ./README.txt.old:- Includes: | |
| ./README.txt.old: - required control-plane certificate (mandatory) | |
| ./README.txt.old: - optional service certificates (policy-based) | |
| ./README.txt.old: - SAN/CN constraints | |
| ./README.txt.old: - validity rules | |
| ./README.txt.old:Step 7: Response to Client | |
| ./README.txt.old:- Client receives certificate plan only | |
| ./README.txt.old:- No certificates are issued at this stage | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:5. CERTIFICATE ISSUANCE FLOW | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:Step 1: Client Key Generation | |
| ./README.txt.old:- Private key is generated locally on client machine | |
| ./README.txt.old:- Key never leaves system boundary | |
| ./README.txt.old:Step 2: CSR Generation | |
| ./README.txt.old:- CSR is built using server-provided certificate plan | |
| ./README.txt.old:- Includes CN, SAN, and profile metadata | |
| ./README.txt.old:Step 3: CSR Submission | |
| ./README.txt.old:- CSR submitted to issuance endpoint | |
| ./README.txt.old:- Authentication occurs via certificate identity after bootstrap | |
| ./README.txt.old:Step 4: Celery Task Dispatch | |
| ./README.txt.old:- tasks.py queues issuance asynchronously | |
| ./README.txt.old:- allows retry and non-blocking processing | |
| ./README.txt.old:Step 5: Issuance Validation | |
| ./README.txt.old:- issuance.py validates CSR | |
| ./README.txt.old:- applies LDAP policy constraints | |
| ./README.txt.old:- ensures CSR matches certificate plan | |
| ./README.txt.old:Step 6: Certificate Signing | |
| ./README.txt.old:- ca_engine.py signs CSR using CA private key | |
| ./README.txt.old:- produces valid X.509 certificate | |
| ./README.txt.old:Step 7: Persistence | |
| ./README.txt.old:- IssuedCertificate stored in database | |
| ./README.txt.old:Step 8: Client Installation | |
| ./README.txt.old:- client/install.py installs certificate into system trust stores | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:6. CELERY ISSUANCE LAYER | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:Celery is used to decouple certificate issuance from HTTP request handling. | |
| ./README.txt.old:ca/tasks.py: | |
| ./README.txt.old:- provides issue_certificate_task() | |
| ./README.txt.old:- wraps issuance.issue_certificate() | |
| ./README.txt.old:- enables retries and background execution | |
| ./README.txt.old:- prevents CA bottlenecks under load | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:7. CONTROL-PLANE SYSTEM | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:The control-plane enables server-to-client operational messaging. | |
| ./README.txt.old:Server Side: | |
| ./README.txt.old:- registry resolves host → network address | |
| ./README.txt.old:- notification_channel.py sends UDP packets | |
| ./README.txt.old:- message_signing.py ensures message integrity and authenticity | |
| ./README.txt.old:Client Side: | |
| ./README.txt.old:- agent.py listens on UDP socket | |
| ./README.txt.old:- verifies signature, nonce, timestamp | |
| ./README.txt.old:- rejects replayed or invalid messages | |
| ./README.txt.old:- executes authorized system actions | |
| ./README.txt.old:No persistent connection is required. Communication is event-based. | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:8. MESSAGE FORMAT | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:Control-plane messages are structured as: | |
| ./README.txt.old:{ | |
| ./README.txt.old: "event": "reboot", | |
| ./README.txt.old: "payload": { | |
| ./README.txt.old: "reason": "kernel update required" | |
| ./README.txt.old: }, | |
| ./README.txt.old: "timestamp": 1710000000, | |
| ./README.txt.old: "nonce": "uuid", | |
| ./README.txt.old: "ttl": 30, | |
| ./README.txt.old: "signature": "hex-encoded-signature" | |
| ./README.txt.old:} | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:9. LDAP POLICY SYSTEM | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:LDAP acts as the authoritative policy layer. | |
| ./README.txt.old:It defines: | |
| ./README.txt.old:- which machines are allowed to enroll | |
| ./README.txt.old:- which certificate profiles are valid | |
| ./README.txt.old:- which SAN and CN values are permitted | |
| ./README.txt.old:LDAP is not used for: | |
| ./README.txt.old:- certificate storage | |
| ./README.txt.old:- runtime communication | |
| ./README.txt.old:- identity authentication after enrollment | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:10. HOST REGISTRY | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:The registry tracks machine identity and network state. | |
| ./README.txt.old:It stores: | |
| ./README.txt.old:- hostname | |
| ./README.txt.old:- certificate identity binding | |
| ./README.txt.old:- IP address | |
| ./README.txt.old:- UDP port | |
| ./README.txt.old:- enrollment status | |
| ./README.txt.old:It is used for: | |
| ./README.txt.old:- routing control-plane messages | |
| ./README.txt.old:- resolving machine identity at runtime | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:11. SECURITY MODEL | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:- private keys never leave client machines | |
| ./README.txt.old:- CA signs only validated CSRs | |
| ./README.txt.old:- LDAP defines issuance policy | |
| ./README.txt.old:- certificates are the only identity system after enrollment | |
| ./README.txt.old:- all control-plane messages are signed | |
| ./README.txt.old:- replay protection enforced via nonce + timestamp validation | |
| ./README.txt.old:- no API keys exist anywhere in system | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:12. LIMITATIONS | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:- UDP delivery is best-effort (no guaranteed delivery) | |
| ./README.txt.old:- DTLS not fully implemented (UDP-only transport) | |
| ./README.txt.old:- no HA registry or clustering layer yet | |
| ./README.txt.old:- no OCSP/CRL revocation system yet | |
| ./README.txt.old:- ACME implementation is simplified relative to RFC 8555 | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:13. SYSTEM SUMMARY | |
| ./README.txt.old:------------------------------------------------------------ | |
| ./README.txt.old:This system functions as: | |
| ./README.txt.old:- a private internal PKI (Certificate Authority) | |
| ./README.txt.old:- an LDAP-driven policy enforcement engine | |
| ./README.txt.old:- a Kerberos bootstrap enrollment system | |
| ./README.txt.old:- a Celery-based asynchronous issuance pipeline | |
| ./README.txt.old:- a certificate lifecycle management system | |
| ./README.txt.old:- a certificate-only identity system for machines | |
| ./README.txt.old:- a secure UDP control-plane orchestration bus | |
| [sq@msi ~/ca]$ ^C | |
| [sq@msi ~/ca]$ grep -r "." | grep -v README | |
| ./pyproject.toml:[build-system] | |
| ./pyproject.toml:requires = ["setuptools>=68", "wheel"] | |
| ./pyproject.toml:build-backend = "setuptools.build_meta" | |
| ./pyproject.toml:[project] | |
| ./pyproject.toml:name = "django-ca" | |
| ./pyproject.toml:version = "0.1.0" | |
| ./pyproject.toml:description = "Django LDAP Certificate Authority with certificate-based control-plane system" | |
| ./pyproject.toml:requires-python = ">=3.11" | |
| ./pyproject.toml:dependencies = [ | |
| ./pyproject.toml: "Django>=4.2", | |
| ./pyproject.toml: "celery>=5.3", | |
| ./pyproject.toml: "pyzmq>=25.0", | |
| ./pyproject.toml: "python-ldap>=3.4", | |
| ./pyproject.toml: "django-ldapdb>=2.0", | |
| ./pyproject.toml: "cryptography>=42.0", | |
| ./pyproject.toml: "pytz>=2024.1", | |
| ./pyproject.toml:] | |
| ./pyproject.toml:[project.optional-dependencies] | |
| ./pyproject.toml:dev = [ | |
| ./pyproject.toml: "ipython", | |
| ./pyproject.toml: "black", | |
| ./pyproject.toml: "ruff", | |
| ./pyproject.toml:] | |
| ./pyproject.toml:[project.scripts] | |
| ./pyproject.toml:ca-server = "django_ca.cli:main" | |
| ./pyproject.toml:ca-enroll = "client.enroll:main" | |
| ./pyproject.toml:ca-renew = "ca.renew:main" | |
| ./pyproject.toml:ca-agent = "client.agent:main" | |
| ./pyproject.toml:[tool.setuptools] | |
| ./pyproject.toml:packages = [ | |
| ./pyproject.toml: "django_ca", | |
| ./pyproject.toml: "ca", | |
| ./pyproject.toml: "ca.enrollment", | |
| ./pyproject.toml: "client", | |
| ./pyproject.toml:] | |
| ./pyproject.toml:[tool.setuptools.package-data] | |
| ./pyproject.toml:"django_ca" = ["*.py"] | |
| ./pyproject.toml:"ca" = ["*.py"] | |
| ./pyproject.toml:"ca.enrollment" = ["*.py"] | |
| ./pyproject.toml:"client" = ["*.py"] | |
| ./manage.py:# manage.py | |
| ./manage.py:#!/usr/bin/env python3 | |
| ./manage.py:import os | |
| ./manage.py:import sys | |
| ./manage.py:def main(): | |
| ./manage.py: os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ca.settings") | |
| ./manage.py: from django.core.management import execute_from_command_line | |
| ./manage.py: execute_from_command_line(sys.argv) | |
| ./manage.py:if __name__ == "__main__": | |
| ./manage.py: main() | |
| ./django_ca/__init__.py:# django_ca/__init__.py | |
| ./django_ca/__init__.py:from .celery import app as celery_app | |
| ./django_ca/__init__.py:__all__ = ("celery_app",) | |
| ./django_ca/celery.py:# django_ca/celery.py | |
| ./django_ca/celery.py:""" | |
| ./django_ca/celery.py:Celery application configuration for CA system. | |
| ./django_ca/celery.py:This worker is responsible for: | |
| ./django_ca/celery.py:- asynchronous certificate issuance | |
| ./django_ca/celery.py:- background renewal workflows (future) | |
| ./django_ca/celery.py:- policy evaluation tasks (future) | |
| ./django_ca/celery.py:Transport: | |
| ./django_ca/celery.py:- ZeroMQ broker by default (as requested) | |
| ./django_ca/celery.py:""" | |
| ./django_ca/celery.py:import os | |
| ./django_ca/celery.py:from celery import Celery | |
| ./django_ca/celery.py:os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ca.settings") | |
| ./django_ca/celery.py:app = Celery("django_ca") | |
| ./django_ca/celery.py:# Default configuration | |
| ./django_ca/celery.py:app.conf.update( | |
| ./django_ca/celery.py: broker_url=os.environ.get("CELERY_BROKER_URL", "pyamqp://guest@localhost//"), | |
| ./django_ca/celery.py: result_backend=os.environ.get("CELERY_RESULT_BACKEND", "rpc://"), | |
| ./django_ca/celery.py: task_serializer="json", | |
| ./django_ca/celery.py: accept_content=["json"], | |
| ./django_ca/celery.py: result_serializer="json", | |
| ./django_ca/celery.py: timezone="UTC", | |
| ./django_ca/celery.py: enable_utc=True, | |
| ./django_ca/celery.py:) | |
| ./django_ca/celery.py:# Optional ZeroMQ override (explicit opt-in) | |
| ./django_ca/celery.py:if os.environ.get("CELERY_BROKER", "").lower() == "zeromq": | |
| ./django_ca/celery.py: app.conf.update( | |
| ./django_ca/celery.py: broker_url="zmq://", | |
| ./django_ca/celery.py: ) | |
| ./django_ca/celery.py:app.autodiscover_tasks() | |
| ./django_ca/settings.py:from pathlib import Path | |
| ./django_ca/settings.py:import os | |
| ./django_ca/settings.py:BASE_DIR = Path(__file__).resolve().parent.parent | |
| ./django_ca/settings.py:SECRET_KEY = "CHANGE_ME" | |
| ./django_ca/settings.py:DEBUG = True | |
| ./django_ca/settings.py:ALLOWED_HOSTS = ["*"] | |
| ./django_ca/settings.py:INSTALLED_APPS = [ | |
| ./django_ca/settings.py: "django.contrib.admin", | |
| ./django_ca/settings.py: "django.contrib.auth", | |
| ./django_ca/settings.py: "django.contrib.contenttypes", | |
| ./django_ca/settings.py: "django.contrib.sessions", | |
| ./django_ca/settings.py: "django.contrib.messages", | |
| ./django_ca/settings.py: "django.contrib.staticfiles", | |
| ./django_ca/settings.py: "rest_framework", | |
| ./django_ca/settings.py: "django_ldapdb", | |
| ./django_ca/settings.py: "ca", | |
| ./django_ca/settings.py:] | |
| ./django_ca/settings.py:MIDDLEWARE = [ | |
| ./django_ca/settings.py: "django.middleware.security.SecurityMiddleware", | |
| ./django_ca/settings.py: "whitenoise.middleware.WhiteNoiseMiddleware", | |
| ./django_ca/settings.py: "django.contrib.sessions.middleware.SessionMiddleware", | |
| ./django_ca/settings.py: "django.middleware.common.CommonMiddleware", | |
| ./django_ca/settings.py: "django.middleware.csrf.CsrfViewMiddleware", | |
| ./django_ca/settings.py: "django.contrib.auth.middleware.AuthenticationMiddleware", | |
| ./django_ca/settings.py: "django.contrib.messages.middleware.MessageMiddleware", | |
| ./django_ca/settings.py:] | |
| ./django_ca/settings.py:ROOT_URLCONF = "django_ca.urls" | |
| ./django_ca/settings.py:TEMPLATES = [ | |
| ./django_ca/settings.py: { | |
| ./django_ca/settings.py: "BACKEND": "django.template.backends.django.DjangoTemplates", | |
| ./django_ca/settings.py: "DIRS": [], | |
| ./django_ca/settings.py: "APP_DIRS": True, | |
| ./django_ca/settings.py: "OPTIONS": { | |
| ./django_ca/settings.py: "context_processors": [ | |
| ./django_ca/settings.py: "django.template.context_processors.request", | |
| ./django_ca/settings.py: "django.contrib.auth.context_processors.auth", | |
| ./django_ca/settings.py: "django.contrib.messages.context_processors.messages", | |
| ./django_ca/settings.py: ], | |
| ./django_ca/settings.py: }, | |
| ./django_ca/settings.py: }, | |
| ./django_ca/settings.py:] | |
| ./django_ca/settings.py:WSGI_APPLICATION = "django_ca.wsgi.application" | |
| ./django_ca/settings.py:ASGI_APPLICATION = "django_ca.asgi.application" | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:# DATABASE | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:DATABASES = { | |
| ./django_ca/settings.py: "default": { | |
| ./django_ca/settings.py: "ENGINE": "django.db.backends.sqlite3", | |
| ./django_ca/settings.py: "NAME": BASE_DIR / "db.sqlite3", | |
| ./django_ca/settings.py: } | |
| ./django_ca/settings.py:} | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:# INTERNATIONALIZATION | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:LANGUAGE_CODE = "en-us" | |
| ./django_ca/settings.py:TIME_ZONE = "UTC" | |
| ./django_ca/settings.py:USE_I18N = True | |
| ./django_ca/settings.py:USE_TZ = True | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:# STATIC FILES | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:STATIC_URL = "/static/" | |
| ./django_ca/settings.py:STATIC_ROOT = BASE_DIR / "static" | |
| ./django_ca/settings.py:DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" | |
| ./django_ca/settings.py:# ========================================================= | |
| ./django_ca/settings.py:# LDAP CONFIGURATION (NOW SINGLE SOURCE OF TRUTH) | |
| ./django_ca/settings.py:# ========================================================= | |
| ./django_ca/settings.py:LDAP_SERVER_URI = os.environ.get("LDAP_URI", "ldap://ldap.example.com") | |
| ./django_ca/settings.py:LDAP_BIND_DN = os.environ.get("LDAP_BIND_DN", "cn=service,dc=example,dc=com") | |
| ./django_ca/settings.py:LDAP_BIND_PASSWORD = os.environ.get("LDAP_BIND_PASSWORD", "CHANGE_ME") | |
| ./django_ca/settings.py:LDAP_BASE_DN = os.environ.get("LDAP_BASE_DN", "dc=example,dc=com") | |
| ./django_ca/settings.py:# LDAP STRUCTURE (previously ldapdb_config.py) | |
| ./django_ca/settings.py:LDAP_HOSTS_OU = os.environ.get("LDAP_HOSTS_OU", "ou=hosts") | |
| ./django_ca/settings.py:LDAP_PROFILES_OU = os.environ.get("LDAP_PROFILES_OU", "ou=cert-profiles") | |
| ./django_ca/settings.py:LDAP_CERTS_OU = os.environ.get("LDAP_CERTS_OU", "ou=issued-certs") | |
| ./django_ca/settings.py:LDAP_ENROLLMENT_OU = os.environ.get("LDAP_ENROLLMENT_OU", "ou=enrollment-events") | |
| ./django_ca/settings.py:LDAP_USE_TLS = os.environ.get("LDAP_USE_TLS", "false").lower() == "true" | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:# CA CONFIG | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:CA_CERTIFICATE_PATH = "/etc/myca/ca.crt" | |
| ./django_ca/settings.py:CA_PRIVATE_KEY_PATH = "/etc/myca/intermediate.key" | |
| ./django_ca/settings.py:CA_PRIVATE_KEY_PASSWORD = None | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:# CELERY | |
| ./django_ca/settings.py:# ----------------------------- | |
| ./django_ca/settings.py:CELERY_BROKER_URL = os.environ.get( | |
| ./django_ca/settings.py: "CELERY_BROKER_URL", | |
| ./django_ca/settings.py: "redis://127.0.0.1:6379/0", | |
| ./django_ca/settings.py:) | |
| ./django_ca/settings.py:CELERY_RESULT_BACKEND = os.environ.get( | |
| ./django_ca/settings.py: "CELERY_RESULT_BACKEND", | |
| ./django_ca/settings.py: "redis://127.0.0.1:6379/1", | |
| ./django_ca/settings.py:) | |
| ./django_ca/settings.py:CELERY_ACCEPT_CONTENT = ["json"] | |
| ./django_ca/settings.py:CELERY_TASK_SERIALIZER = "json" | |
| ./django_ca/settings.py:CELERY_RESULT_SERIALIZER = "json" | |
| ./django_ca/settings.py:CELERY_TIMEZONE = "UTC" | |
| ./django_ca/urls.py:# django_ca/urls.py | |
| ./django_ca/urls.py:""" | |
| ./django_ca/urls.py:Root URL routing for CA system. | |
| ./django_ca/urls.py:This file defines the public control-plane API surface: | |
| ./django_ca/urls.py:- Enrollment (Kerberos-gated bootstrap) | |
| ./django_ca/urls.py:- Certificate issuance | |
| ./django_ca/urls.py:- ACME-like compatibility endpoints | |
| ./django_ca/urls.py:No business logic should exist here. | |
| ./django_ca/urls.py:""" | |
| ./django_ca/urls.py:from django.contrib import admin | |
| ./django_ca/urls.py:from django.urls import path | |
| ./django_ca/urls.py:from ca.enrollment.views import enroll, issue | |
| ./django_ca/urls.py:from ca.acme.views import new_order, finalize | |
| ./django_ca/urls.py:urlpatterns = [ | |
| ./django_ca/urls.py: path("admin/", admin.site.urls), | |
| ./django_ca/urls.py: # ----------------------------- | |
| ./django_ca/urls.py: # Enrollment API (Kerberos gate + CSR lifecycle) | |
| ./django_ca/urls.py: # ----------------------------- | |
| ./django_ca/urls.py: path("enroll/", enroll), | |
| ./django_ca/urls.py: path("issue/", issue), | |
| ./django_ca/urls.py: # ----------------------------- | |
| ./django_ca/urls.py: # ACME-compatible surface | |
| ./django_ca/urls.py: # ----------------------------- | |
| ./django_ca/urls.py: path("acme/new-order/", new_order), | |
| ./django_ca/urls.py: path("acme/finalize/", finalize), | |
| ./django_ca/urls.py:] | |
| ./django_ca/wsgi.py:# django_ca/wsgi.py | |
| ./django_ca/wsgi.py:import os | |
| ./django_ca/wsgi.py:from django.core.wsgi import get_wsgi_application | |
| ./django_ca/wsgi.py:os.environ.setdefault( | |
| ./django_ca/wsgi.py: "DJANGO_SETTINGS_MODULE", | |
| ./django_ca/wsgi.py: "django_ca.settings", | |
| ./django_ca/wsgi.py:) | |
| ./django_ca/wsgi.py:application = get_wsgi_application() | |
| ./django_ca/asgi.py:# django_ca/asgi.py | |
| ./django_ca/asgi.py:import os | |
| ./django_ca/asgi.py:from django.core.asgi import get_asgi_application | |
| ./django_ca/asgi.py:os.environ.setdefault( | |
| ./django_ca/asgi.py: "DJANGO_SETTINGS_MODULE", | |
| ./django_ca/asgi.py: "django_ca.settings", | |
| ./django_ca/asgi.py:) | |
| ./django_ca/asgi.py:application = get_asgi_application() | |
| ./django_ca/cli.py:# django_ca/cli.py | |
| ./django_ca/cli.py:import os | |
| ./django_ca/cli.py:import sys | |
| ./django_ca/cli.py:import argparse | |
| ./django_ca/cli.py:import django | |
| ./django_ca/cli.py:from pathlib import Path | |
| ./django_ca/cli.py:def setup_django(): | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: Bootstraps Django environment for CLI execution. | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ca.settings") | |
| ./django_ca/cli.py: django.setup() | |
| ./django_ca/cli.py:def run_server(host="0.0.0.0", port=8000): | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: Runs Django development server (CA API + enrollment + ACME endpoints). | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: from django.core.management import execute_from_command_line | |
| ./django_ca/cli.py: sys.argv = ["manage.py", "runserver", f"{host}:{port}"] | |
| ./django_ca/cli.py: execute_from_command_line(sys.argv) | |
| ./django_ca/cli.py:def run_migrations(): | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: Applies database migrations for CA models. | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: from django.core.management import execute_from_command_line | |
| ./django_ca/cli.py: sys.argv = ["manage.py", "migrate"] | |
| ./django_ca/cli.py: execute_from_command_line(sys.argv) | |
| ./django_ca/cli.py:def create_superuser(): | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: Creates Django admin superuser (interactive). | |
| ./django_ca/cli.py: """ | |
| ./django_ca/cli.py: from django.core.management import execute_from_command_line | |
| ./django_ca/cli.py: sys.argv = ["manage.py", "createsuperuser"] | |
| ./django_ca/cli.py: execute_from_command_line(sys.argv) | |
| ./django_ca/cli.py:def main(): | |
| ./django_ca/cli.py: setup_django() | |
| ./django_ca/cli.py: parser = argparse.ArgumentParser(description="Django CA Control Plane CLI") | |
| ./django_ca/cli.py: sub = parser.add_subparsers(dest="command") | |
| ./django_ca/cli.py: server = sub.add_parser("server", help="Run CA server") | |
| ./django_ca/cli.py: server.add_argument("--host", default="0.0.0.0") | |
| ./django_ca/cli.py: server.add_argument("--port", default=8000, type=int) | |
| ./django_ca/cli.py: sub.add_parser("migrate", help="Run migrations") | |
| ./django_ca/cli.py: sub.add_parser("createsuperuser", help="Create admin user") | |
| ./django_ca/cli.py: args = parser.parse_args() | |
| ./django_ca/cli.py: if args.command == "server": | |
| ./django_ca/cli.py: run_server(args.host, args.port) | |
| ./django_ca/cli.py: elif args.command == "migrate": | |
| ./django_ca/cli.py: run_migrations() | |
| ./django_ca/cli.py: elif args.command == "createsuperuser": | |
| ./django_ca/cli.py: create_superuser() | |
| ./django_ca/cli.py: else: | |
| ./django_ca/cli.py: parser.print_help() | |
| ./django_ca/cli.py:if __name__ == "__main__": | |
| ./django_ca/cli.py: main() | |
| ./ca/apps.py:# ca/apps.py | |
| ./ca/apps.py:from django.apps import AppConfig | |
| ./ca/apps.py:class CAConfig(AppConfig): | |
| ./ca/apps.py: default_auto_field = "django.db.models.BigAutoField" | |
| ./ca/apps.py: name = "ca" | |
| ./ca/models.py:# ca/models.py | |
| ./ca/models.py:from django_ldapdb.models import Model | |
| ./ca/models.py:from django_ldapdb.models.fields import CharField, IntegerField, BooleanField, TextField | |
| ./ca/models.py:class Host(Model): | |
| ./ca/models.py: """ | |
| ./ca/models.py: Logical host identity. | |
| ./ca/models.py: LDAP is the backing store, but structure is NOT defined here. | |
| ./ca/models.py: This model represents identity semantics only. | |
| ./ca/models.py: """ | |
| ./ca/models.py: cn = CharField(max_length=255, primary_key=True) | |
| ./ca/models.py: ip_address = CharField(max_length=255, null=True, blank=True) | |
| ./ca/models.py: is_enrolled = BooleanField(default=False) | |
| ./ca/models.py: def __str__(self): | |
| ./ca/models.py: return self.cn | |
| ./ca/models.py:class CertificateProfile(Model): | |
| ./ca/models.py: """ | |
| ./ca/models.py: Certificate issuance policy object. | |
| ./ca/models.py: Defines rules for: | |
| ./ca/models.py: - SAN/CN constraints | |
| ./ca/models.py: - validity | |
| ./ca/models.py: - control-plane requirements | |
| ./ca/models.py: """ | |
| ./ca/models.py: name = CharField(max_length=255, primary_key=True) | |
| ./ca/models.py: ldap_filter = TextField() | |
| ./ca/models.py: allowed_san_regex = TextField(null=True, blank=True) | |
| ./ca/models.py: allowed_cn_regex = TextField(null=True, blank=True) | |
| ./ca/models.py: requires_control_plane_cert = BooleanField(default=True) | |
| ./ca/models.py: validity_days = IntegerField(default=365) | |
| ./ca/models.py: def __str__(self): | |
| ./ca/models.py: return self.name | |
| ./ca/models.py:class IssuedCertificate(Model): | |
| ./ca/models.py: """ | |
| ./ca/models.py: Certificate lifecycle record. | |
| ./ca/models.py: This is the CA’s authoritative issuance ledger. | |
| ./ca/models.py: """ | |
| ./ca/models.py: serial_number = CharField(max_length=255, primary_key=True) | |
| ./ca/models.py: hostname = CharField(max_length=255) | |
| ./ca/models.py: profile_name = CharField(max_length=255) | |
| ./ca/models.py: certificate_pem = TextField() | |
| ./ca/models.py: ca_bundle_pem = TextField() | |
| ./ca/models.py: issued_at = CharField(max_length=64) | |
| ./ca/models.py: not_after = CharField(max_length=64, null=True, blank=True) | |
| ./ca/models.py: last_csr = TextField(null=True, blank=True) | |
| ./ca/models.py: revoked = BooleanField(default=False) | |
| ./ca/models.py: def __str__(self): | |
| ./ca/models.py: return f"{self.hostname}:{self.profile_name}" | |
| ./ca/admin.py:# ca/admin.py | |
| ./ca/admin.py:from django.contrib import admin | |
| ./ca/admin.py:from .models import ( | |
| ./ca/admin.py: CertificateProfile, | |
| ./ca/admin.py: EnrolledHost, | |
| ./ca/admin.py: IssuedCertificate, | |
| ./ca/admin.py:) | |
| ./ca/admin.py:admin.site.register(CertificateProfile) | |
| ./ca/admin.py:admin.site.register(EnrolledHost) | |
| ./ca/admin.py:admin.site.register(IssuedCertificate) | |
| ./ca/auth.py:# ca/auth.py | |
| ./ca/auth.py:""" | |
| ./ca/auth.py:Certificate-based authentication backend. | |
| ./ca/auth.py:Runtime identity is derived from client certificates | |
| ./ca/auth.py:provided by TLS terminator / reverse proxy. | |
| ./ca/auth.py:""" | |
| ./ca/auth.py:from django.contrib.auth.backends import BaseBackend | |
| ./ca/auth.py:from django.contrib.auth.models import User | |
| ./ca/auth.py:class CertificateAuthBackend(BaseBackend): | |
| ./ca/auth.py: """ | |
| ./ca/auth.py: Authenticates requests using client certificate headers. | |
| ./ca/auth.py: Expected headers (from proxy): | |
| ./ca/auth.py: X-Client-CN | |
| ./ca/auth.py: X-Client-Serial | |
| ./ca/auth.py: """ | |
| ./ca/auth.py: def authenticate(self, request, username=None, password=None, **kwargs): | |
| ./ca/auth.py: if not request: | |
| ./ca/auth.py: return None | |
| ./ca/auth.py: cn = request.META.get("HTTP_X_CLIENT_CN") | |
| ./ca/auth.py: serial = request.META.get("HTTP_X_CLIENT_SERIAL") | |
| ./ca/auth.py: if not cn or not serial: | |
| ./ca/auth.py: return None | |
| ./ca/auth.py: # Identity is certificate-based. | |
| ./ca/auth.py: # Django user is OPTIONAL mapping layer, not authority. | |
| ./ca/auth.py: try: | |
| ./ca/auth.py: return User.objects.filter(username=cn).first() | |
| ./ca/auth.py: except Exception: | |
| ./ca/auth.py: return None | |
| ./ca/auth.py: def get_user(self, user_id): | |
| ./ca/auth.py: try: | |
| ./ca/auth.py: return User.objects.get(pk=user_id) | |
| ./ca/auth.py: except User.DoesNotExist: | |
| ./ca/auth.py: return None | |
| ./ca/tasks.py:# ca/tasks.py | |
| ./ca/tasks.py:""" | |
| ./ca/tasks.py:Celery task layer for asynchronous certificate issuance. | |
| ./ca/tasks.py:This is the boundary between: | |
| ./ca/tasks.py:- HTTP/API requests | |
| ./ca/tasks.py:- CA issuance engine | |
| ./ca/tasks.py:- background worker execution | |
| ./ca/tasks.py:Execution model: | |
| ./ca/tasks.py:- API triggers task | |
| ./ca/tasks.py:- Celery worker executes issuance | |
| ./ca/tasks.py:- result is returned or stored in LDAP-backed IssuedCertificate | |
| ./ca/tasks.py:""" | |
| ./ca/tasks.py:from celery import shared_task | |
| ./ca/tasks.py:from ca.issuance import issue_certificate | |
| ./ca/tasks.py:@shared_task | |
| ./ca/tasks.py:def issue_certificate_task(hostname: str, csr_pem: str, profile_name: str): | |
| ./ca/tasks.py: """ | |
| ./ca/tasks.py: Background certificate issuance task. | |
| ./ca/tasks.py: """ | |
| ./ca/tasks.py: return issue_certificate( | |
| ./ca/tasks.py: hostname=hostname, | |
| ./ca/tasks.py: csr_pem=csr_pem, | |
| ./ca/tasks.py: profile_name=profile_name, | |
| ./ca/tasks.py: ) | |
| ./ca/issuance.py:# ca/issuance.py | |
| ./ca/issuance.py:""" | |
| ./ca/issuance.py:Issuance layer (service boundary). | |
| ./ca/issuance.py:This module sits between: | |
| ./ca/issuance.py:- API / enrollment layer | |
| ./ca/issuance.py:- CA engine (cryptographic core) | |
| ./ca/issuance.py:- LDAP-backed persistence models | |
| ./ca/issuance.py:It enforces policy flow and keeps CAEngine isolated from request logic. | |
| ./ca/issuance.py:""" | |
| ./ca/issuance.py:from typing import Dict, Any | |
| ./ca/issuance.py:from ca.ca_engine import CAEngine | |
| ./ca/issuance.py:from ca.models import CertificateProfile | |
| ./ca/issuance.py:class IssuanceService: | |
| ./ca/issuance.py: """ | |
| ./ca/issuance.py: High-level certificate issuance orchestration layer. | |
| ./ca/issuance.py: """ | |
| ./ca/issuance.py: def __init__(self, engine: CAEngine): | |
| ./ca/issuance.py: self.engine = engine | |
| ./ca/issuance.py: def get_profile(self, profile_name: str) -> CertificateProfile: | |
| ./ca/issuance.py: return CertificateProfile.objects.get(name=profile_name) | |
| ./ca/issuance.py: def issue( | |
| ./ca/issuance.py: self, | |
| ./ca/issuance.py: hostname: str, | |
| ./ca/issuance.py: csr_pem: str, | |
| ./ca/issuance.py: profile_name: str, | |
| ./ca/issuance.py: ) -> Dict[str, Any]: | |
| ./ca/issuance.py: """ | |
| ./ca/issuance.py: Main entrypoint for certificate issuance. | |
| ./ca/issuance.py: Flow: | |
| ./ca/issuance.py: 1. Validate profile exists | |
| ./ca/issuance.py: 2. Delegate CSR validation + signing to CA engine | |
| ./ca/issuance.py: 3. Return signed certificate bundle | |
| ./ca/issuance.py: """ | |
| ./ca/issuance.py: profile = self.get_profile(profile_name) | |
| ./ca/issuance.py: # Minimal sanity gate before engine | |
| ./ca/issuance.py: if not profile: | |
| ./ca/issuance.py: raise ValueError(f"Unknown profile: {profile_name}") | |
| ./ca/issuance.py: return self.engine.issue_certificate( | |
| ./ca/issuance.py: hostname=hostname, | |
| ./ca/issuance.py: csr_pem=csr_pem, | |
| ./ca/issuance.py: profile_name=profile.name, | |
| ./ca/issuance.py: ) | |
| ./ca/issuance.py:def issue_certificate(hostname: str, csr_pem: str, profile_name: str) -> Dict[str, Any]: | |
| ./ca/issuance.py: """ | |
| ./ca/issuance.py: Functional wrapper used by Celery/tasks and API endpoints. | |
| ./ca/issuance.py: """ | |
| ./ca/issuance.py: # NOTE: In real deployment, keys would be loaded from secure storage (HSM/filesystem vault) | |
| ./ca/issuance.py: with open("/etc/ca/ca.key", "rb") as f: | |
| ./ca/issuance.py: ca_key = f.read() | |
| ./ca/issuance.py: with open("/etc/ca/ca.crt", "rb") as f: | |
| ./ca/issuance.py: ca_cert = f.read() | |
| ./ca/issuance.py: engine = CAEngine( | |
| ./ca/issuance.py: ca_private_key_pem=ca_key, | |
| ./ca/issuance.py: ca_cert_pem=ca_cert, | |
| ./ca/issuance.py: ) | |
| ./ca/issuance.py: service = IssuanceService(engine) | |
| ./ca/issuance.py: return service.issue( | |
| ./ca/issuance.py: hostname=hostname, | |
| ./ca/issuance.py: csr_pem=csr_pem, | |
| ./ca/issuance.py: profile_name=profile_name, | |
| ./ca/issuance.py: ) | |
| ./ca/enrollment/kerberos.py:# ca/enrollment/kerberos.py | |
| ./ca/enrollment/kerberos.py:""" | |
| ./ca/enrollment/kerberos.py:Kerberos enrollment gate. | |
| ./ca/enrollment/kerberos.py:This is the ONLY place Kerberos is used. | |
| ./ca/enrollment/kerberos.py:Responsibilities: | |
| ./ca/enrollment/kerberos.py:- validate Kerberos identity (external system / ticket already obtained) | |
| ./ca/enrollment/kerberos.py:- authorize enrollment request | |
| ./ca/enrollment/kerberos.py:- produce a trusted enrollment context for CA pipeline | |
| ./ca/enrollment/kerberos.py:NOT used for: | |
| ./ca/enrollment/kerberos.py:- runtime authentication | |
| ./ca/enrollment/kerberos.py:- certificate validation | |
| ./ca/enrollment/kerberos.py:- LDAP policy decisions | |
| ./ca/enrollment/kerberos.py:""" | |
| ./ca/enrollment/kerberos.py:from dataclasses import dataclass | |
| ./ca/enrollment/kerberos.py:@dataclass | |
| ./ca/enrollment/kerberos.py:class KerberosContext: | |
| ./ca/enrollment/kerberos.py: principal: str | |
| ./ca/enrollment/kerberos.py: realm: str | |
| ./ca/enrollment/kerberos.py: authorized: bool | |
| ./ca/enrollment/kerberos.py:class KerberosGate: | |
| ./ca/enrollment/kerberos.py: """ | |
| ./ca/enrollment/kerberos.py: Manual + external Kerberos verification boundary. | |
| ./ca/enrollment/kerberos.py: Assumption: | |
| ./ca/enrollment/kerberos.py: - Kerberos authentication already happened outside this system | |
| ./ca/enrollment/kerberos.py: - We only receive validated identity assertions | |
| ./ca/enrollment/kerberos.py: """ | |
| ./ca/enrollment/kerberos.py: def validate(self, principal: str) -> KerberosContext: | |
| ./ca/enrollment/kerberos.py: """ | |
| ./ca/enrollment/kerberos.py: Validate a Kerberos principal (placeholder for integration). | |
| ./ca/enrollment/kerberos.py: In real deployment this could be: | |
| ./ca/enrollment/kerberos.py: - kinit-provided assertion | |
| ./ca/enrollment/kerberos.py: - reverse proxy header (SPNEGO) | |
| ./ca/enrollment/kerberos.py: - signed enrollment token from Kerberos-aware gateway | |
| ./ca/enrollment/kerberos.py: """ | |
| ./ca/enrollment/kerberos.py: if not principal or "@" not in principal: | |
| ./ca/enrollment/kerberos.py: raise ValueError("Invalid Kerberos principal format") | |
| ./ca/enrollment/kerberos.py: realm = principal.split("@")[1] | |
| ./ca/enrollment/kerberos.py: # NOTE: replace with real policy check later | |
| ./ca/enrollment/kerberos.py: authorized = True | |
| ./ca/enrollment/kerberos.py: return KerberosContext( | |
| ./ca/enrollment/kerberos.py: principal=principal, | |
| ./ca/enrollment/kerberos.py: realm=realm, | |
| ./ca/enrollment/kerberos.py: authorized=authorized, | |
| ./ca/enrollment/kerberos.py: ) | |
| ./ca/enrollment/views.py:# ca/enrollment/views.py | |
| ./ca/enrollment/views.py:""" | |
| ./ca/enrollment/views.py:Enrollment API layer with explicit Kerberos gate. | |
| ./ca/enrollment/views.py:This is the ONLY place where Kerberos is enforced in request flow. | |
| ./ca/enrollment/views.py:Flow: | |
| ./ca/enrollment/views.py:1. Client provides hostname + CSR + Kerberos principal | |
| ./ca/enrollment/views.py:2. KerberosGate validates identity (external trust boundary) | |
| ./ca/enrollment/views.py:3. Host is checked/created in registry | |
| ./ca/enrollment/views.py:4. Certificate issuance is delegated to CA engine via issuance layer | |
| ./ca/enrollment/views.py:""" | |
| ./ca/enrollment/views.py:import json | |
| ./ca/enrollment/views.py:from django.http import JsonResponse, HttpResponseBadRequest | |
| ./ca/enrollment/views.py:from django.views.decorators.csrf import csrf_exempt | |
| ./ca/enrollment/views.py:from ca.issuance import issue_certificate | |
| ./ca/enrollment/views.py:from ca.registry import HostRegistry | |
| ./ca/enrollment/views.py:from ca.enrollment.kerberos import KerberosGate | |
| ./ca/enrollment/views.py:registry = HostRegistry() | |
| ./ca/enrollment/views.py:kerberos_gate = KerberosGate() | |
| ./ca/enrollment/views.py:def _parse(request): | |
| ./ca/enrollment/views.py: try: | |
| ./ca/enrollment/views.py: return json.loads(request.body.decode("utf-8")) | |
| ./ca/enrollment/views.py: except Exception: | |
| ./ca/enrollment/views.py: return None | |
| ./ca/enrollment/views.py:@csrf_exempt | |
| ./ca/enrollment/views.py:def enroll(request): | |
| ./ca/enrollment/views.py: """ | |
| ./ca/enrollment/views.py: Step 1: Kerberos-authenticated enrollment initiation. | |
| ./ca/enrollment/views.py: Requires: | |
| ./ca/enrollment/views.py: - hostname | |
| ./ca/enrollment/views.py: - kerberos_principal | |
| ./ca/enrollment/views.py: """ | |
| ./ca/enrollment/views.py: if request.method != "POST": | |
| ./ca/enrollment/views.py: return HttpResponseBadRequest("POST required") | |
| ./ca/enrollment/views.py: data = _parse(request) | |
| ./ca/enrollment/views.py: if not data: | |
| ./ca/enrollment/views.py: return HttpResponseBadRequest("invalid json") | |
| ./ca/enrollment/views.py: required = ["hostname", "kerberos_principal"] | |
| ./ca/enrollment/views.py: if any(k not in data for k in required): | |
| ./ca/enrollment/views.py: return HttpResponseBadRequest("missing fields") | |
| ./ca/enrollment/views.py: hostname = data["hostname"] | |
| ./ca/enrollment/views.py: principal = data["kerberos_principal"] | |
| ./ca/enrollment/views.py: # Kerberos is enforced ONLY here | |
| ./ca/enrollment/views.py: ctx = kerberos_gate.validate(principal) | |
| ./ca/enrollment/views.py: if not ctx.authorized: | |
| ./ca/enrollment/views.py: return JsonResponse({"error": "kerberos unauthorized"}, status=403) | |
| ./ca/enrollment/views.py: host = registry.get_host(hostname) | |
| ./ca/enrollment/views.py: if not host: | |
| ./ca/enrollment/views.py: return JsonResponse({"error": "unknown host"}, status=403) | |
| ./ca/enrollment/views.py: return JsonResponse( | |
| ./ca/enrollment/views.py: { | |
| ./ca/enrollment/views.py: "hostname": hostname, | |
| ./ca/enrollment/views.py: "realm": ctx.realm, | |
| ./ca/enrollment/views.py: "profiles": ["control-plane"], | |
| ./ca/enrollment/views.py: } | |
| ./ca/enrollment/views.py: ) | |
| ./ca/enrollment/views.py:@csrf_exempt | |
| ./ca/enrollment/views.py:def issue(request): | |
| ./ca/enrollment/views.py: """ | |
| ./ca/enrollment/views.py: Step 2: Certificate issuance (NO Kerberos here). | |
| ./ca/enrollment/views.py: Requires: | |
| ./ca/enrollment/views.py: - hostname | |
| ./ca/enrollment/views.py: - csr | |
| ./ca/enrollment/views.py: - profile | |
| ./ca/enrollment/views.py: """ | |
| ./ca/enrollment/views.py: if request.method != "POST": | |
| ./ca/enrollment/views.py: return HttpResponseBadRequest("POST required") | |
| ./ca/enrollment/views.py: data = _parse(request) | |
| ./ca/enrollment/views.py: required = ["hostname", "csr", "profile"] | |
| ./ca/enrollment/views.py: if not data or any(k not in data for k in required): | |
| ./ca/enrollment/views.py: return HttpResponseBadRequest("missing fields") | |
| ./ca/enrollment/views.py: result = issue_certificate( | |
| ./ca/enrollment/views.py: hostname=data["hostname"], | |
| ./ca/enrollment/views.py: csr_pem=data["csr"], | |
| ./ca/enrollment/views.py: profile_name=data["profile"], | |
| ./ca/enrollment/views.py: ) | |
| ./ca/enrollment/views.py: return JsonResponse(result) | |
| ./ca/acme/views.py:# ca/acme/views.py | |
| ./ca/acme/views.py:import json | |
| ./ca/acme/views.py:import uuid | |
| ./ca/acme/views.py:import datetime | |
| ./ca/acme/views.py:from django.http import JsonResponse, HttpResponse | |
| ./ca/acme/views.py:from django.views.decorators.csrf import csrf_exempt | |
| ./ca/acme/views.py:from ca.issuance import issue_certificate | |
| ./ca/acme/views.py:from ca.auth import authenticate_api_key | |
| ./ca/acme/views.py:# Very small in-memory ACME state (you can later move to DB or Redis) | |
| ./ca/acme/views.py:_NONCES = set() | |
| ./ca/acme/views.py:_ORDERS = {} | |
| ./ca/acme/views.py:def _require_api_key(request): | |
| ./ca/acme/views.py: api_key = request.headers.get("X-API-Key") | |
| ./ca/acme/views.py: if not api_key: | |
| ./ca/acme/views.py: return None | |
| ./ca/acme/views.py: return authenticate_api_key(api_key) | |
| ./ca/acme/views.py:@csrf_exempt | |
| ./ca/acme/views.py:def directory(request): | |
| ./ca/acme/views.py: return JsonResponse({ | |
| ./ca/acme/views.py: "newNonce": "/acme/new-nonce", | |
| ./ca/acme/views.py: "newAccount": "/acme/new-account", | |
| ./ca/acme/views.py: "newOrder": "/acme/new-order", | |
| ./ca/acme/views.py: "finalize": "/acme/finalize", | |
| ./ca/acme/views.py: "certificate": "/acme/certificate", | |
| ./ca/acme/views.py: }) | |
| ./ca/acme/views.py:@csrf_exempt | |
| ./ca/acme/views.py:def new_nonce(request): | |
| ./ca/acme/views.py: nonce = uuid.uuid4().hex | |
| ./ca/acme/views.py: _NONCES.add(nonce) | |
| ./ca/acme/views.py: resp = HttpResponse(status=204) | |
| ./ca/acme/views.py: resp["Replay-Nonce"] = nonce | |
| ./ca/acme/views.py: return resp | |
| ./ca/acme/views.py:@csrf_exempt | |
| ./ca/acme/views.py:def new_account(request): | |
| ./ca/acme/views.py: host = _require_api_key(request) | |
| ./ca/acme/views.py: if not host: | |
| ./ca/acme/views.py: return JsonResponse({"error": "unauthorized"}, status=403) | |
| ./ca/acme/views.py: return JsonResponse({ | |
| ./ca/acme/views.py: "status": "valid", | |
| ./ca/acme/views.py: "account": host.hostname, | |
| ./ca/acme/views.py: }) | |
| ./ca/acme/views.py:@csrf_exempt | |
| ./ca/acme/views.py:def new_order(request): | |
| ./ca/acme/views.py: host = _require_api_key(request) | |
| ./ca/acme/views.py: if not host: | |
| ./ca/acme/views.py: return JsonResponse({"error": "unauthorized"}, status=403) | |
| ./ca/acme/views.py: try: | |
| ./ca/acme/views.py: payload = json.loads(request.body.decode()) | |
| ./ca/acme/views.py: except Exception: | |
| ./ca/acme/views.py: return JsonResponse({"error": "invalid json"}, status=400) | |
| ./ca/acme/views.py: identifiers = payload.get("identifiers", []) | |
| ./ca/acme/views.py: if not identifiers: | |
| ./ca/acme/views.py: return JsonResponse({"error": "no identifiers"}, status=400) | |
| ./ca/acme/views.py: order_id = uuid.uuid4().hex | |
| ./ca/acme/views.py: _ORDERS[order_id] = { | |
| ./ca/acme/views.py: "hostname": host.hostname, | |
| ./ca/acme/views.py: "identifiers": identifiers, | |
| ./ca/acme/views.py: "status": "ready", | |
| ./ca/acme/views.py: "created_at": datetime.datetime.utcnow(), | |
| ./ca/acme/views.py: } | |
| ./ca/acme/views.py: return JsonResponse({ | |
| ./ca/acme/views.py: "order_id": order_id, | |
| ./ca/acme/views.py: "status": "ready", | |
| ./ca/acme/views.py: "identifiers": identifiers, | |
| ./ca/acme/views.py: "finalize": f"/acme/finalize/{order_id}", | |
| ./ca/acme/views.py: }) | |
| ./ca/acme/views.py:@csrf_exempt | |
| ./ca/acme/views.py:def finalize_order(request, order_id): | |
| ./ca/acme/views.py: host = _require_api_key(request) | |
| ./ca/acme/views.py: if not host: | |
| ./ca/acme/views.py: return JsonResponse({"error": "unauthorized"}, status=403) | |
| ./ca/acme/views.py: order = _ORDERS.get(order_id) | |
| ./ca/acme/views.py: if not order: | |
| ./ca/acme/views.py: return JsonResponse({"error": "order not found"}, status=404) | |
| ./ca/acme/views.py: try: | |
| ./ca/acme/views.py: payload = json.loads(request.body.decode()) | |
| ./ca/acme/views.py: except Exception: | |
| ./ca/acme/views.py: return JsonResponse({"error": "invalid json"}, status=400) | |
| ./ca/acme/views.py: csr_pem = payload.get("csr") | |
| ./ca/acme/views.py: if not csr_pem: | |
| ./ca/acme/views.py: return JsonResponse({"error": "missing csr"}, status=400) | |
| ./ca/acme/views.py: identifiers = order["identifiers"] | |
| ./ca/acme/views.py: san_dns = [ | |
| ./ca/acme/views.py: i["value"] for i in identifiers if i.get("type") == "dns" | |
| ./ca/acme/views.py: ] | |
| ./ca/acme/views.py: cert_result = issue_certificate( | |
| ./ca/acme/views.py: hostname=host.hostname, | |
| ./ca/acme/views.py: csr_pem=csr_pem, | |
| ./ca/acme/views.py: profile_name="acme", | |
| ./ca/acme/views.py: san_dns=san_dns, | |
| ./ca/acme/views.py: ) | |
| ./ca/acme/views.py: order["status"] = "valid" | |
| ./ca/acme/views.py: order["certificate_serial"] = cert_result["serial_number"] | |
| ./ca/acme/views.py: return JsonResponse({ | |
| ./ca/acme/views.py: "status": "valid", | |
| ./ca/acme/views.py: "certificate": cert_result["certificate"], | |
| ./ca/acme/views.py: "serial_number": cert_result["serial_number"], | |
| ./ca/acme/views.py: }) | |
| ./ca/acme/views.py:@csrf_exempt | |
| ./ca/acme/views.py:def certificate(request, serial_number): | |
| ./ca/acme/views.py: host = _require_api_key(request) | |
| ./ca/acme/views.py: if not host: | |
| ./ca/acme/views.py: return JsonResponse({"error": "unauthorized"}, status=403) | |
| ./ca/acme/views.py: return JsonResponse({ | |
| ./ca/acme/views.py: "error": "use issuance record lookup (not implemented here yet)", | |
| ./ca/acme/views.py: "serial_number": serial_number, | |
| ./ca/acme/views.py: }) | |
| ./ca/ca_engine.py:# ca/ca_engine.py | |
| ./ca/ca_engine.py:import datetime | |
| ./ca/ca_engine.py:import uuid | |
| ./ca/ca_engine.py:from dataclasses import dataclass | |
| ./ca/ca_engine.py:from typing import Dict, Any, Optional | |
| ./ca/ca_engine.py:from cryptography import x509 | |
| ./ca/ca_engine.py:from cryptography.hazmat.primitives import hashes, serialization | |
| ./ca/ca_engine.py:from cryptography.hazmat.primitives.asymmetric import rsa | |
| ./ca/ca_engine.py:from cryptography.x509.oid import NameOID | |
| ./ca/ca_engine.py:from ca.models import CertificateProfile, IssuedCertificate | |
| ./ca/ca_engine.py:@dataclass | |
| ./ca/ca_engine.py:class IssuedBundle: | |
| ./ca/ca_engine.py: certificate_pem: str | |
| ./ca/ca_engine.py: ca_bundle_pem: str | |
| ./ca/ca_engine.py: serial_number: str | |
| ./ca/ca_engine.py: not_after: datetime.datetime | |
| ./ca/ca_engine.py:class CAEngine: | |
| ./ca/ca_engine.py: """ | |
| ./ca/ca_engine.py: Core Certificate Authority engine. | |
| ./ca/ca_engine.py: Responsibilities: | |
| ./ca/ca_engine.py: - validate CSR against policy | |
| ./ca/ca_engine.py: - enforce profile constraints | |
| ./ca/ca_engine.py: - issue X.509 certificates | |
| ./ca/ca_engine.py: - record issuance in LDAP-backed store | |
| ./ca/ca_engine.py: """ | |
| ./ca/ca_engine.py: def __init__(self, ca_private_key_pem: bytes, ca_cert_pem: bytes): | |
| ./ca/ca_engine.py: self.ca_private_key = serialization.load_pem_private_key( | |
| ./ca/ca_engine.py: ca_private_key_pem, | |
| ./ca/ca_engine.py: password=None, | |
| ./ca/ca_engine.py: ) | |
| ./ca/ca_engine.py: self.ca_cert = x509.load_pem_x509_certificate(ca_cert_pem) | |
| ./ca/ca_engine.py: def _load_profile(self, profile_name: str) -> CertificateProfile: | |
| ./ca/ca_engine.py: return CertificateProfile.objects.get(name=profile_name) | |
| ./ca/ca_engine.py: def _validate_csr(self, csr: x509.CertificateSigningRequest, profile: CertificateProfile): | |
| ./ca/ca_engine.py: # Basic CSR validation | |
| ./ca/ca_engine.py: if not csr.is_signature_valid: | |
| ./ca/ca_engine.py: raise ValueError("Invalid CSR signature") | |
| ./ca/ca_engine.py: cn = None | |
| ./ca/ca_engine.py: for attr in csr.subject: | |
| ./ca/ca_engine.py: if attr.oid == NameOID.COMMON_NAME: | |
| ./ca/ca_engine.py: cn = attr.value | |
| ./ca/ca_engine.py: if profile.allowed_cn_regex and cn: | |
| ./ca/ca_engine.py: import re | |
| ./ca/ca_engine.py: if not re.match(profile.allowed_cn_regex, cn): | |
| ./ca/ca_engine.py: raise ValueError("CN does not match profile policy") | |
| ./ca/ca_engine.py: # SAN validation (if present) | |
| ./ca/ca_engine.py: try: | |
| ./ca/ca_engine.py: san = csr.extensions.get_extension_for_class(x509.SubjectAlternativeName) | |
| ./ca/ca_engine.py: san_values = san.value.get_values_for_type(x509.DNSName) | |
| ./ca/ca_engine.py: if profile.allowed_san_regex: | |
| ./ca/ca_engine.py: import re | |
| ./ca/ca_engine.py: for value in san_values: | |
| ./ca/ca_engine.py: if not re.match(profile.allowed_san_regex, value): | |
| ./ca/ca_engine.py: raise ValueError(f"SAN {value} not allowed by policy") | |
| ./ca/ca_engine.py: except x509.ExtensionNotFound: | |
| ./ca/ca_engine.py: pass | |
| ./ca/ca_engine.py: def _build_certificate( | |
| ./ca/ca_engine.py: self, | |
| ./ca/ca_engine.py: csr: x509.CertificateSigningRequest, | |
| ./ca/ca_engine.py: profile: CertificateProfile, | |
| ./ca/ca_engine.py: ) -> IssuedBundle: | |
| ./ca/ca_engine.py: serial_number = uuid.uuid4().hex | |
| ./ca/ca_engine.py: not_before = datetime.datetime.utcnow() | |
| ./ca/ca_engine.py: not_after = not_before + datetime.timedelta(days=profile.validity_days) | |
| ./ca/ca_engine.py: builder = ( | |
| ./ca/ca_engine.py: x509.CertificateBuilder() | |
| ./ca/ca_engine.py: .subject_name(csr.subject) | |
| ./ca/ca_engine.py: .issuer_name(self.ca_cert.subject) | |
| ./ca/ca_engine.py: .public_key(csr.public_key()) | |
| ./ca/ca_engine.py: .serial_number(int(serial_number[:16], 16)) | |
| ./ca/ca_engine.py: .not_valid_before(not_before) | |
| ./ca/ca_engine.py: .not_valid_after(not_after) | |
| ./ca/ca_engine.py: ) | |
| ./ca/ca_engine.py: # copy extensions from CSR if valid | |
| ./ca/ca_engine.py: for ext in csr.extensions: | |
| ./ca/ca_engine.py: builder = builder.add_extension(ext.value, ext.critical) | |
| ./ca/ca_engine.py: cert = builder.sign( | |
| ./ca/ca_engine.py: private_key=self.ca_private_key, | |
| ./ca/ca_engine.py: algorithm=hashes.SHA256(), | |
| ./ca/ca_engine.py: ) | |
| ./ca/ca_engine.py: cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode() | |
| ./ca/ca_engine.py: ca_bundle_pem = self.ca_cert.public_bytes( | |
| ./ca/ca_engine.py: serialization.Encoding.PEM | |
| ./ca/ca_engine.py: ).decode() | |
| ./ca/ca_engine.py: return IssuedBundle( | |
| ./ca/ca_engine.py: certificate_pem=cert_pem, | |
| ./ca/ca_engine.py: ca_bundle_pem=ca_bundle_pem, | |
| ./ca/ca_engine.py: serial_number=serial_number, | |
| ./ca/ca_engine.py: not_after=not_after, | |
| ./ca/ca_engine.py: ) | |
| ./ca/ca_engine.py: def issue_certificate( | |
| ./ca/ca_engine.py: self, | |
| ./ca/ca_engine.py: hostname: str, | |
| ./ca/ca_engine.py: csr_pem: str, | |
| ./ca/ca_engine.py: profile_name: str, | |
| ./ca/ca_engine.py: ) -> Dict[str, Any]: | |
| ./ca/ca_engine.py: profile = self._load_profile(profile_name) | |
| ./ca/ca_engine.py: csr = x509.load_pem_x509_csr(csr_pem.encode()) | |
| ./ca/ca_engine.py: self._validate_csr(csr, profile) | |
| ./ca/ca_engine.py: bundle = self._build_certificate(csr, profile) | |
| ./ca/ca_engine.py: IssuedCertificate.objects.create( | |
| ./ca/ca_engine.py: serial_number=bundle.serial_number, | |
| ./ca/ca_engine.py: hostname=hostname, | |
| ./ca/ca_engine.py: profile_name=profile_name, | |
| ./ca/ca_engine.py: certificate_pem=bundle.certificate_pem, | |
| ./ca/ca_engine.py: ca_bundle_pem=bundle.ca_bundle_pem, | |
| ./ca/ca_engine.py: issued_at=str(datetime.datetime.utcnow()), | |
| ./ca/ca_engine.py: not_after=str(bundle.not_after), | |
| ./ca/ca_engine.py: last_csr=csr_pem, | |
| ./ca/ca_engine.py: revoked=False, | |
| ./ca/ca_engine.py: ) | |
| ./ca/ca_engine.py: return { | |
| ./ca/ca_engine.py: "serial_number": bundle.serial_number, | |
| ./ca/ca_engine.py: "certificate": bundle.certificate_pem, | |
| ./ca/ca_engine.py: "ca_bundle": bundle.ca_bundle_pem, | |
| ./ca/ca_engine.py: "not_after": bundle.not_after.isoformat(), | |
| ./ca/ca_engine.py: } | |
| ./ca/policy.py:# ca/policy.py | |
| ./ca/policy.py:""" | |
| ./ca/policy.py:Policy engine for certificate issuance. | |
| ./ca/policy.py:This module is responsible for: | |
| ./ca/policy.py:- deciding whether a host may request a certificate | |
| ./ca/policy.py:- selecting valid certificate profiles | |
| ./ca/policy.py:- enforcing LDAP-driven constraints (via registry layer) | |
| ./ca/policy.py:It does NOT perform issuance or cryptography. | |
| ./ca/policy.py:""" | |
| ./ca/policy.py:import re | |
| ./ca/policy.py:from typing import List | |
| ./ca/policy.py:from ca.models import Host, CertificateProfile | |
| ./ca/policy.py:class PolicyEngine: | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: Central policy evaluation layer. | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: def get_host_profiles(self, host: Host) -> List[CertificateProfile]: | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: Returns certificate profiles allowed for a given host. | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: return list(CertificateProfile.objects.all()) | |
| ./ca/policy.py: def is_host_allowed(self, host: Host) -> bool: | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: Basic allow/deny gate for enrollment eligibility. | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: if host is None: | |
| ./ca/policy.py: return False | |
| ./ca/policy.py: return True | |
| ./ca/policy.py: def validate_hostname(self, hostname: str) -> bool: | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: Minimal hostname sanity check (not DNS validation). | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: return bool(re.match(r"^[a-zA-Z0-9.-]+$", hostname)) | |
| ./ca/policy.py: def filter_profiles_for_hostname( | |
| ./ca/policy.py: self, | |
| ./ca/policy.py: hostname: str, | |
| ./ca/policy.py: profiles: List[CertificateProfile], | |
| ./ca/policy.py: ) -> List[CertificateProfile]: | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: Filters profiles based on hostname policy constraints. | |
| ./ca/policy.py: """ | |
| ./ca/policy.py: valid = [] | |
| ./ca/policy.py: for profile in profiles: | |
| ./ca/policy.py: if profile.allowed_cn_regex: | |
| ./ca/policy.py: if not re.match(profile.allowed_cn_regex, hostname): | |
| ./ca/policy.py: continue | |
| ./ca/policy.py: valid.append(profile) | |
| ./ca/policy.py: return valid | |
| ./ca/notification_channel.py:# ca/notification_channel.py | |
| ./ca/notification_channel.py:""" | |
| ./ca/notification_channel.py:Notification channel for control-plane messages. | |
| ./ca/notification_channel.py:This is used to send signed UDP/DTLS-compatible payloads | |
| ./ca/notification_channel.py:to enrolled clients. | |
| ./ca/notification_channel.py:The CA does NOT maintain persistent connections. | |
| ./ca/notification_channel.py:It only emits stateless messages. | |
| ./ca/notification_channel.py:""" | |
| ./ca/notification_channel.py:import json | |
| ./ca/notification_channel.py:import socket | |
| ./ca/notification_channel.py:import time | |
| ./ca/notification_channel.py:import uuid | |
| ./ca/notification_channel.py:import hashlib | |
| ./ca/notification_channel.py:from typing import Dict, Any | |
| ./ca/notification_channel.py:class NotificationChannel: | |
| ./ca/notification_channel.py: """ | |
| ./ca/notification_channel.py: Stateless UDP notification emitter. | |
| ./ca/notification_channel.py: """ | |
| ./ca/notification_channel.py: def __init__(self, ca_private_key_pem: bytes): | |
| ./ca/notification_channel.py: self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| ./ca/notification_channel.py: self.ca_private_key_pem = ca_private_key_pem # reserved for future signing implementation | |
| ./ca/notification_channel.py: def _sign_payload(self, payload: Dict[str, Any]) -> str: | |
| ./ca/notification_channel.py: """ | |
| ./ca/notification_channel.py: Lightweight placeholder signature mechanism. | |
| ./ca/notification_channel.py: NOTE: | |
| ./ca/notification_channel.py: This should be replaced with real asymmetric signing | |
| ./ca/notification_channel.py: (e.g. RSA/ECDSA using CA private key). | |
| ./ca/notification_channel.py: """ | |
| ./ca/notification_channel.py: raw = json.dumps(payload, sort_keys=True).encode() | |
| ./ca/notification_channel.py: return hashlib.sha256(raw + self.ca_private_key_pem).hexdigest() | |
| ./ca/notification_channel.py: def send(self, host: str, port: int, event: str, data: Dict[str, Any]): | |
| ./ca/notification_channel.py: """ | |
| ./ca/notification_channel.py: Send a signed control-plane event to a client. | |
| ./ca/notification_channel.py: """ | |
| ./ca/notification_channel.py: payload = { | |
| ./ca/notification_channel.py: "id": str(uuid.uuid4()), | |
| ./ca/notification_channel.py: "timestamp": int(time.time()), | |
| ./ca/notification_channel.py: "event": event, | |
| ./ca/notification_channel.py: "data": data, | |
| ./ca/notification_channel.py: } | |
| ./ca/notification_channel.py: payload["signature"] = self._sign_payload(payload) | |
| ./ca/notification_channel.py: self.socket.sendto( | |
| ./ca/notification_channel.py: json.dumps(payload).encode(), | |
| ./ca/notification_channel.py: (host, port), | |
| ./ca/notification_channel.py: ) | |
| ./ca/message_signing.py:# ca/message_signing.py | |
| ./ca/message_signing.py:""" | |
| ./ca/message_signing.py:Cryptographic message signing utilities for CA control-plane traffic. | |
| ./ca/message_signing.py:This module is the *real* replacement for any placeholder hashing | |
| ./ca/message_signing.py:done in notification_channel.py. | |
| ./ca/message_signing.py:It provides deterministic, verifiable signatures for: | |
| ./ca/message_signing.py:- UDP control-plane messages | |
| ./ca/message_signing.py:- internal CA events | |
| ./ca/message_signing.py:- audit messages (optional) | |
| ./ca/message_signing.py:""" | |
| ./ca/message_signing.py:import json | |
| ./ca/message_signing.py:from typing import Dict, Any | |
| ./ca/message_signing.py:from cryptography.hazmat.primitives import hashes, serialization | |
| ./ca/message_signing.py:from cryptography.hazmat.primitives.asymmetric import padding, rsa | |
| ./ca/message_signing.py:class MessageSigner: | |
| ./ca/message_signing.py: """ | |
| ./ca/message_signing.py: RSA-based signing for CA-originated messages. | |
| ./ca/message_signing.py: """ | |
| ./ca/message_signing.py: def __init__(self, private_key_pem: bytes): | |
| ./ca/message_signing.py: self.private_key = serialization.load_pem_private_key( | |
| ./ca/message_signing.py: private_key_pem, | |
| ./ca/message_signing.py: password=None, | |
| ./ca/message_signing.py: ) | |
| ./ca/message_signing.py: def sign(self, payload: Dict[str, Any]) -> str: | |
| ./ca/message_signing.py: """ | |
| ./ca/message_signing.py: Sign a JSON-serializable payload. | |
| ./ca/message_signing.py: """ | |
| ./ca/message_signing.py: raw = json.dumps(payload, sort_keys=True).encode() | |
| ./ca/message_signing.py: signature = self.private_key.sign( | |
| ./ca/message_signing.py: raw, | |
| ./ca/message_signing.py: padding.PKCS1v15(), | |
| ./ca/message_signing.py: hashes.SHA256(), | |
| ./ca/message_signing.py: ) | |
| ./ca/message_signing.py: return signature.hex() | |
| ./ca/message_signing.py: def verify(self, payload: Dict[str, Any], signature_hex: str) -> bool: | |
| ./ca/message_signing.py: """ | |
| ./ca/message_signing.py: Verify a signature (useful for testing or internal validation). | |
| ./ca/message_signing.py: """ | |
| ./ca/message_signing.py: raw = json.dumps(payload, sort_keys=True).encode() | |
| ./ca/message_signing.py: signature = bytes.fromhex(signature_hex) | |
| ./ca/message_signing.py: public_key = self.private_key.public_key() | |
| ./ca/message_signing.py: try: | |
| ./ca/message_signing.py: public_key.verify( | |
| ./ca/message_signing.py: signature, | |
| ./ca/message_signing.py: raw, | |
| ./ca/message_signing.py: padding.PKCS1v15(), | |
| ./ca/message_signing.py: hashes.SHA256(), | |
| ./ca/message_signing.py: ) | |
| ./ca/message_signing.py: return True | |
| ./ca/message_signing.py: except Exception: | |
| ./ca/message_signing.py: return False | |
| ./ca/registry.py:# ca/registry.py | |
| ./ca/registry.py:""" | |
| ./ca/registry.py:Registry layer for CA system. | |
| ./ca/registry.py:Now fully aligned with django_ca.settings as the single | |
| ./ca/registry.py:source of truth for LDAP configuration. | |
| ./ca/registry.py:This module: | |
| ./ca/registry.py:- does NOT define LDAP structure | |
| ./ca/registry.py:- does NOT hardcode DN layout | |
| ./ca/registry.py:- only performs object access via django-ldapdb models | |
| ./ca/registry.py:""" | |
| ./ca/registry.py:from typing import List, Optional | |
| ./ca/registry.py:from django.conf import settings | |
| ./ca/registry.py:from ca.models import Host, IssuedCertificate, CertificateProfile | |
| ./ca/registry.py:class HostRegistry: | |
| ./ca/registry.py: def get_host(self, hostname: str) -> Optional[Host]: | |
| ./ca/registry.py: try: | |
| ./ca/registry.py: return Host.objects.get(cn=hostname) | |
| ./ca/registry.py: except Host.DoesNotExist: | |
| ./ca/registry.py: return None | |
| ./ca/registry.py: def list_hosts(self) -> List[Host]: | |
| ./ca/registry.py: return list(Host.objects.all()) | |
| ./ca/registry.py: def enrolled(self) -> List[Host]: | |
| ./ca/registry.py: return list(Host.objects.filter(is_enrolled=True)) | |
| ./ca/registry.py: def mark_enrolled(self, hostname: str) -> Host: | |
| ./ca/registry.py: host = self.get_host(hostname) | |
| ./ca/registry.py: if not host: | |
| ./ca/registry.py: raise ValueError(f"Unknown host: {hostname}") | |
| ./ca/registry.py: host.is_enrolled = True | |
| ./ca/registry.py: host.save() | |
| ./ca/registry.py: return host | |
| ./ca/registry.py:class CertificateRegistry: | |
| ./ca/registry.py: def get(self, serial_number: str) -> Optional[IssuedCertificate]: | |
| ./ca/registry.py: try: | |
| ./ca/registry.py: return IssuedCertificate.objects.get(serial_number=serial_number) | |
| ./ca/registry.py: except IssuedCertificate.DoesNotExist: | |
| ./ca/registry.py: return None | |
| ./ca/registry.py: def for_host(self, hostname: str) -> List[IssuedCertificate]: | |
| ./ca/registry.py: return list(IssuedCertificate.objects.filter(hostname=hostname)) | |
| ./ca/registry.py: def active(self) -> List[IssuedCertificate]: | |
| ./ca/registry.py: return list(IssuedCertificate.objects.filter(revoked=False)) | |
| ./ca/registry.py: def revoked(self) -> List[IssuedCertificate]: | |
| ./ca/registry.py: return list(IssuedCertificate.objects.filter(revoked=True)) | |
| ./ca/registry.py: def revoke(self, serial_number: str) -> IssuedCertificate: | |
| ./ca/registry.py: cert = self.get(serial_number) | |
| ./ca/registry.py: if not cert: | |
| ./ca/registry.py: raise ValueError(f"Unknown certificate: {serial_number}") | |
| ./ca/registry.py: cert.revoked = True | |
| ./ca/registry.py: cert.save() | |
| ./ca/registry.py: return cert | |
| ./ca/registry.py:class ProfileRegistry: | |
| ./ca/registry.py: def get(self, name: str) -> Optional[CertificateProfile]: | |
| ./ca/registry.py: try: | |
| ./ca/registry.py: return CertificateProfile.objects.get(name=name) | |
| ./ca/registry.py: except CertificateProfile.DoesNotExist: | |
| ./ca/registry.py: return None | |
| ./ca/registry.py: def list(self) -> List[CertificateProfile]: | |
| ./ca/registry.py: return list(CertificateProfile.objects.all()) | |
| ./ca/registry.py: def list_all(self) -> List[CertificateProfile]: | |
| ./ca/registry.py: """ | |
| ./ca/registry.py: Alias for compatibility with policy/ACME layers. | |
| ./ca/registry.py: """ | |
| ./ca/registry.py: return self.list() | |
| ./ca/renew.py:# ca/renew.py | |
| ./ca/renew.py:import time | |
| ./ca/renew.py:import json | |
| ./ca/renew.py:from datetime import datetime, timedelta | |
| ./ca/renew.py:from ca.models import IssuedCertificate | |
| ./ca/renew.py:from ca.issuance import issue_certificate | |
| ./ca/renew.py:RENEWAL_THRESHOLD_DAYS = 10 | |
| ./ca/renew.py:def needs_renewal(cert: IssuedCertificate) -> bool: | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: Determines whether a certificate is close enough to expiry to require renewal. | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: if not cert.not_after: | |
| ./ca/renew.py: return False | |
| ./ca/renew.py: return cert.not_after <= datetime.utcnow() + timedelta(days=RENEWAL_THRESHOLD_DAYS) | |
| ./ca/renew.py:def build_renewal_csr(cert: IssuedCertificate) -> str: | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: Placeholder: regenerate CSR based on stored metadata. | |
| ./ca/renew.py: In a real implementation this would: | |
| ./ca/renew.py: - fetch host key material (or request regeneration) | |
| ./ca/renew.py: - rebuild CSR using same subject + SAN profile | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: return cert.last_csr | |
| ./ca/renew.py:def renew_certificate(cert: IssuedCertificate): | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: Performs certificate renewal using existing issuance pipeline. | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: csr = build_renewal_csr(cert) | |
| ./ca/renew.py: return issue_certificate( | |
| ./ca/renew.py: hostname=cert.hostname, | |
| ./ca/renew.py: csr_pem=csr, | |
| ./ca/renew.py: profile_name=cert.profile_name, | |
| ./ca/renew.py: ) | |
| ./ca/renew.py:def run_renewal_cycle(): | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: Scans all issued certificates and renews those nearing expiry. | |
| ./ca/renew.py: """ | |
| ./ca/renew.py: now = datetime.utcnow() | |
| ./ca/renew.py: certs = IssuedCertificate.objects.all() | |
| ./ca/renew.py: renewed = 0 | |
| ./ca/renew.py: for cert in certs: | |
| ./ca/renew.py: if cert.not_after and cert.not_after <= now + timedelta(days=RENEWAL_THRESHOLD_DAYS): | |
| ./ca/renew.py: try: | |
| ./ca/renew.py: renew_certificate(cert) | |
| ./ca/renew.py: renewed += 1 | |
| ./ca/renew.py: except Exception: | |
| ./ca/renew.py: continue | |
| ./ca/renew.py: return {"renewed": renewed, "timestamp": time.time()} | |
| ./ca/renew.py:def main(): | |
| ./ca/renew.py: result = run_renewal_cycle() | |
| ./ca/renew.py: print(json.dumps(result, indent=2)) | |
| ./ca/renew.py:if __name__ == "__main__": | |
| ./ca/renew.py: main() | |
| ./ca/__init__.py:# ca/__init__.py | |
| ./ca/__init__.py:""" | |
| ./ca/__init__.py:CA application package initializer. | |
| ./ca/__init__.py:This module exposes a minimal public API for the CA system | |
| ./ca/__init__.py:while keeping internal modules isolated. | |
| ./ca/__init__.py:Public surface: | |
| ./ca/__init__.py:- issue_certificate_task (Celery entrypoint) | |
| ./ca/__init__.py:- CAEngine (core cryptographic engine) | |
| ./ca/__init__.py:- registry access helpers (optional) | |
| ./ca/__init__.py:""" | |
| ./ca/__init__.py:from ca.ca_engine import CAEngine | |
| ./ca/__init__.py:from ca.issuance import issue_certificate | |
| ./ca/__init__.py:from ca.registry import HostRegistry, CertificateRegistry | |
| ./ca/__init__.py:__all__ = [ | |
| ./ca/__init__.py: "CAEngine", | |
| ./ca/__init__.py: "issue_certificate", | |
| ./ca/__init__.py: "HostRegistry", | |
| ./ca/__init__.py: "CertificateRegistry", | |
| ./ca/__init__.py:] | |
| ./client/csr.py:# client/csr.py | |
| ./client/csr.py:""" | |
| ./client/csr.py:Client-side cryptographic utilities. | |
| ./client/csr.py:Responsible for: | |
| ./client/csr.py:- generating private keys | |
| ./client/csr.py:- building X.509 CSRs | |
| ./client/csr.py:- embedding hostname + SANs in a policy-compatible way | |
| ./client/csr.py:This module must remain CA-agnostic: | |
| ./client/csr.py:it does NOT know about LDAP, Django, or issuance logic. | |
| ./client/csr.py:""" | |
| ./client/csr.py:import socket | |
| ./client/csr.py:from typing import Optional, List | |
| ./client/csr.py:from cryptography import x509 | |
| ./client/csr.py:from cryptography.x509.oid import NameOID | |
| ./client/csr.py:from cryptography.hazmat.primitives import hashes, serialization | |
| ./client/csr.py:from cryptography.hazmat.primitives.asymmetric import rsa | |
| ./client/csr.py:def generate_private_key(key_size: int = 2048) -> rsa.RSAPrivateKey: | |
| ./client/csr.py: """ | |
| ./client/csr.py: Generate RSA private key locally on the client. | |
| ./client/csr.py: """ | |
| ./client/csr.py: return rsa.generate_private_key(public_exponent=65537, key_size=key_size) | |
| ./client/csr.py:def _default_hostname() -> str: | |
| ./client/csr.py: return socket.getfqdn() | |
| ./client/csr.py:def generate_csr( | |
| ./client/csr.py: private_key: rsa.RSAPrivateKey, | |
| ./client/csr.py: hostname: Optional[str] = None, | |
| ./client/csr.py: san_dns_names: Optional[List[str]] = None, | |
| ./client/csr.py:) -> bytes: | |
| ./client/csr.py: """ | |
| ./client/csr.py: Generate a PEM-encoded CSR. | |
| ./client/csr.py: The CSR includes: | |
| ./client/csr.py: - Common Name (CN) | |
| ./client/csr.py: - Subject Alternative Names (SAN) | |
| ./client/csr.py: """ | |
| ./client/csr.py: hostname = hostname or _default_hostname() | |
| ./client/csr.py: san_dns_names = san_dns_names or [hostname] | |
| ./client/csr.py: subject = x509.Name( | |
| ./client/csr.py: [ | |
| ./client/csr.py: x509.NameAttribute(NameOID.COMMON_NAME, hostname), | |
| ./client/csr.py: ] | |
| ./client/csr.py: ) | |
| ./client/csr.py: csr_builder = x509.CertificateSigningRequestBuilder().subject_name(subject) | |
| ./client/csr.py: san = x509.SubjectAlternativeName( | |
| ./client/csr.py: [x509.DNSName(name) for name in san_dns_names] | |
| ./client/csr.py: ) | |
| ./client/csr.py: csr_builder = csr_builder.add_extension(san, critical=False) | |
| ./client/csr.py: csr = csr_builder.sign( | |
| ./client/csr.py: private_key, | |
| ./client/csr.py: hashes.SHA256(), | |
| ./client/csr.py: ) | |
| ./client/csr.py: return csr.public_bytes(serialization.Encoding.PEM) | |
| ./client/csr.py:def serialize_private_key(private_key: rsa.RSAPrivateKey) -> bytes: | |
| ./client/csr.py: """ | |
| ./client/csr.py: Serialize private key in PEM format (unencrypted). | |
| ./client/csr.py: """ | |
| ./client/csr.py: return private_key.private_bytes( | |
| ./client/csr.py: encoding=serialization.Encoding.PEM, | |
| ./client/csr.py: format=serialization.PrivateFormat.TraditionalOpenSSL, | |
| ./client/csr.py: encryption_algorithm=serialization.NoEncryption(), | |
| ./client/csr.py: ) | |
| ./client/agent.py:# client/agent.py | |
| ./client/agent.py:import socket | |
| ./client/agent.py:import json | |
| ./client/agent.py:import time | |
| ./client/agent.py:import os | |
| ./client/agent.py:import ssl | |
| ./client/agent.py:from pathlib import Path | |
| ./client/agent.py:from typing import Dict, Any | |
| ./client/agent.py:from cryptography.hazmat.primitives import hashes | |
| ./client/agent.py:from cryptography.hazmat.primitives.asymmetric import padding | |
| ./client/agent.py:from cryptography.x509 import load_pem_x509_certificate | |
| ./client/agent.py:DEFAULT_BIND = ("0.0.0.0", 9999) | |
| ./client/agent.py:DEFAULT_CA_CERT_PATH = Path("/etc/ssl/ca/ca.pem") | |
| ./client/agent.py:DEFAULT_CLIENT_CERT_PATH = Path("/etc/ssl/client/client.pem") | |
| ./client/agent.py:DEFAULT_CLIENT_KEY_PATH = Path("/etc/ssl/client/client.key") | |
| ./client/agent.py:class ControlPlaneAgent: | |
| ./client/agent.py: """ | |
| ./client/agent.py: UDP-based control-plane agent. | |
| ./client/agent.py: Responsibilities: | |
| ./client/agent.py: - Receive signed messages from CA server | |
| ./client/agent.py: - Validate signature using CA certificate | |
| ./client/agent.py: - Enforce replay protection (nonce + timestamp) | |
| ./client/agent.py: - Execute allowed event handlers | |
| ./client/agent.py: """ | |
| ./client/agent.py: def __init__(self, bind_addr=DEFAULT_BIND): | |
| ./client/agent.py: self.bind_addr = bind_addr | |
| ./client/agent.py: self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| ./client/agent.py: self.socket.bind(self.bind_addr) | |
| ./client/agent.py: self.ca_cert = self._load_ca_cert() | |
| ./client/agent.py: self._seen_nonces = set() | |
| ./client/agent.py: def _load_ca_cert(self): | |
| ./client/agent.py: if not DEFAULT_CA_CERT_PATH.exists(): | |
| ./client/agent.py: raise RuntimeError("CA certificate not found at /etc/ssl/ca/ca.pem") | |
| ./client/agent.py: return load_pem_x509_certificate(DEFAULT_CA_CERT_PATH.read_bytes()) | |
| ./client/agent.py: def _verify_signature(self, payload: Dict[str, Any]) -> bool: | |
| ./client/agent.py: signature = bytes.fromhex(payload["signature"]) | |
| ./client/agent.py: unsigned = json.dumps( | |
| ./client/agent.py: {k: payload[k] for k in payload if k != "signature"}, | |
| ./client/agent.py: sort_keys=True | |
| ./client/agent.py: ).encode() | |
| ./client/agent.py: pubkey = self.ca_cert.public_key() | |
| ./client/agent.py: try: | |
| ./client/agent.py: pubkey.verify( | |
| ./client/agent.py: signature, | |
| ./client/agent.py: unsigned, | |
| ./client/agent.py: padding.PKCS1v15(), | |
| ./client/agent.py: hashes.SHA256(), | |
| ./client/agent.py: ) | |
| ./client/agent.py: return True | |
| ./client/agent.py: except Exception: | |
| ./client/agent.py: return False | |
| ./client/agent.py: def _check_replay(self, payload: Dict[str, Any]) -> bool: | |
| ./client/agent.py: nonce = payload.get("nonce") | |
| ./client/agent.py: ts = payload.get("timestamp", 0) | |
| ./client/agent.py: if nonce in self._seen_nonces: | |
| ./client/agent.py: return False | |
| ./client/agent.py: now = int(time.time()) | |
| ./client/agent.py: if abs(now - ts) > payload.get("ttl", 30): | |
| ./client/agent.py: return False | |
| ./client/agent.py: self._seen_nonces.add(nonce) | |
| ./client/agent.py: return True | |
| ./client/agent.py: def _handle_event(self, payload: Dict[str, Any]): | |
| ./client/agent.py: event = payload.get("event") | |
| ./client/agent.py: if event == "reboot": | |
| ./client/agent.py: os.system("reboot") | |
| ./client/agent.py: elif event == "shutdown": | |
| ./client/agent.py: os.system("shutdown now") | |
| ./client/agent.py: elif event == "noop": | |
| ./client/agent.py: pass | |
| ./client/agent.py: else: | |
| ./client/agent.py: # Unknown events are ignored for safety | |
| ./client/agent.py: pass | |
| ./client/agent.py: def run(self): | |
| ./client/agent.py: print(f"Agent listening on {self.bind_addr}") | |
| ./client/agent.py: while True: | |
| ./client/agent.py: data, addr = self.socket.recvfrom(65535) | |
| ./client/agent.py: try: | |
| ./client/agent.py: payload = json.loads(data.decode()) | |
| ./client/agent.py: except Exception: | |
| ./client/agent.py: continue | |
| ./client/agent.py: if not self._check_replay(payload): | |
| ./client/agent.py: continue | |
| ./client/agent.py: if not self._verify_signature(payload): | |
| ./client/agent.py: continue | |
| ./client/agent.py: self._handle_event(payload) | |
| ./client/agent.py:def main(): | |
| ./client/agent.py: agent = ControlPlaneAgent() | |
| ./client/agent.py: agent.run() | |
| ./client/agent.py:if __name__ == "__main__": | |
| ./client/agent.py: main() | |
| ./client/enroll.py:# enroll.py | |
| ./client/enroll.py:""" | |
| ./client/enroll.py:Client enrollment bootstrap agent. | |
| ./client/enroll.py:This script performs ONE privileged action: | |
| ./client/enroll.py:- initiate Kerberos-backed enrollment | |
| ./client/enroll.py:- obtain allowed certificate profiles from CA | |
| ./client/enroll.py:- generate private key locally | |
| ./client/enroll.py:- generate CSR | |
| ./client/enroll.py:- request certificate issuance | |
| ./client/enroll.py:- install certificates into /etc/ssl | |
| ./client/enroll.py:After this runs successfully: | |
| ./client/enroll.py:- Kerberos is no longer used | |
| ./client/enroll.py:- all further auth is certificate-based | |
| ./client/enroll.py:""" | |
| ./client/enroll.py:import json | |
| ./client/enroll.py:import socket | |
| ./client/enroll.py:from pathlib import Path | |
| ./client/enroll.py:import requests | |
| ./client/enroll.py:from client.csr import generate_csr, generate_private_key | |
| ./client/enroll.py:CA_URL = "https://ca.local:8443" | |
| ./client/enroll.py:def auto_hostname() -> str: | |
| ./client/enroll.py: """ | |
| ./client/enroll.py: Best-effort system hostname detection. | |
| ./client/enroll.py: """ | |
| ./client/enroll.py: return socket.getfqdn() | |
| ./client/enroll.py:def kerberos_principal() -> str: | |
| ./client/enroll.py: """ | |
| ./client/enroll.py: In real deployment this comes from: | |
| ./client/enroll.py: - kinit session cache | |
| ./client/enroll.py: - or external SSO tool | |
| ./client/enroll.py: """ | |
| ./client/enroll.py: raise NotImplementedError("Kerberos integration required") | |
| ./client/enroll.py:def enroll(): | |
| ./client/enroll.py: hostname = auto_hostname() | |
| ./client/enroll.py: principal = kerberos_principal() | |
| ./client/enroll.py: # Step 1: initiate enrollment (Kerberos gate) | |
| ./client/enroll.py: resp = requests.post( | |
| ./client/enroll.py: f"{CA_URL}/enroll", | |
| ./client/enroll.py: json={ | |
| ./client/enroll.py: "hostname": hostname, | |
| ./client/enroll.py: "kerberos_principal": principal, | |
| ./client/enroll.py: }, | |
| ./client/enroll.py: timeout=10, | |
| ./client/enroll.py: ) | |
| ./client/enroll.py: resp.raise_for_status() | |
| ./client/enroll.py: data = resp.json() | |
| ./client/enroll.py: profiles = data["profiles"] | |
| ./client/enroll.py: if not profiles: | |
| ./client/enroll.py: raise RuntimeError("No certificate profiles available") | |
| ./client/enroll.py: # Always enforce control-plane cert existence | |
| ./client/enroll.py: profile = "control-plane" if "control-plane" in profiles else profiles[0] | |
| ./client/enroll.py: # Step 2: generate key + CSR locally | |
| ./client/enroll.py: key = generate_private_key() | |
| ./client/enroll.py: csr = generate_csr(key, hostname) | |
| ./client/enroll.py: key_pem = key.private_bytes( | |
| ./client/enroll.py: encoding=None, | |
| ./client/enroll.py: format=None, | |
| ./client/enroll.py: encryption_algorithm=None, | |
| ./client/enroll.py: ) | |
| ./client/enroll.py: # Step 3: request certificate | |
| ./client/enroll.py: resp = requests.post( | |
| ./client/enroll.py: f"{CA_URL}/issue", | |
| ./client/enroll.py: json={ | |
| ./client/enroll.py: "hostname": hostname, | |
| ./client/enroll.py: "csr": csr.decode(), | |
| ./client/enroll.py: "profile": profile, | |
| ./client/enroll.py: }, | |
| ./client/enroll.py: timeout=20, | |
| ./client/enroll.py: ) | |
| ./client/enroll.py: resp.raise_for_status() | |
| ./client/enroll.py: result = resp.json() | |
| ./client/enroll.py: # Step 4: install certs | |
| ./client/enroll.py: ssl_dir = Path("/etc/ssl/ca") | |
| ./client/enroll.py: ssl_dir.mkdir(parents=True, exist_ok=True) | |
| ./client/enroll.py: (ssl_dir / "client.key").write_bytes(key_pem.encode() if hasattr(key_pem, "encode") else key_pem) | |
| ./client/enroll.py: (ssl_dir / "client.crt").write_text(result["certificate"]) | |
| ./client/enroll.py: (ssl_dir / "ca.crt").write_text(result["ca_bundle"]) | |
| ./client/enroll.py: print("Enrollment complete:", hostname) | |
| ./client/enroll.py:if __name__ == "__main__": | |
| ./client/enroll.py: enroll() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment