Last active
September 19, 2023 09:11
-
-
Save mildsunrise/adb4068650484d9fe354a3ee4238eed3 to your computer and use it in GitHub Desktop.
π Open source implementation of FNMT's certificate configurator v1.0.1 (https://twitter.com/mild_sunrise/status/1585611873860440067)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
Open source implementation of FNMT's certificate configurator v1.0.1 | |
<https://www.sede.fnmt.gob.es/descargas/descarga-software/instalacion-software-generacion-de-claves> | |
No warranty provided; use this ONLY if you know what you're doing. | |
Usage: ./fnmt_handle.py <fnmtcr URL> | |
Fulfills the request indicated by the URL, sending request to answer operation as completed if there are no errors. | |
For the fnmtcr://request phase, the generated private key is written to "privkey.pem" in current directory. | |
For the fnmtcr://install phase, the received PKCS#7 / X.509 blob is written to "cert.der" in current directory. | |
After both 'request' and 'install' are done, you can build a PKCS#12 store from them: | |
$ openssl pkcs7 -print_certs -inform der -in cert.der -out certs.cer | |
$ openssl pkcs12 -export -in certs.cer -inkey privkey.pem -out certificate.p12 | |
''' | |
import sys | |
import gzip | |
import OpenSSL.crypto | |
from cryptography.hazmat.primitives import serialization, ciphers, asymmetric | |
from urllib.parse import urlparse, parse_qs, urlencode, unquote | |
from urllib.request import urlopen, Request | |
from base64 import b64decode, urlsafe_b64encode, urlsafe_b64decode | |
from xml.etree import ElementTree | |
# MAIN CODE | |
def __main__():
    '''
    Command-line entry point.

    Expects exactly one argument, an fnmtcr:// URL.  The URL's network
    location selects the handler ('request' or 'install'), its query string
    supplies the parameters, and the handler's result is submitted back
    through send_data().

    Prints the module docstring to stderr and exits with status 2 when the
    argument count is wrong.  Raises AssertionError when the URL has an
    unexpected scheme, operation or path.
    '''
    args = sys.argv[1:]
    if len(args) != 1:
        print(__doc__.strip(), file=sys.stderr)
        # sys.exit() instead of the bare exit(), which is only injected by the site module
        sys.exit(2)
    url = urlparse(args[0])

    # explicit raises rather than assert statements, so the checks still run under `python -O`
    if url.scheme != 'fnmtcr':
        raise AssertionError(f'Unexpected URL schema: {repr(url.scheme)}')
    handlers = { 'request': do_request, 'install': do_install }
    if url.netloc not in handlers:
        raise AssertionError(f'Unimplemented operation {repr(url.netloc)}, try with official configurator')
    handler = handlers[url.netloc]
    if url.path != '':
        raise AssertionError(f'Unexpected URL path: {repr(url.path)}')

    params = ensure_unique(parse_qs(url.query))
    if 'rtservlet' in params:
        # the URL only points at the parameters; fetch them and merge them into params
        retrieve_parameters(params)

    # parameters consumed by the server communication code are split off,
    # so the handler only sees the ones that are specific to the operation
    common_params = {}
    for k in ('rtservlet', 'stservlet', 'key', 'fileid'):
        if k in params:
            common_params[k] = params.pop(k)

    answer_data = handler(params)
    send_data(answer_data, common_params)
# HANDLERS | |
def do_request(params: dict[str, str]) -> bytes:
    '''
    Handle fnmtcr://request: generate an RSA key pair, store the private key
    in "privkey.pem" and return a signed SPKAC for it.

    Optional parameters, all consumed from `params`:
      csrtype   -- 'spkac', 'mozilla' or 'firefox' (default 'spkac')
      keytype   -- only 'rsa' is implemented (default 'rsa')
      keylength -- RSA key size in bits (default 2048)
      forcecard -- must not be 'true'

    Returns the base64-decoded output of NetscapeSPKI.b64_encode().

    Raises AssertionError for unimplemented or unknown parameters, and
    FileExistsError if "privkey.pem" already exists.
    '''
    import os  # local import: only needed to create the key file with restricted permissions

    csrtype = params.pop('csrtype', 'spkac').lower()
    keytype = params.pop('keytype', 'rsa').lower()
    keylength = int(params.pop('keylength', '2048'))
    forcecard = params.pop('forcecard', None)

    # explicit raise rather than assert, so unsupported requests are still rejected under `python -O`
    supported = (
        keytype == 'rsa'
        and csrtype in { 'spkac', 'mozilla', 'firefox' }
        and forcecard != 'true'
        and not params
    )
    if not supported:
        raise AssertionError('Unimplemented "request" parameters, try with official configurator')

    key = asymmetric.rsa.generate_private_key(65537, keylength)
    pem = key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption())

    # O_EXCL keeps the "never overwrite an existing key" behaviour of mode 'x';
    # mode 0o600 makes the unencrypted key readable by its owner only.
    flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, 'O_BINARY', 0)
    with os.fdopen(os.open('privkey.pem', flags, 0o600), 'wb') as f:
        f.write(pem)

    # cryptography doesn't implement SPKAC. PyOpenSSL does, but you can't set the challenge. the openssl tool does, but only allows specifying digest on v3+. sigh.
    # HACK: reaches into PyOpenSSL's private _lib / _ffi / _spki members to set the challenge string.
    dangerous_hacky_set_spki_challenge = lambda spki, challenge: \
        OpenSSL.crypto._lib.ASN1_STRING_set(OpenSSL.crypto._ffi.cast('ASN1_IA5STRING ***', spki._spki)[0][1], challenge.encode(), -1)

    spkac = OpenSSL.crypto.NetscapeSPKI()
    dangerous_hacky_set_spki_challenge(spkac, '16074851')
    ossl_key = OpenSSL.crypto.PKey.from_cryptography_key(key)
    spkac.set_pubkey(ossl_key)
    spkac.sign(ossl_key, 'sha256')
    return b64decode(spkac.b64_encode())
def do_install(params: dict[str, str]) -> bytes:
    '''
    Handle fnmtcr://install: decode the 'cert' parameter and save it to
    "cert.der" in the current directory.

    The value is URL-safe base64; the decoded bytes are gunzipped when they
    are valid GZIP and stored as-is otherwise.  Any other parameter only
    triggers a warning.  Always returns b'OK'.

    Raises AssertionError when 'cert' is missing and FileExistsError when
    "cert.der" already exists.
    '''
    assert 'cert' in params, '"cert" parameter not present, I do not know what to do'
    encoded = params.pop('cert')
    if params:
        print(f'warning: Unknown parameters: {params}')

    blob = urlsafe_b64decode(encoded)
    try:
        payload = gzip.decompress(blob)
    except gzip.BadGzipFile:
        print('note: Invalid GZIP, assuming not compressed.')
        payload = blob

    # mode 'x': refuse to overwrite a previously saved certificate
    with open('cert.der', 'xb') as out:
        out.write(payload)
    return b'OK'
# SERVER COMMUNICATION | |
def retrieve_parameters(params: dict[str, str]):
    '''
    Download additional parameters from the retrieval servlet and merge them
    into `params` (in place).

    Issues a GET to params['rtservlet'] with op=get, v=1 and id=params['fileid'].
    A response starting with 'ERR' (case-insensitive) is treated as a failure.
    When params has a non-empty 'key', the response is passed through
    decipher_data() first.  The result is parsed as XML whose children must
    be childless <e k="..." v="..."/> elements; each one sets params[k] to
    the URL-unquoted v.

    Raises AssertionError on an error response or an unexpected XML node.
    '''
    servlet = params['rtservlet']
    print('requesting additional parameters to:', repr(servlet))

    query = urlencode({ 'op': 'get', 'v': '1', 'id': params['fileid'] })
    request = Request(validate_url(servlet + '?' + query), method='GET')
    with urlopen(request) as resp:
        text = resp.read().decode()
    assert not text.upper().startswith('ERR'), f'error response from server: {text}'

    if params.get('key'):
        text = decipher_data(text, params['key'].encode()).decode()

    for node in ElementTree.fromstring(text):
        well_formed = node.tag == 'e' and set(node.attrib) == {'k', 'v'} and not list(node)
        assert well_formed, f'unexpected node {node}'
        params[node.attrib['k']] = unquote(node.attrib['v'])
def send_data(data: bytes, params: dict[str, str]):
    '''
    Submit the operation result to the storage servlet.

    `data` is encoded with cipher_data() using params['key'] (no encryption
    when the key is absent or empty) and POSTed, form-encoded, to
    params['stservlet'] together with op=put, v=1_0 and id=params['fileid'].

    Raises AssertionError unless the response body, stripped, is b'OK'.
    '''
    servlet = params['stservlet']
    print('submitting completion request to:', repr(servlet))

    payload = cipher_data(data, params.get('key', '').encode())
    form = { 'op': 'put', 'v': '1_0', 'id': params['fileid'], 'dat': payload }

    request = Request(validate_url(servlet), urlencode(form).encode('ascii'), method='POST')
    with urlopen(request) as resp:
        reply = resp.read()
    print(f'server response: status={resp.code}, body={reply}')
    assert reply.strip() == b'OK', 'unexpected response from server'
# Block cipher applied to servlet payloads by cipher_data() / decipher_data(),
# which both run it in ECB mode and insist on an 8-byte key.
CIPHER_ALG = ciphers.algorithms.TripleDES
def cipher_data(data: bytes, key: bytes) -> str:
    '''
    Encode `data` for submission to the server.

    With an empty key the result is just the URL-safe base64 of `data`.
    Otherwise `data` is zero-padded to a whole number of cipher blocks,
    encrypted with CIPHER_ALG in ECB mode, and returned as
    "<padding length>.<URL-safe base64 of the ciphertext>".

    Raises AssertionError if a non-empty key is not exactly 8 bytes long.
    '''
    if not key:
        return urlsafe_b64encode(data).decode('ascii')
    assert len(key) == 8, 'invalid key length'

    block_bytes = CIPHER_ALG.block_size // 8
    pad_len = (-len(data)) % block_bytes  # bytes missing to reach the next block boundary
    data += b'\0' * pad_len

    encryptor = ciphers.Cipher(CIPHER_ALG(key), ciphers.modes.ECB()).encryptor()
    ciphertext = encryptor.update(data) + encryptor.finalize()
    encoded = urlsafe_b64encode(ciphertext).decode('ascii')
    return str(pad_len) + '.' + encoded
def decipher_data(data: str, key: bytes) -> bytes:
    '''
    Decode a server payload; the inverse of cipher_data().

    With an empty key, `data` is plain URL-safe base64.  Otherwise it must
    look like "<padding length>.<URL-safe base64 of the ciphertext>", where
    the padding length is a decimal number below 8; the ciphertext is
    decrypted with CIPHER_ALG in ECB mode and that many trailing bytes are
    dropped.

    Raises AssertionError for a key that is not 8 bytes long or for data
    without a valid padding-length prefix.
    '''
    if not key:
        return urlsafe_b64decode(data)
    # explicit raises rather than assert statements, so validation still runs under `python -O`
    if len(key) != 8:
        raise AssertionError('invalid key length')

    prefix, dot, encoded = data.partition('.')
    # isascii() rejects characters such as '²' that satisfy isdigit() but make int() raise ValueError
    if not (dot and prefix.isascii() and prefix.isdigit()):
        raise AssertionError('invalid encrypted data')
    padding_len = int(prefix)
    if padding_len >= 8:
        raise AssertionError('invalid encrypted data')

    data = urlsafe_b64decode(encoded)
    cipher = ciphers.Cipher(CIPHER_ALG(key), ciphers.modes.ECB()).decryptor()
    data = cipher.update(data) + cipher.finalize()
    return data[:len(data) - padding_len]
# OTHER | |
def ensure_unique(params: dict[str, list[str]]) -> dict[str, str]:
    '''
    Flatten a parse_qs()-style mapping into a new dict of single values.

    Raises AssertionError, naming the first offending key, if any key does
    not have exactly one value.
    '''
    for name, values in params.items():
        assert len(values) == 1, f'Unexpected duplicate parameter {repr(name)}'
    return { name: values[0] for name, values in params.items() }
def validate_url(url: str) -> str:
    '''
    Return `url` unchanged once a request to it has been deemed acceptable.

    HTTPS URLs whose host ends in ".fnmt.es" or ".fnmt.gob.es" are accepted
    silently.  For anything else (including URLs without a host) the user is
    asked for confirmation on the terminal.

    Raises AssertionError if the user answers anything other than "y".
    '''
    parsed = urlparse(url)
    allowed_suffixes = ('.fnmt.es', '.fnmt.gob.es')

    # hostname is None when the URL has no network location; treat that as untrusted instead of crashing
    hostname = (parsed.hostname or '').lower()
    trusted = parsed.scheme.lower() == 'https' and hostname.endswith(allowed_suffixes)

    if not trusted:
        answer = input(f'allow request to {repr(parsed.hostname)}? [y/n]: ')
        # explicit raise: an assert would be stripped by `python -O`, letting a rejected request through
        if answer != 'y':
            raise AssertionError('user rejected request')
    return url
# run the tool only when this file is executed as a script
if __name__ == '__main__':
    __main__()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment