Last active
July 5, 2017 02:35
-
-
Save zengxs/d90c6d403365992e855cd15cddee5800 to your computer and use it in GitHub Desktop.
Let's Encrypt Acme Client Protocol Implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import copy | |
import hashlib | |
import json | |
import logging | |
import time | |
from collections import namedtuple | |
import requests | |
from jwt.algorithms import get_default_algorithms | |
from jwt.utils import base64url_encode, base64url_decode | |
from jwt.utils import force_bytes | |
from cryptography import x509 | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import serialization, hashes | |
from cryptography.hazmat.primitives.asymmetric import ec | |
# Configure the root logger at import time so DEBUG-level records are emitted.
logging.basicConfig(level=logging.DEBUG)

# URL of the ACME directory document that the client fetches on start-up
# (the "acme-staging" host of Let's Encrypt).
LE_SERVER = 'https://acme-staging.api.letsencrypt.org/directory'
def to_json(data):
    """Serialize *data* to compact JSON text.

    Keys are emitted in sorted order and no whitespace is placed after the
    ``,`` and ``:`` separators, so equal inputs always give the same string.
    """
    compact_separators = (',', ':')
    return json.dumps(data, separators=compact_separators, sort_keys=True)
class AcmeError(IOError):
    """Error raised when an ACME request does not succeed.

    The message is built from the JSON body of *response* (its ``detail``
    and ``type`` members) together with ``response.reason``.  When the body
    cannot be decoded, or *response* does not offer that interface, a
    generic message is used instead.
    """

    def __init__(self, response):
        try:
            problem = response.json()
            text = '{} (Type: {}, HTTP {})'.format(
                problem.get('detail'),
                problem.get('type') or 'unknown',
                response.reason)
        except (ValueError, TypeError, AttributeError):
            # No usable problem document: fall back to the generic message.
            text = "The ACME request failed."
        super().__init__(text)
class Account:
    """An ACME account, identified by its public key in JWK form.

    The thumbprint of the key is computed once, at construction time, and
    stored on the instance.
    """

    # Members that make up the thumbprint input for each key type (RFC 7638,
    # section 3.2).  Optional members such as "alg", "kid" or "use" must not
    # influence the digest.
    _THUMBPRINT_MEMBERS = {
        'EC': ('crv', 'kty', 'x', 'y'),
        'RSA': ('e', 'kty', 'n'),
        'oct': ('k', 'kty'),
    }

    def __init__(self, jwk, uri=None):
        # jwk: dict holding the public JSON Web Key of the account.
        self.jwk = jwk
        # uri: location of the account resource, if already known.
        self.uri = uri
        # jwk_thumbprint: base64url text of the SHA-256 key thumbprint.
        self.jwk_thumbprint = self.generate_jwk_thumbprint()

    def generate_jwk_thumbprint(self):
        """Return the SHA-256 JWK thumbprint as unpadded base64url text.

        Only the members required for the key type are hashed, serialized
        as compact JSON with sorted keys.  For an unrecognized ``kty`` the
        whole JWK is hashed, which is what this method has always done.
        """
        required = self._THUMBPRINT_MEMBERS.get(self.jwk.get('kty'))
        if required is None:
            members = self.jwk
        else:
            members = {name: self.jwk[name]
                       for name in required if name in self.jwk}
        # Same canonical form as to_json(): sorted keys, no whitespace.
        canonical = json.dumps(members, sort_keys=True, separators=(',', ':'))
        digest = hashlib.sha256(canonical.encode('utf-8')).digest()
        # base64url without the trailing "=" padding.
        return base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
class Acme:
    """Client for the directory-based ACME API used by this module.

    Constructing an instance fetches the directory document from ``url``.
    Every signed POST uses the stored nonce and then records the
    ``Replay-Nonce`` header of the response for the next request.
    """

    # JWK curve name -> JWS algorithm identifier.  The mapping is explicit
    # because P-521 is signed with ES512, which cannot be derived from the
    # curve name by trimming characters.
    _EC_ALGORITHMS = {'P-256': 'ES256', 'P-384': 'ES384', 'P-521': 'ES512'}

    def __init__(self, url, account, priv_key):
        """Store the account and signing key, then load directory and nonce.

        url: address of the ACME directory document.
        account: object exposing the public key as ``account.jwk``.
        priv_key: private key handed to the JWS algorithm for signing.
        """
        self.url = url
        self.account = account
        # Misspelled attribute name kept so existing callers keep working.
        self.accout = account
        self.jwk = account.jwk
        self.key = priv_key
        self._get_nonce()

    def _get_nonce(self):
        """Fetch the directory, remember it and its nonce; return the nonce."""
        r = requests.get(self.url)
        self.nonce = r.headers.get('Replay-Nonce')
        self.dir = r.json()
        return self.nonce

    def _generate_header(self):
        """Build the JWS header pair for the next request.

        Returns ``(header, protected)``: ``header`` carries ``alg`` and
        ``jwk``; ``protected`` is an independent copy with the current nonce
        added.

        Raises ValueError for a key type other than EC or RSA, and
        NotImplementedError for an EC curve without a known algorithm.
        """
        kty = self.jwk.get('kty')
        if kty == 'EC':
            algorithm = self._EC_ALGORITHMS.get(self.jwk.get('crv'))
            if algorithm is None:
                raise NotImplementedError('Algorithm not supported')
        elif kty == 'RSA':
            algorithm = 'RS256'
        else:
            raise ValueError('Invalid JWK key type: {}.'.format(kty))
        header = {'alg': algorithm, 'jwk': self.jwk}
        protected = copy.deepcopy(header)
        protected['nonce'] = self.nonce
        return header, protected

    def _sign_request(self, header, protected, payload, algorithm):
        """Return the JSON-serializable JWS object for *payload*.

        ``protected`` and ``payload`` are serialized with ``to_json`` and
        base64url-encoded; the signature covers ``protected.payload``.

        Raises NotImplementedError when *algorithm* is not offered by PyJWT.
        """
        # Only the lookup is guarded, so a KeyError raised while signing is
        # not misreported as an unsupported algorithm.
        try:
            algorithm_obj = get_default_algorithms()[algorithm]
        except KeyError:
            raise NotImplementedError('Algorithm not supported') from None
        protected = base64url_encode(force_bytes(to_json(protected)))
        payload = base64url_encode(force_bytes(to_json(payload)))
        signing_input = b'.'.join([protected, payload])
        key = algorithm_obj.prepare_key(self.key)
        signature = algorithm_obj.sign(signing_input, key)
        return dict(
            header=header,
            protected=protected.decode('ascii'),
            payload=payload.decode('ascii'),
            signature=base64url_encode(signature).decode('ascii'))

    def _post(self, url, payload, headers=None):
        """POST the signed *payload* to *url* and return the response.

        headers: optional extra HTTP headers, merged over the JSON
        content type.  The nonce for the next request is taken from the
        ``Replay-Nonce`` header of the response.
        """
        http_headers = {'Content-Type': 'application/json'}
        if headers:
            http_headers.update(headers)
        header, protected = self._generate_header()
        dat = self._sign_request(header, protected, payload, header.get('alg'))
        response = requests.post(url, json=dat, headers=http_headers)
        self.nonce = response.headers.get('Replay-Nonce')
        return response

    def get_authorization(self, uri):
        """Return the decoded JSON body of the resource at *uri*.

        Raises AcmeError when the body cannot be decoded as JSON.
        """
        r = requests.get(uri)
        try:
            return r.json()
        except (ValueError, TypeError, AttributeError) as e:
            # Hand AcmeError the response (not the exception) so it can
            # report whatever the server sent.
            raise AcmeError(r) from e

    def new_authorization(self, domain):
        """Request a new DNS-identifier authorization for *domain*.

        Returns a NewAuthorizationResult holding the JSON body and the
        ``Location`` header.  Raises AcmeError unless the server answers
        with 201.
        """
        r = self._post(self.dir.get('new-authz'), {
            'resource': 'new-authz',
            'identifier': {'type': 'dns', 'value': domain},
        })
        if r.status_code == 201:  # Created
            return NewAuthorizationResult(r.json(), r.headers.get('Location'))
        raise AcmeError(r)

    def validate_authorization(self, uri, validate_type, key_authorizatioin):
        """Submit a challenge response to *uri*.

        Returns True on any 2xx status, otherwise raises AcmeError.
        """
        response = self._post(uri, {
            'resource': 'challenge',
            'type': validate_type,
            'keyAuthorization': key_authorizatioin,
        })
        if 200 <= response.status_code < 300:
            return True
        raise AcmeError(response)

    def issue_certificate(self, csr):
        """Submit *csr* and return the issued certificate.

        csr: certificate signing request object; its DER encoding is sent.

        Returns an IssuanceResult with the certificate in PEM form, the
        ``Location`` header, and the PEM of the certificate behind the "up"
        link (None when the response has no such link).  Raises AcmeError
        unless the server answers with 201.
        """
        http_headers = {'Accept': 'application/pkix-cert'}
        response = self._post(self.dir['new-cert'], {
            'resource': 'new-cert',
            'csr': base64url_encode(csr.public_bytes(
                serialization.Encoding.DER)).decode('ascii'),
        }, headers=http_headers)
        if response.status_code != 201:  # anything but Created
            raise AcmeError(response)
        # der to pem
        cer = x509.load_der_x509_certificate(
            response.content,
            default_backend()).public_bytes(serialization.Encoding.PEM)
        chain = None
        inter = response.links.get('up')  # intermediate certificate
        if inter:
            inter = requests.get(inter['url'], headers=http_headers).content
            chain = x509.load_der_x509_certificate(
                inter,
                default_backend()).public_bytes(serialization.Encoding.PEM)
        return IssuanceResult(cer, response.headers.get('Location'), chain)
# Lightweight result records returned by the client.
# NewAuthorizationResult: the authorization body plus where it lives.
NewAuthorizationResult = namedtuple(
    'NewAuthorizationResult', ['contents', 'uri'])
# IssuanceResult: the certificate, its location and the intermediate.
IssuanceResult = namedtuple(
    'IssuanceResult', ['certificate', 'location', 'intermediate'])
def get_challenge(auth, method):
    """Return the first challenge of type *method* in *auth*, if any.

    *auth* must expose the authorization body as ``auth.contents``.  When
    the body lists no challenge of the requested type, None is returned.
    """
    candidates = auth.contents.get('challenges') or []
    matching = [entry for entry in candidates if entry.get('type') == method]
    if not matching:
        return None
    return matching[0]
def main():
    """Issue a test certificate using the files in the working directory.

    Reads the account key (``account-test.key``), the account description
    (``account-test.json``) and the signing request (``cert-test.csr``),
    asks the server at ``LE_SERVER`` for a certificate and writes the
    result to ``cert-test.crt``.
    """
    # test issue
    # Input files are closed again before any network request is made.
    with open('account-test.key', 'rb') as key, open('account-test.json') as jf:
        info = json.load(jf)
        pkey = serialization.load_pem_private_key(
            key.read(), None, default_backend())
    account = Account(info['contents']['key'], uri=info['uri'])
    acme = Acme(LE_SERVER, account, pkey)
    with open('cert-test.csr', 'rb') as csrf:
        csr = x509.load_pem_x509_csr(csrf.read(), default_backend())
    logging.info('Requesting certificate issuance...')
    result = acme.issue_certificate(csr)
    logging.info('Certificate issued.')
    with open('cert-test.crt', 'wb') as cer:
        cer.write(result.certificate)
        # Both optional parts may be absent; writing None would raise.
        if result.intermediate is not None:
            cer.write(result.intermediate)
        # NOTE(review): the location URL is appended after the PEM data, as
        # before - confirm that consumers of this file tolerate it.
        if result.location is not None:
            cer.write(force_bytes(result.location))
    logging.info(result.intermediate)


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
import copy | |
import base64 | |
# requests | |
import requests | |
# PyJWT | |
from jwt.algorithms import get_default_algorithms | |
from jwt.utils import base64url_encode, to_base64url_uint, force_bytes | |
# cryptography | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import ec | |
# Let's Encrypt staging server (for testing only; it does not issue valid
# certificates)
LE_SERVER = 'https://acme-staging.api.letsencrypt.org/directory'

# Generate a fresh EC key to serve as the account key.
pkey = ec.generate_private_key(curve=ec.SECP384R1(), backend=default_backend())

# One GET yields both the first nonce and the ACME directory.
r = requests.get(LE_SERVER)
nonce = r.headers['Replay-Nonce']
directory = r.json()

# Public point of the account key, published as the JWK coordinates.
_public_numbers = pkey.public_key().public_numbers()

header = {
    # The Let's Encrypt server only supports the curves 'P-256' and 'P-384':
    #   'P-256' keys must sign with 'ES256'
    #   'P-384' keys must sign with 'ES384'
    "alg": "ES384",
    "jwk": {
        "kty": "EC",
        "crv": 'P-384',
        "x": to_base64url_uint(_public_numbers.x).decode('ascii'),
        "y": to_base64url_uint(_public_numbers.y).decode('ascii'),
    },
}

# The protected header is the plain header plus the nonce.
protected = copy.deepcopy(header)
protected['nonce'] = nonce
protected = base64url_encode(force_bytes(json.dumps(protected)))

# Registration request body, base64url-encoded like the protected header.
payload = {'resource':'new-reg', 'contact':['mailto:[email protected]']}
payload = base64url_encode(force_bytes(json.dumps(payload)))
def sign(input, key, algorithm='ES256'):
    """Return the JWS signature of *input*.

    input: the bytes to sign.
    key: the signing key, passed through the algorithm's ``prepare_key``.
    algorithm: name of the JWS algorithm, looked up in PyJWT's defaults.

    Raises NotImplementedError when a KeyError occurs, which is what an
    unknown algorithm name produces.
    """
    try:
        signer = get_default_algorithms()[algorithm]
        return signer.sign(input, signer.prepare_key(key))
    except KeyError:
        raise NotImplementedError('Algorithm not supported')
# The signature covers "<protected>.<payload>".
signing_input = protected + b'.' + payload
signature = base64url_encode(sign(signing_input, pkey, algorithm='ES384'))

# Assemble the JWS object; every encoded part is sent as ASCII text.
data = dict(
    header=header,
    protected=protected.decode('ascii'),
    payload=payload.decode('ascii'),
    signature=signature.decode('ascii'),
)

r = requests.post(directory['new-reg'], json=data)

# A '201 Created' response means the registration succeeded.
print('Status: %d %s' % (r.status_code, r.reason))
print(json.dumps(r.json(), indent=4))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment