Skip to content

Instantly share code, notes, and snippets.

@malaya-zemlya
Created November 15, 2024 18:17
Show Gist options
  • Save malaya-zemlya/92d02059c1970bd41a0c2ffa830c95f5 to your computer and use it in GitHub Desktop.
Save malaya-zemlya/92d02059c1970bd41a0c2ffa830c95f5 to your computer and use it in GitHub Desktop.
Convert a certificate in Google CA Service into a CSR
import argparse
from pathlib import Path
from typing import Union
import crcmod
import hashlib
import asn1crypto.pem
import asn1crypto.x509
import asn1crypto.csr
import asn1crypto.keys
import asn1crypto.algos
import asn1crypto.core
from google.cloud import kms
from google.cloud.security import privateca_v1
def pem_to_der(s: Union[str, bytes]) -> bytes:
    """Strip the PEM armor from *s* and return the DER payload.

    :param s: PEM data; a ``str`` is converted with ``str.encode()`` first,
        ``bytes`` are used as given
    :return: the third item of the tuple produced by ``asn1crypto.pem.unarmor``
    """
    pem_bytes = s if not isinstance(s, str) else s.encode()
    # unarmor() hands back three values; only the last one is needed here.
    _object_type, _headers, der_payload = asn1crypto.pem.unarmor(pem_bytes)
    return der_payload
def crc32c(data: bytes) -> int:
    """Return the checksum of *data* using crcmod's predefined "crc-32c" function."""
    checksum_fn = crcmod.predefined.mkPredefinedCrcFun("crc-32c")
    return checksum_fn(data)
def parse_args(argv=None):
    """
    Parse command line arguments for CSR creation.

    :param argv: optional list of argument strings to parse; when ``None``
        (the default) argparse falls back to ``sys.argv[1:]``
    :return: an ``argparse.Namespace`` with the attributes ``project_id``,
        ``location_id``, ``ca_pool_id`` and ``ca_id``
    """
    parser = argparse.ArgumentParser(
        description='Create a Certificate Signing Request (CSR) based on a cert in GoogleCA Service',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The example only shows options this parser actually defines.
        epilog='''
Example usage:
  %(prog)s --project-id my-project --location-id us-central1 \\
      --ca-pool-id root-pool --ca-id root-ca
''',
    )
    parser.add_argument('--project-id',
                        required=False,
                        type=str,
                        default='lemur-stage-001',
                        help='GCP Project ID that contains the CA pool')
    parser.add_argument('--location-id',
                        required=False,
                        type=str,
                        default='us-central1',
                        help='Location ID of the CA pool')
    parser.add_argument('--ca-pool-id',
                        required=False,
                        type=str,
                        default='root-pool',
                        help='CA Pool name')
    parser.add_argument('--ca-id',
                        required=False,
                        type=str,
                        default='root-ca',
                        help='Certificate Authority name within the CA pool')
    return parser.parse_args(argv)
def get_public_key(key_name: str) -> kms.PublicKey:
    """
    Fetch the public key of an asymmetric key from Google Cloud KMS.

    The response is checked before it is returned: the name it carries must
    equal ``key_name`` and its ``pem_crc32c`` must equal the CRC32C of the
    UTF-8 encoded PEM text.
    See https://cloud.google.com/kms/docs/data-integrity-guidelines

    :param key_name: name of the Google KMS key, sent as ``name`` in the request
    :return: the public key object returned by the KMS client
    :raises Exception: when either integrity check fails
    """
    kms_client = kms.KeyManagementServiceClient()
    response = kms_client.get_public_key(request={"name": key_name})

    # Request-side integrity: the server must echo the name we asked for.
    if response.name != key_name:
        raise Exception("The request sent to the server was corrupted in-transit.")

    # Response-side integrity: recompute the checksum over the PEM we received.
    local_checksum = crc32c(response.pem.encode("utf-8"))
    if response.pem_crc32c != local_checksum:
        raise Exception("The response received from the server was corrupted in-transit.")

    return response
def sign_blob(blob: bytes, hash_algorithm_id: str, key_name: str) -> bytes:
    """
    Sign a blob of bytes with an asymmetric key held in Google KMS.

    The blob is hashed locally with ``hash_blob`` and only the digest is sent
    to KMS, together with its CRC32C checksum.

    :param blob: bytes to sign
    :param hash_algorithm_id: hash algorithm name; passed to ``hash_blob`` and
        also used as the key of the ``digest`` mapping in the signing request
    :param key_name: name of the Google KMS key, sent as ``name`` in the request
    :return: the ``signature`` field of the KMS response
    :raises Exception: when the response reports the digest checksum as
        unverified, carries a different key name, or its signature does not
        match ``signature_crc32c``
    """
    kms_client = kms.KeyManagementServiceClient()
    digest = hash_blob(blob, hash_algorithm_id)
    signing_request = {
        "name": key_name,
        "digest": {hash_algorithm_id: digest},
        "digest_crc32c": crc32c(digest),
    }
    reply = kms_client.asymmetric_sign(request=signing_request)

    # Request-side integrity: digest checksum verified and key name echoed back.
    request_intact = reply.verified_digest_crc32c and reply.name == key_name
    if not request_intact:
        raise Exception("The request sent to the server was corrupted in-transit.")

    # Response-side integrity: recompute the checksum over the signature bytes.
    if reply.signature_crc32c != crc32c(reply.signature):
        raise Exception("The response received from the server was corrupted in-transit.")

    return reply.signature
def download_certificate(project_id: str, location_id, ca_pool_id: str, ca_id: str):
    """
    Look up a certificate authority in Google Cloud Certificate Authority Service.

    :param project_id: the project ID
    :param location_id: the location ID
    :param ca_pool_id: the CA Pool ID
    :param ca_id: the CA ID
    :return: the response of ``get_certificate_authority`` for the resource
        path built from the four IDs
    """
    ca_client = privateca_v1.CertificateAuthorityServiceClient()
    # Build the full resource name from its components.
    resource_name = ca_client.certificate_authority_path(
        project=project_id,
        location=location_id,
        ca_pool=ca_pool_id,
        certificate_authority=ca_id,
    )
    lookup = privateca_v1.GetCertificateAuthorityRequest(name=resource_name)
    return ca_client.get_certificate_authority(request=lookup)
def hash_blob(blob: bytes, hash_algorithm_id: str) -> bytes:
    """
    Compute the digest of a blob of bytes with the named hash algorithm.

    :param blob: bytes to hash
    :param hash_algorithm_id: algorithm name understood by ``hashlib.new``
    :return: the raw digest bytes
    """
    # hashlib.new() accepts the initial data directly, so no separate update().
    return hashlib.new(hash_algorithm_id, blob).digest()
def main(
        project_id,
        location_id,
        ca_pool_id,
        ca_id):
    """
    Build a CSR from an existing CA certificate and print it as PEM.

    The CSR reuses the subject and the extensions of the first entry in the
    CA's ``pem_ca_certificates``, carries the public key fetched from KMS for
    the CA's ``key_spec.cloud_kms_key_version``, and is signed through KMS
    with that same key version.

    :param project_id: the project ID of the CA
    :param location_id: the location ID of the CA
    :param ca_pool_id: the CA Pool ID
    :param ca_id: the CA ID
    """
    ca = download_certificate(
        project_id=project_id,
        location_id=location_id,
        ca_pool_id=ca_pool_id,
        ca_id=ca_id,
    )
    kms_key_version = ca.key_spec.cloud_kms_key_version
    public_key = get_public_key(kms_key_version)

    ca_cert = asn1crypto.x509.Certificate.load(pem_to_der(ca.pem_ca_certificates[0]))
    tbs = ca_cert['tbs_certificate']

    # Carry the certificate's extensions over as an extension request attribute.
    extension_request = asn1crypto.csr.CRIAttribute({
        'type': 'extension_request',
        'values': [asn1crypto.x509.Extensions(tbs['extensions'])],
    })
    request_info = asn1crypto.csr.CertificationRequestInfo({
        'version': 0,
        'subject': tbs['subject'],
        'subject_pk_info': asn1crypto.keys.PublicKeyInfo.load(pem_to_der(public_key.pem)),
        'attributes': asn1crypto.csr.CRIAttributes([extension_request]),
    })

    # NOTE(review): algorithm and hash are hard-coded; presumably the CA key is
    # expected to be an ECDSA key used with SHA-384 - confirm before reuse.
    signature_algorithm = asn1crypto.algos.SignedDigestAlgorithm({
        'algorithm': 'sha384_ecdsa',
        'parameters': None,
    })
    signature = sign_blob(request_info.dump(), key_name=kms_key_version, hash_algorithm_id='sha384')

    request = asn1crypto.csr.CertificationRequest({
        'certification_request_info': request_info,
        'signature_algorithm': signature_algorithm,
        'signature': signature,
    })
    armored = asn1crypto.pem.armor('CERTIFICATE REQUEST', request.dump(), None)
    print(armored.decode('utf-8'))
if __name__ == '__main__':
    # Script entry point: forward every parsed option to main() as a keyword argument.
    cli_options = vars(parse_args())
    main(**cli_options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment