Last active
May 18, 2023 12:31
-
-
Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
asyncio ssl get peercert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # WARNING: this doesn't work properly | |
| import asyncio | |
| import ssl | |
| import socket | |
| # included logging to try to figure out why this wasn't working... | |
| import logging | |
| logging.basicConfig() | |
| logging.getLogger('asyncio').setLevel(logging.DEBUG) | |
| async def ssl_cert(hostname, port=443): | |
| context = ssl.create_default_context() | |
| sock = socket.socket(socket.AF_INET) | |
| sock.setblocking(False) | |
| conn = context.wrap_socket(sock, server_hostname=hostname) | |
| loop = asyncio.get_event_loop() | |
| await loop.sock_connect(conn, (hostname, port)) | |
| return conn.getpeercert() | |
| print(asyncio.get_event_loop().run_until_complete(ssl_cert('google.com'))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # USE THIS VERSION | |
| import asyncio | |
| async def ssl_cert(hostname, port=443): | |
| reader, _ = await asyncio.open_connection( | |
| host=hostname, | |
| port=port, | |
| ssl=True, | |
| server_hostname=hostname, | |
| ) | |
| return reader._transport.get_extra_info('peercert') | |
| print(asyncio.get_event_loop().run_until_complete(ssl_cert('google.com'))) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from cryptography import x509 | |
| from cryptography.hazmat.backends import default_backend | |
| async def do_handshake(sslsock, loop=None): | |
| loop = loop or asyncio.get_event_loop() | |
| def handshake(): | |
| try: | |
| sslsock.do_handshake() | |
| return True | |
| except ssl.SSLWantReadError: | |
| loop.add_reader(sslsock, handshake) | |
| return False | |
| except ssl.SSLWantWriteError: | |
| loop.add_writer(sslsock, handshake) | |
| return False | |
| try: | |
| while not handshake(): | |
| await asyncio.sleep(0.1) | |
| finally: | |
| loop.remove_reader(sslsock) | |
| loop.remove_writer(sslsock) | |
| async def ssl_cert_no_verify(hostname, port=443): | |
| context = ssl._create_unverified_context() | |
| loop = asyncio.get_event_loop() | |
| with socket.socket(socket.AF_INET) as sock: | |
| sock.setblocking(False) | |
| await loop.sock_connect(sock, (hostname, port)) | |
| with context.wrap_socket(sock, server_hostname=hostname, do_handshake_on_connect=False) as sslsock: | |
| await do_handshake(sslsock) | |
| der = sslsock.getpeercert(True) | |
| # NOTE: sslsock.getpeercert(False) returns an empty dictionary when used with CERT_NONE (ie. unverified) | |
| # but *does* return the blob if we use sslsock.getpeercert(True) | |
| cert = x509.load_der_x509_certificate(der, default_backend()) | |
| return cert |
@aconz2 Thanks very much for publishing this, it's helped a lot.
By chance, have you since found a better way to get a cert asynchronously without verification? The works2.py example still works in 2023, but oof I'd be overjoyed if there was a cleaner way to do it.
EDIT: Victory! After much pain and suffering (and help from chatgpt), here is the final working version:
import ssl
import asyncio
from OpenSSL import crypto
async def get_ssl_cert(host, port):
loop = asyncio.get_event_loop()
# Create a SSL context
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# Connect to the host
transport, proto = await loop.create_connection(
lambda: asyncio.Protocol(),
host, port, ssl=ssl_context
)
# Get the SSL object
ssl_object = transport.get_extra_info('ssl_object')
# Get the certificate
cert = ssl_object.getpeercert(binary_form=True)
x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert)
return x509
host = 'www.example.com'
port = 443
loop = asyncio.get_event_loop()
cert = loop.run_until_complete(get_ssl_cert(host, port))
print('Issuer:', cert.get_issuer())
print('Subject:', cert.get_subject())@TheTechromancer I hadn't found a better way. thanks for sharing your code
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
was finally able to get the
noworks.pymethod to "work" inworks2.py, though the handshake business is still nastynote that
works2.pyis also purposefully not verifying the cert b/c in the event that a cert is expired, we would get anssl.SSLErrorand never be able to see the cert (would be nice if it was attached to the error object...)