-
Star
(136)
You must be signed in to star a gist -
Fork
(24)
You must be signed in to fork a gist
-
-
Save erikbern/756b1d8df2d1487497d29b90e81f8068 to your computer and use it in GitHub Desktop.
import contextlib | |
import OpenSSL.crypto | |
import os | |
import requests | |
import ssl | |
import tempfile | |
@contextlib.contextmanager | |
def pfx_to_pem(pfx_path, pfx_password): | |
''' Decrypts the .pfx file to be used with requests. ''' | |
with tempfile.NamedTemporaryFile(suffix='.pem') as t_pem: | |
f_pem = open(t_pem.name, 'wb') | |
pfx = open(pfx_path, 'rb').read() | |
p12 = OpenSSL.crypto.load_pkcs12(pfx, pfx_password) | |
f_pem.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, p12.get_privatekey())) | |
f_pem.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, p12.get_certificate())) | |
ca = p12.get_ca_certificates() | |
if ca is not None: | |
for cert in ca: | |
f_pem.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)) | |
f_pem.close() | |
yield t_pem.name | |
# HOW TO USE: | |
# with pfx_to_pem('foo.pem', 'bar') as cert: | |
# requests.post(url, cert=cert, data=payload) |
I am getting error something like this - "OpenSSL.SSL.Error: [('memory buffer routines', 'BUF_MEM_grow_clean', 'malloc failure'), ('SSL routines', 'read_state_machine', 'BUF lib')]", can you please help? This error occurs every 1 week, more or less.
Thanks.
Thanks a lot
Worked fine just about 5 months ago and now when I try to run my program I am getting an error 'certificate verify failed'. Can you please help?? Any suggestions?
Certificates are up to date((
VOCÊ É UM ARROMBADO!!! Valeuuuuuuuu!
With this technique, for me anyway, I'm able to decrypt a .pfx file without error, but then it fails authentication when submitting to the server per the example, even though the same .pfx file with same password submits fine via Postman. Don't know why it won't work, but I used these instructions to "decompose" the .pfx into a pair of .pem files, and those can be submitted directly from python requests. And with that technique, you only have to do it once, not on every request.
It works on *nix machine but on Windows, it is giving error
[WinError 32] The process cannot access the file because it is being used by another process: 'C:\\path\to\\\tmpbnb1dy2v.pem'
For anyone having "Permission Denied" error, here is the fix (delete=False):
with tempfile.NamedTemporaryFile(suffix='.pem', dir=mainDir, delete=False) as t_pem:
with tempfile.NamedTemporaryFile(suffix='.pem') as t_pem:
t_pem.close() # better solution on Windows than delete=False
f_pem = open(t_pem.name, 'wb')
A variant using cryptography (to avoid DeprecationWarning: PKCS#12 support in pyOpenSSL is deprecated):
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
''' Decrypts the .pfx file to be used with requests. '''
pfx = Path(pfx_path).read_bytes()
private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)
with NamedTemporaryFile(suffix='.pem') as t_pem:
with open(t_pem.name, 'wb') as f_pem:
pem_file.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
pem_file.write(main_cert.public_bytes(Encoding.PEM))
for ca in add_certs:
pem_file.write(ca.public_bytes(Encoding.PEM))
yield t_pem.name
# HOW TO USE:
# with pfx_to_pem('foo.pem', 'bar') as cert:
# requests.post(url, cert=cert, data=payload)
I am find mistake:
wrong:
with open(t_pem.name, 'wb') as f_pem:
right:
with open(t_pem.name, 'wb') as pem_file:
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
''' Decrypts the .pfx file to be used with requests. '''
pfx = Path(pfx_path).read_bytes()
private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)
with NamedTemporaryFile(suffix='.pem') as t_pem:
with open(t_pem.name, 'wb') as pem_file:
pem_file.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
pem_file.write(main_cert.public_bytes(Encoding.PEM))
for ca in add_certs:
pem_file.write(ca.public_bytes(Encoding.PEM))
yield t_pem.name
# HOW TO USE:
# with pfx_to_pem('foo.pem', 'bar') as cert:
# requests.post(url, cert=cert, data=payload)
Thank you. This helped me.
For me still getting the 401 Error.
I tried to connect to API using browser its working fine. I am not sure what i am missing. I am using cryptography
Thank you, saved a lot of time!
i recommend putting a license statement at the top. if you choose not to put a license statement then read: https://choosealicense.com/no-permission/ . thanks. cheers.
could someone help me with a full example, in my case, I am reading the pfx file from Azure key vault, so I dont have a way to specify the path of the pfx file, unless, I create a temp file,
so I am looking for an example which is something like, converting pfx file which is in memory(read from upstream) and then convert in to certificate(X-509) and a private key to pass along with a request,
thanks, for the help
If you have an in-memory bytestring, then you just don't need to read them from a file (what Path(pfx_path).read_bytes()
is doing), as load_key_and_certificates
expects bytes in its first argument.
with open(t_pem.name, 'wb') as f_pem:
Hi I was talking about with open(t_pem.name, 'wb') as f_pem: this line
And I was referring to my version (see comment of May 21) which doesn't use deprecated code.
And I was referring to my version (see comment of May 21) which doesn't use deprecated code.
@claudep thanks for responding, this is how I am retrieving pfx cert, could you help me with the flow for the next steps, once I have the certificate. how to use this certificate for making API request,
certificate_client = CertificateClient(vault_url=KVUri, credential=credential)
certificate = certificate_client.get_certificate("CERT_KEYVAULT_IN_PFX")
Sorry, I don't know this CertificateClient
stuff. Once you have the in-memory bytes, you should be able to feed them to load_key_and_certificates
. I'm afraid at this stage you will have to test/debug things yourself.
@mahesh3359 what you are looking for is to read your certificate from Azure Key Vault.
If you read this issue, you will notice that your certificate private key is actually stored in Key Vault Secrets (not Key Vault Certificates).
The name of the secret is the same as your certificate name. So for you, the code will be the following:
secrets_client = SecretClient(vault_url=vault_url, credential=credential)
certificate = secrets_client.get_secret(certificate_name)
Once you do that, then you will have the .pfx
file stored in the certificate.value
variable (encoded in base64). If you want to use that with this solution, then the code would be the following:
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from base64 import b64decode
import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
@contextmanager
def pfx_to_pem(pfx_content, pfx_password):
''' Decrypts the .pfx file to be used with requests. '''
pfx = b64decode(pfx_content)
private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)
with NamedTemporaryFile(suffix='.pem') as t_pem:
with open(t_pem.name, 'wb') as pem_file:
pem_file.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
pem_file.write(main_cert.public_bytes(Encoding.PEM))
for ca in add_certs:
pem_file.write(ca.public_bytes(Encoding.PEM))
yield t_pem.name
# HOW TO USE:
# secrets_client = SecretClient(vault_url=vault_url, credential=credential)
# certificate = secrets_client.get_secret(certificate_name)
# with pfx_to_pem(certificate.value, 'bar') as cert:
# requests.post(url, cert=cert, data=payload)
license
I to recommend to state a license for this code.
The following code worked for me in Databricks as of 21 Feb 2024
from contextlib import contextmanager
from pathlib import Path
from tempfile import NamedTemporaryFile
import requests
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
@contextmanager
def pfx_to_pem(pfx_path, pfx_password):
''' Decrypts the .pfx file to be used with requests. '''
pfx = Path(pfx_path).read_bytes()
private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None)
with NamedTemporaryFile(suffix='.pem') as t_pem:
with open(t_pem.name, 'wb') as f_pem:
f_pem.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()))
f_pem.write(main_cert.public_bytes(Encoding.PEM))
for ca in add_certs:
f_pem.write(ca.public_bytes(Encoding.PEM))
yield t_pem.name
# HOW TO USE:
#with pfx_to_pem('/dbfs/FileStore/xxx/xxx.pfx', 'pfx_file_password') as cert:
# resp = requests.get(url, cert=cert, headers=headers)
The following code worked for me in Databricks as of 21 Feb 2024
from contextlib import contextmanager from pathlib import Path from tempfile import NamedTemporaryFile import requests from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates @contextmanager def pfx_to_pem(pfx_path, pfx_password): ''' Decrypts the .pfx file to be used with requests. ''' pfx = Path(pfx_path).read_bytes() private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None) with NamedTemporaryFile(suffix='.pem') as t_pem: with open(t_pem.name, 'wb') as f_pem: f_pem.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())) f_pem.write(main_cert.public_bytes(Encoding.PEM)) for ca in add_certs: f_pem.write(ca.public_bytes(Encoding.PEM)) yield t_pem.name # HOW TO USE: #with pfx_to_pem('/dbfs/FileStore/xxx/xxx.pfx', 'pfx_file_password') as cert: # resp = requests.get(url, cert=cert, headers=headers)
I got Permission denied from Temp folder. All credentials are correct. What is the possible issue?
The following code worked for me in Databricks as of 21 Feb 2024
from contextlib import contextmanager from pathlib import Path from tempfile import NamedTemporaryFile import requests from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates @contextmanager def pfx_to_pem(pfx_path, pfx_password): ''' Decrypts the .pfx file to be used with requests. ''' pfx = Path(pfx_path).read_bytes() private_key, main_cert, add_certs = load_key_and_certificates(pfx, pfx_password.encode('utf-8'), None) with NamedTemporaryFile(suffix='.pem') as t_pem: with open(t_pem.name, 'wb') as f_pem: f_pem.write(private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())) f_pem.write(main_cert.public_bytes(Encoding.PEM)) for ca in add_certs: f_pem.write(ca.public_bytes(Encoding.PEM)) yield t_pem.name # HOW TO USE: #with pfx_to_pem('/dbfs/FileStore/xxx/xxx.pfx', 'pfx_file_password') as cert: # resp = requests.get(url, cert=cert, headers=headers)
I got Permission denied from Temp folder. All credentials are correct. What is the possible issue?
need more details, what system are you using. local or cloud, are you also using databricks?
based on your code i tried using the scoped method from azure keyvault however i am getting error message
ImportError: cannot import name 'load_pkcs12' from 'OpenSSL.crypto' (unknown location)
ImportError Traceback (most recent call last)
File , line 5
1 import sys
2 import requests
----> 5 from OpenSSL.crypto import FILETYPE_PEM, dump_privatekey, load_pkcs12
6 from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
7 from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat
ImportError: cannot import name 'load_pkcs12' from 'OpenSSL.crypto' (unknown location)
so the question is will it work or do i need to upload my .pfx file onto dbfs?
@learnprofile are you using databricks? Have you uploaded the certificate in the azure key vault and reading it like below
cert = dbutils.secrets.get(scope = "", key = "<key_name>")
HI Shafique, yes i am using it in Azure Databricks and then i have uploaded certificate in my Azure keyvault certificate store.
and then here is my code
import sys
import requests
from OpenSSL.crypto import FILETYPE_PEM, dump_privatekey, load_pkcs12
from cryptography.hazmat.primitives.serialization.pkcs12 import load_key_and_certificates
from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat
import msal
pfx_path = dbutils.secrets.get("SPN-Certificate-Scope", "1649-new")
pfx_password = dbutils.secrets.get("SPN-Certificate-Scope", "1649-pfx-secret")
certificate_thumbprint = "xxxxxxxxxxx"
app_id = "xxxxxxxxxxxxxx"
tenant_id = "xxxxxxxxxxx"
def __init__(
self,
tenant_id: str,
spn_app_id: str,
certificate_thumbprint: str,
pfx_password: str = None,
pfx_bytes: bytes = None):
"""Instantiate an AzureSPNWithCertificate class.
:param tenant_id: Azure tenant id
:param spn_app_id: SPN application (client) id
:param certificate_thumbprint: SPN's certificate thumbprint
:param pfx_password: password used for the certificate
:param pfx_bytes: certificate file as bytes. If None, you must call function read_pfx_file to read it.
"""
self.tenant_id = tenant_id
self.spn_app_id = spn_app_id
self.pfx_bytes = pfx_bytes
self.pfx_password = pfx_password
self.certificate_thumbprint = certificate_thumbprint
self.private_key_bytes = None
def read_pfx_file(self, pfx_path: str):
"""Read certificate and store it as bytes parameter 'pfx_bytes'.
:param pfx_path: path to the certificate file (pfx file)
"""
with open(pfx_path, "rb") as f_pfx:
self.pfx_bytes = f_pfx.read()
# see https://stackoverflow.com/questions/6345786/python-reading-a-pkcs12-certificate-with-pyopenssl-crypto
def _set_private_key_from_certificate(self) -> bytes:
"""Retrieve the private key from the certificate and store it as bytes parameter 'private_key_bytes'."""
if not self.pfx_bytes:
raise Exception(f"Parameter 'pfx_bytes' is missing.")
private_key, _, _ = load_key_and_certificates(self.pfx_bytes, pfx_password.encode())
self.private_key_bytes = private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption(),
)
am i missing anything?
I am getting permission denier error upon executing this piece of code
but when i keep delete=False , pem is getting created and not getting deleted, upon executing with this i get SSL Error