Created
November 26, 2021 19:21
-
-
Save m1cr0man/4f096c63acf2954d10b54519a6ba2f0f to your computer and use it in GitHub Desktop.
NixOS acme selfsigned CA service failure test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
let | |
commonConfig = ./common/acme/client; | |
dnsServerIP = nodes: nodes.dnsserver.config.networking.primaryIPAddress; | |
dnsScript = {pkgs, nodes}: let | |
dnsAddress = dnsServerIP nodes; | |
in pkgs.writeShellScript "dns-hook.sh" '' | |
set -euo pipefail | |
echo '[INFO]' "[$2]" 'dns-hook.sh' $* | |
if [ "$1" = "present" ]; then | |
${pkgs.curl}/bin/curl --data '{"host": "'"$2"'", "value": "'"$3"'"}' http://${dnsAddress}:8055/set-txt | |
else | |
${pkgs.curl}/bin/curl --data '{"host": "'"$2"'"}' http://${dnsAddress}:8055/clear-txt | |
fi | |
''; | |
documentRoot = pkgs: pkgs.runCommand "docroot" {} '' | |
mkdir -p "$out" | |
echo hello world > "$out/index.html" | |
''; | |
vhostBase = pkgs: { | |
forceSSL = true; | |
locations."/".root = documentRoot pkgs; | |
}; | |
in import ./make-test-python.nix ({ lib, ... }: { | |
name = "acme"; | |
meta.maintainers = lib.teams.acme.members; | |
nodes = { | |
# The fake ACME server which will respond to client requests | |
acme = { nodes, lib, ... }: { | |
imports = [ ./common/acme/server ]; | |
networking.nameservers = lib.mkForce [ (dnsServerIP nodes) ]; | |
}; | |
# A fake DNS server which can be configured with records as desired | |
# Used to test DNS-01 challenge | |
dnsserver = { nodes, pkgs, ... }: { | |
networking.firewall.allowedTCPPorts = [ 8055 53 ]; | |
networking.firewall.allowedUDPPorts = [ 53 ]; | |
systemd.services.pebble-challtestsrv = { | |
enable = true; | |
description = "Pebble ACME challenge test server"; | |
wantedBy = [ "network.target" ]; | |
serviceConfig = { | |
ExecStart = "${pkgs.pebble}/bin/pebble-challtestsrv -dns01 ':53' -defaultIPv6 '' -defaultIPv4 '${nodes.webserver.config.networking.primaryIPAddress}'"; | |
# Required to bind on privileged ports. | |
AmbientCapabilities = [ "CAP_NET_BIND_SERVICE" ]; | |
}; | |
}; | |
}; | |
# A web server which will be the node requesting certs | |
webserver = { pkgs, nodes, lib, config, ... }: { | |
imports = [ commonConfig ]; | |
networking.nameservers = lib.mkForce [ (dnsServerIP nodes) ]; | |
networking.firewall.allowedTCPPorts = [ 80 443 ]; | |
# OpenSSL will be used for more thorough certificate validation | |
environment.systemPackages = [ pkgs.openssl ]; | |
# Set log level to info so that we can see when the service is reloaded | |
services.nginx.enable = true; | |
services.nginx.logError = "stderr info"; | |
# First tests configure a basic cert and run a bunch of openssl checks | |
services.nginx.virtualHosts."a.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
services.httpd.enable = true; | |
services.httpd.adminAddr = "[email protected]"; | |
services.httpd.virtualHosts."z.example.test" = { | |
enableACME = true; | |
forceSSL = true; | |
}; | |
specialisation.many-certs.configuration = { nodes, pkgs, lib, ... }: { | |
services.nginx.virtualHosts."b.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
services.nginx.virtualHosts."c.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
}; | |
specialisation.many-certs-b.configuration = { nodes, pkgs, lib, ... }: { | |
services.nginx.virtualHosts."d.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
services.nginx.virtualHosts."e.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
}; | |
specialisation.many-certs-c.configuration = { nodes, pkgs, lib, ... }: { | |
services.nginx.virtualHosts."f.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
services.nginx.virtualHosts."g.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
services.nginx.virtualHosts."h.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
services.nginx.virtualHosts."i.example.test" = (vhostBase pkgs) // { | |
enableACME = true; | |
}; | |
}; | |
}; | |
# The client will be used to curl the webserver to validate configuration | |
client = {nodes, lib, pkgs, ...}: { | |
imports = [ commonConfig ]; | |
networking.nameservers = lib.mkForce [ (dnsServerIP nodes) ]; | |
# OpenSSL will be used for more thorough certificate validation | |
environment.systemPackages = [ pkgs.openssl ]; | |
}; | |
}; | |
testScript = {nodes, ...}: | |
let | |
caDomain = nodes.acme.config.test-support.acme.caDomain; | |
newServerSystem = nodes.webserver.config.system.build.toplevel; | |
switchToNewServer = "${newServerSystem}/bin/switch-to-configuration test"; | |
in | |
# Note, wait_for_unit does not work for oneshot services that do not have RemainAfterExit=true, | |
# this is because a oneshot goes from inactive => activating => inactive, and never | |
# reaches the active state. Targets do not have this issue. | |
'' | |
import time | |
has_switched = False | |
def switch_to(node, name): | |
global has_switched | |
if has_switched: | |
node.succeed( | |
"${switchToNewServer}" | |
) | |
has_switched = True | |
node.succeed( | |
f"/run/current-system/specialisation/{name}/bin/switch-to-configuration test" | |
) | |
# Ensures the issuer of our cert matches the chain | |
# and matches the issuer we expect it to be. | |
# It's a good validation to ensure the cert.pem and fullchain.pem | |
# are not still selfsigned afer verification | |
def check_issuer(node, cert_name, issuer): | |
for fname in ("cert.pem", "fullchain.pem"): | |
actual_issuer = node.succeed( | |
f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}" | |
).partition("=")[2] | |
print(f"{fname} issuer: {actual_issuer}") | |
assert issuer.lower() in actual_issuer.lower() | |
# Ensure cert comes before chain in fullchain.pem | |
def check_fullchain(node, cert_name): | |
subject_data = node.succeed( | |
f"openssl crl2pkcs7 -nocrl -certfile /var/lib/acme/{cert_name}/fullchain.pem" | |
" | openssl pkcs7 -print_certs -noout" | |
) | |
for line in subject_data.lower().split("\n"): | |
if "subject" in line: | |
print(f"First subject in fullchain.pem: {line}") | |
assert cert_name.lower() in line | |
return | |
assert False | |
def check_connection(node, domain, retries=3): | |
assert retries >= 0, f"Failed to connect to https://{domain}" | |
result = node.succeed( | |
"openssl s_client -brief -verify 2 -CAfile /tmp/ca.crt" | |
f" -servername {domain} -connect {domain}:443 < /dev/null 2>&1" | |
) | |
for line in result.lower().split("\n"): | |
if "verification" in line and "error" in line: | |
time.sleep(3) | |
return check_connection(node, domain, retries - 1) | |
def check_connection_key_bits(node, domain, bits, retries=3): | |
assert retries >= 0, f"Did not find expected number of bits ({bits}) in key" | |
result = node.succeed( | |
"openssl s_client -CAfile /tmp/ca.crt" | |
f" -servername {domain} -connect {domain}:443 < /dev/null" | |
" | openssl x509 -noout -text | grep -i Public-Key" | |
) | |
print("Key type:", result) | |
if bits not in result: | |
time.sleep(3) | |
return check_connection_key_bits(node, domain, bits, retries - 1) | |
def check_stapling(node, domain, retries=3): | |
assert retries >= 0, "OCSP Stapling check failed" | |
# Pebble doesn't provide a full OCSP responder, so just check the URL | |
result = node.succeed( | |
"openssl s_client -CAfile /tmp/ca.crt" | |
f" -servername {domain} -connect {domain}:443 < /dev/null" | |
" | openssl x509 -noout -ocsp_uri" | |
) | |
print("OCSP Responder URL:", result) | |
if "${caDomain}:4002" not in result.lower(): | |
time.sleep(3) | |
return check_stapling(node, domain, retries - 1) | |
def download_ca_certs(node, retries=5): | |
assert retries >= 0, "Failed to connect to pebble to download root CA certs" | |
exit_code, _ = node.execute("curl https://${caDomain}:15000/roots/0 > /tmp/ca.crt") | |
exit_code_2, _ = node.execute( | |
"curl https://${caDomain}:15000/intermediate-keys/0 >> /tmp/ca.crt" | |
) | |
if exit_code + exit_code_2 > 0: | |
time.sleep(3) | |
return download_ca_certs(node, retries - 1) | |
client.start() | |
dnsserver.start() | |
dnsserver.wait_for_unit("pebble-challtestsrv.service") | |
client.wait_for_unit("default.target") | |
client.succeed( | |
'curl --data \'{"host": "${caDomain}", "addresses": ["${nodes.acme.config.networking.primaryIPAddress}"]}\' http://${dnsServerIP nodes}:8055/add-a' | |
) | |
acme.start() | |
webserver.start() | |
acme.wait_for_unit("network-online.target") | |
acme.wait_for_unit("pebble.service") | |
download_ca_certs(client) | |
with subtest("Can request certificate with HTTPS-01 challenge"): | |
webserver.wait_for_unit("acme-finished-a.example.test.target") | |
webserver.wait_for_unit("nginx.service") | |
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service || true") | |
print(output) | |
with subtest("Can create many certs at once"): | |
switch_to(webserver, "many-certs") | |
webserver.wait_for_unit("acme-finished-b.example.test.target") | |
webserver.wait_for_unit("acme-finished-c.example.test.target") | |
webserver.wait_for_unit("nginx.service") | |
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service") | |
print(output) | |
with subtest("many certs b"): | |
switch_to(webserver, "many-certs-b") | |
webserver.wait_for_unit("acme-finished-d.example.test.target") | |
webserver.wait_for_unit("acme-finished-e.example.test.target") | |
webserver.wait_for_unit("nginx.service") | |
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service") | |
print(output) | |
with subtest("many certs c"): | |
switch_to(webserver, "many-certs-c") | |
webserver.wait_for_unit("acme-finished-f.example.test.target") | |
webserver.wait_for_unit("nginx.service") | |
_, output = webserver.execute("systemctl status acme-selfsigned-ca.service") | |
print(output) | |
''; | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment