Skip to content

Instantly share code, notes, and snippets.

@m1cr0man
Created November 26, 2021 19:21
Show Gist options
  • Save m1cr0man/4f096c63acf2954d10b54519a6ba2f0f to your computer and use it in GitHub Desktop.
Save m1cr0man/4f096c63acf2954d10b54519a6ba2f0f to your computer and use it in GitHub Desktop.
NixOS acme selfsigned CA service failure test
let
commonConfig = ./common/acme/client;
dnsServerIP = nodes: nodes.dnsserver.config.networking.primaryIPAddress;
dnsScript = {pkgs, nodes}: let
dnsAddress = dnsServerIP nodes;
in pkgs.writeShellScript "dns-hook.sh" ''
set -euo pipefail
echo '[INFO]' "[$2]" 'dns-hook.sh' $*
if [ "$1" = "present" ]; then
${pkgs.curl}/bin/curl --data '{"host": "'"$2"'", "value": "'"$3"'"}' http://${dnsAddress}:8055/set-txt
else
${pkgs.curl}/bin/curl --data '{"host": "'"$2"'"}' http://${dnsAddress}:8055/clear-txt
fi
'';
documentRoot = pkgs: pkgs.runCommand "docroot" {} ''
mkdir -p "$out"
echo hello world > "$out/index.html"
'';
vhostBase = pkgs: {
forceSSL = true;
locations."/".root = documentRoot pkgs;
};
in import ./make-test-python.nix ({ lib, ... }: {
name = "acme";
meta.maintainers = lib.teams.acme.members;
nodes = {
# The fake ACME server which will respond to client requests
acme = { nodes, lib, ... }: {
imports = [ ./common/acme/server ];
networking.nameservers = lib.mkForce [ (dnsServerIP nodes) ];
};
# A fake DNS server which can be configured with records as desired
# Used to test DNS-01 challenge
dnsserver = { nodes, pkgs, ... }: {
networking.firewall.allowedTCPPorts = [ 8055 53 ];
networking.firewall.allowedUDPPorts = [ 53 ];
systemd.services.pebble-challtestsrv = {
enable = true;
description = "Pebble ACME challenge test server";
wantedBy = [ "network.target" ];
serviceConfig = {
ExecStart = "${pkgs.pebble}/bin/pebble-challtestsrv -dns01 ':53' -defaultIPv6 '' -defaultIPv4 '${nodes.webserver.config.networking.primaryIPAddress}'";
# Required to bind on privileged ports.
AmbientCapabilities = [ "CAP_NET_BIND_SERVICE" ];
};
};
};
# A web server which will be the node requesting certs
webserver = { pkgs, nodes, lib, config, ... }: {
imports = [ commonConfig ];
networking.nameservers = lib.mkForce [ (dnsServerIP nodes) ];
networking.firewall.allowedTCPPorts = [ 80 443 ];
# OpenSSL will be used for more thorough certificate validation
environment.systemPackages = [ pkgs.openssl ];
# Set log level to info so that we can see when the service is reloaded
services.nginx.enable = true;
services.nginx.logError = "stderr info";
# First tests configure a basic cert and run a bunch of openssl checks
services.nginx.virtualHosts."a.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
services.httpd.enable = true;
services.httpd.adminAddr = "[email protected]";
services.httpd.virtualHosts."z.example.test" = {
enableACME = true;
forceSSL = true;
};
specialisation.many-certs.configuration = { nodes, pkgs, lib, ... }: {
services.nginx.virtualHosts."b.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
services.nginx.virtualHosts."c.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
};
specialisation.many-certs-b.configuration = { nodes, pkgs, lib, ... }: {
services.nginx.virtualHosts."d.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
services.nginx.virtualHosts."e.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
};
specialisation.many-certs-c.configuration = { nodes, pkgs, lib, ... }: {
services.nginx.virtualHosts."f.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
services.nginx.virtualHosts."g.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
services.nginx.virtualHosts."h.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
services.nginx.virtualHosts."i.example.test" = (vhostBase pkgs) // {
enableACME = true;
};
};
};
# The client will be used to curl the webserver to validate configuration
client = {nodes, lib, pkgs, ...}: {
imports = [ commonConfig ];
networking.nameservers = lib.mkForce [ (dnsServerIP nodes) ];
# OpenSSL will be used for more thorough certificate validation
environment.systemPackages = [ pkgs.openssl ];
};
};
testScript = {nodes, ...}:
let
caDomain = nodes.acme.config.test-support.acme.caDomain;
newServerSystem = nodes.webserver.config.system.build.toplevel;
switchToNewServer = "${newServerSystem}/bin/switch-to-configuration test";
in
# Note, wait_for_unit does not work for oneshot services that do not have RemainAfterExit=true,
# this is because a oneshot goes from inactive => activating => inactive, and never
# reaches the active state. Targets do not have this issue.
''
import time
has_switched = False
def switch_to(node, name):
global has_switched
if has_switched:
node.succeed(
"${switchToNewServer}"
)
has_switched = True
node.succeed(
f"/run/current-system/specialisation/{name}/bin/switch-to-configuration test"
)
# Ensures the issuer of our cert matches the chain
# and matches the issuer we expect it to be.
# It's a good validation to ensure the cert.pem and fullchain.pem
# are not still selfsigned afer verification
def check_issuer(node, cert_name, issuer):
for fname in ("cert.pem", "fullchain.pem"):
actual_issuer = node.succeed(
f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}"
).partition("=")[2]
print(f"{fname} issuer: {actual_issuer}")
assert issuer.lower() in actual_issuer.lower()
# Ensure cert comes before chain in fullchain.pem
def check_fullchain(node, cert_name):
subject_data = node.succeed(
f"openssl crl2pkcs7 -nocrl -certfile /var/lib/acme/{cert_name}/fullchain.pem"
" | openssl pkcs7 -print_certs -noout"
)
for line in subject_data.lower().split("\n"):
if "subject" in line:
print(f"First subject in fullchain.pem: {line}")
assert cert_name.lower() in line
return
assert False
def check_connection(node, domain, retries=3):
assert retries >= 0, f"Failed to connect to https://{domain}"
result = node.succeed(
"openssl s_client -brief -verify 2 -CAfile /tmp/ca.crt"
f" -servername {domain} -connect {domain}:443 < /dev/null 2>&1"
)
for line in result.lower().split("\n"):
if "verification" in line and "error" in line:
time.sleep(3)
return check_connection(node, domain, retries - 1)
def check_connection_key_bits(node, domain, bits, retries=3):
assert retries >= 0, f"Did not find expected number of bits ({bits}) in key"
result = node.succeed(
"openssl s_client -CAfile /tmp/ca.crt"
f" -servername {domain} -connect {domain}:443 < /dev/null"
" | openssl x509 -noout -text | grep -i Public-Key"
)
print("Key type:", result)
if bits not in result:
time.sleep(3)
return check_connection_key_bits(node, domain, bits, retries - 1)
def check_stapling(node, domain, retries=3):
assert retries >= 0, "OCSP Stapling check failed"
# Pebble doesn't provide a full OCSP responder, so just check the URL
result = node.succeed(
"openssl s_client -CAfile /tmp/ca.crt"
f" -servername {domain} -connect {domain}:443 < /dev/null"
" | openssl x509 -noout -ocsp_uri"
)
print("OCSP Responder URL:", result)
if "${caDomain}:4002" not in result.lower():
time.sleep(3)
return check_stapling(node, domain, retries - 1)
def download_ca_certs(node, retries=5):
assert retries >= 0, "Failed to connect to pebble to download root CA certs"
exit_code, _ = node.execute("curl https://${caDomain}:15000/roots/0 > /tmp/ca.crt")
exit_code_2, _ = node.execute(
"curl https://${caDomain}:15000/intermediate-keys/0 >> /tmp/ca.crt"
)
if exit_code + exit_code_2 > 0:
time.sleep(3)
return download_ca_certs(node, retries - 1)
client.start()
dnsserver.start()
dnsserver.wait_for_unit("pebble-challtestsrv.service")
client.wait_for_unit("default.target")
client.succeed(
'curl --data \'{"host": "${caDomain}", "addresses": ["${nodes.acme.config.networking.primaryIPAddress}"]}\' http://${dnsServerIP nodes}:8055/add-a'
)
acme.start()
webserver.start()
acme.wait_for_unit("network-online.target")
acme.wait_for_unit("pebble.service")
download_ca_certs(client)
with subtest("Can request certificate with HTTPS-01 challenge"):
webserver.wait_for_unit("acme-finished-a.example.test.target")
webserver.wait_for_unit("nginx.service")
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service || true")
print(output)
with subtest("Can create many certs at once"):
switch_to(webserver, "many-certs")
webserver.wait_for_unit("acme-finished-b.example.test.target")
webserver.wait_for_unit("acme-finished-c.example.test.target")
webserver.wait_for_unit("nginx.service")
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service")
print(output)
with subtest("many certs b"):
switch_to(webserver, "many-certs-b")
webserver.wait_for_unit("acme-finished-d.example.test.target")
webserver.wait_for_unit("acme-finished-e.example.test.target")
webserver.wait_for_unit("nginx.service")
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service")
print(output)
with subtest("many certs c"):
switch_to(webserver, "many-certs-c")
webserver.wait_for_unit("acme-finished-f.example.test.target")
webserver.wait_for_unit("nginx.service")
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service")
print(output)
'';
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment