Skip to content

Instantly share code, notes, and snippets.

@aconz2
Last active May 18, 2023 12:31
Show Gist options
  • Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
asyncio ssl get peercert
# WARNING: this doesn't work properly
import asyncio
import ssl
import socket
# included logging to try to figure out why this wasn't working...
import logging
# basicConfig() attaches a default handler to the root logger; the 'asyncio'
# logger is then lowered to DEBUG level.
logging.basicConfig()
logging.getLogger('asyncio').setLevel(logging.DEBUG)
async def ssl_cert(hostname, port=443):
    """Try to fetch the peer certificate of ``hostname:port`` (known broken).

    Kept as a reference for an approach that does NOT work: the plain socket
    is wrapped for TLS before it is connected, and the connect is then issued
    through ``loop.sock_connect``. The author reports that after this the
    wrapped socket has no SSL object, so no certificate can be read. It
    reportedly works only with a blocking socket.

    Args:
        hostname: Host to connect to; also passed as ``server_hostname``.
        port: TCP port, 443 by default.

    Returns:
        Whatever ``SSLSocket.getpeercert()`` yields for the wrapped socket.
    """
    tls_context = ssl.create_default_context()

    # Non-blocking IPv4 socket, wrapped for TLS while still unconnected.
    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.setblocking(False)
    tls_sock = tls_context.wrap_socket(raw_sock, server_hostname=hostname)

    event_loop = asyncio.get_event_loop()
    await event_loop.sock_connect(tls_sock, (hostname, port))

    peer_cert = tls_sock.getpeercert()
    return peer_cert
# Run the coroutine to completion on the event loop and print its result.
print(asyncio.get_event_loop().run_until_complete(ssl_cert('google.com')))
# USE THIS VERSION
import asyncio
async def ssl_cert(hostname, port=443):
    """Return the verified peer certificate of ``hostname:port``.

    Opens a TLS stream connection with the default (verifying) SSL context,
    reads the certificate from the transport's extra info and closes the
    connection again.

    Args:
        hostname: Host to connect to; also used as the TLS ``server_hostname``.
        port: TCP port, 443 by default.

    Returns:
        The value of the transport's ``'peercert'`` extra info.
    """
    _, writer = await asyncio.open_connection(
        host=hostname,
        port=port,
        ssl=True,
        server_hostname=hostname,
    )
    try:
        # StreamWriter exposes the transport's extra info publicly, so there
        # is no need to reach into the private ``reader._transport``.
        return writer.get_extra_info('peercert')
    finally:
        # Only the certificate is needed; do not leak the open connection.
        writer.close()
# Run the coroutine to completion on the event loop and print its result.
print(asyncio.get_event_loop().run_until_complete(ssl_cert('google.com')))
from cryptography import x509
from cryptography.hazmat.backends import default_backend
def _wake(waiter):
    """Resolve *waiter* once; ignore repeated readiness notifications."""
    if not waiter.done():
        waiter.set_result(None)


async def do_handshake(sslsock, loop=None):
    """Complete the TLS handshake on a non-blocking SSL socket.

    Calls ``sslsock.do_handshake()`` repeatedly. Whenever the call reports
    that it needs the socket to become readable or writable, the coroutine
    suspends until the event loop signals exactly that condition, then
    retries. No polling interval is involved, and any other exception raised
    by the handshake propagates to the caller.

    Args:
        sslsock: Non-blocking SSL socket (anything with ``do_handshake()``
            that the loop can watch via ``add_reader``/``add_writer``).
        loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.

    Returns:
        None, once the handshake has completed.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    while True:
        try:
            sslsock.do_handshake()
            return
        except ssl.SSLWantReadError:
            watch, unwatch = loop.add_reader, loop.remove_reader
        except ssl.SSLWantWriteError:
            watch, unwatch = loop.add_writer, loop.remove_writer

        # Wait for the requested readiness event instead of sleeping.
        waiter = loop.create_future()
        watch(sslsock, _wake, waiter)
        try:
            await waiter
        finally:
            # Always unregister, including on cancellation, so the loop does
            # not keep a callback for a socket we no longer wait on.
            unwatch(sslsock)
async def ssl_cert_no_verify(hostname, port=443):
    """Fetch the peer certificate of ``hostname:port`` WITHOUT verifying it.

    Verification is disabled so that the certificate can still be retrieved
    when it would fail validation. Do not use the connection for anything
    else.

    Args:
        hostname: Host to connect to; also sent as the TLS ``server_hostname``.
        port: TCP port, 443 by default.

    Returns:
        The certificate parsed by ``x509.load_der_x509_certificate``.
    """
    # Built from the public ssl API rather than the private
    # ssl._create_unverified_context(). check_hostname must be cleared before
    # verify_mode can be set to CERT_NONE.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    loop = asyncio.get_event_loop()
    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        await loop.sock_connect(sock, (hostname, port))
        # Wrap only after connecting, and drive the handshake ourselves
        # because the socket is non-blocking.
        with context.wrap_socket(
            sock, server_hostname=hostname, do_handshake_on_connect=False
        ) as sslsock:
            await do_handshake(sslsock)
            # NOTE: sslsock.getpeercert(False) returns an empty dictionary
            # when used with CERT_NONE (i.e. unverified), but
            # sslsock.getpeercert(True) *does* return the DER blob.
            der = sslsock.getpeercert(True)
    return x509.load_der_x509_certificate(der, default_backend())
@aconz2
Copy link
Author

aconz2 commented Sep 25, 2017

noworks.py is something that I thought should work, but it doesn't. loop.sock_connect does not seem to make an actual connection, so conn._sslobj is None. It does work when the socket is blocking (though that defeats the whole point, of course).

The second is slightly "wasteful" since you don't need a reader or writer stream, but it works; so...

@aconz2
Copy link
Author

aconz2 commented Sep 28, 2017

I was finally able to get the noworks.py method to "work" in works2.py, though the handshake business is still nasty.

Note that works2.py also purposefully does not verify the cert, because if a cert is expired we would get an ssl.SSLError and never be able to see the cert (it would be nice if it were attached to the error object...).

@TheTechromancer
Copy link

TheTechromancer commented May 17, 2023

@aconz2 Thanks very much for publishing this, it's helped a lot.

By chance, have you since found a better way to get a cert asynchronously without verification? The works2.py example still works in 2023, but oof I'd be overjoyed if there was a cleaner way to do it.

EDIT: Victory! After much pain and suffering (and help from chatgpt), here is the final working version:

import ssl
import asyncio
from OpenSSL import crypto

async def get_ssl_cert(host, port):
    """Fetch the TLS certificate presented by ``host:port`` without verifying it.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.

    Returns:
        The certificate as loaded by ``crypto.load_certificate`` from the
        DER bytes the peer presented.
    """
    loop = asyncio.get_event_loop()

    # SSL context with hostname checking and certificate verification turned
    # off, so the certificate can be read without validating it.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # A do-nothing protocol is enough: only the handshake result is needed.
    transport, _ = await loop.create_connection(
        asyncio.Protocol,
        host, port, ssl=ssl_context
    )
    try:
        # binary_form=True yields the DER bytes.
        ssl_object = transport.get_extra_info('ssl_object')
        cert = ssl_object.getpeercert(binary_form=True)
    finally:
        # Release the connection; the original left the transport open.
        transport.close()

    return crypto.load_certificate(crypto.FILETYPE_ASN1, cert)

# Example usage: target endpoint whose certificate is fetched.
host = 'www.example.com'
port = 443

# Run the coroutine to completion on the event loop, then print the issuer
# and subject of the returned certificate object.
loop = asyncio.get_event_loop()
cert = loop.run_until_complete(get_ssl_cert(host, port))
print('Issuer:', cert.get_issuer())
print('Subject:', cert.get_subject())

@aconz2
Copy link
Author

aconz2 commented May 18, 2023

@TheTechromancer I hadn't found a better way. thanks for sharing your code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment