Skip to content

Instantly share code, notes, and snippets.

@aconz2
Last active May 18, 2023 12:31
Show Gist options
  • Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
asyncio ssl get peercert
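# WARNING: the approach originally shown here (wrapping an unconnected,
# non-blocking socket and handing the SSLSocket to loop.sock_connect) doesn't
# work properly; prefer the asyncio.open_connection version further down.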
import asyncio
import ssl
import socket
# included logging to try to figure out why this wasn't working...
import logging
# Install a default root handler, then lower the 'asyncio' logger's threshold
# so that its DEBUG-level records are emitted.
logging.basicConfig()
_asyncio_log = logging.getLogger('asyncio')
_asyncio_log.setLevel(logging.DEBUG)
async def ssl_cert(hostname, port=443):
    """Return the verified TLS peer certificate of ``hostname:port``.

    The TCP connection is established first on a plain non-blocking socket;
    only then is the socket wrapped for TLS and the handshake driven to
    completion by waiting on the event loop for the socket to become
    readable/writable.

    Args:
        hostname: Host to connect to; also used for SNI and hostname checks.
        port: TCP port, 443 by default.

    Returns:
        The dict produced by ``SSLSocket.getpeercert()``.

    Raises:
        ssl.SSLError: if the handshake or certificate verification fails.
        OSError: if the TCP connection cannot be established.

    Note:
        Uses ``loop.add_reader``/``loop.add_writer``, so it needs an event
        loop that supports watching file descriptors.
    """
    context = ssl.create_default_context()
    loop = asyncio.get_running_loop()

    async def wait_until_ready(register, unregister, fileobj):
        # Park this coroutine until the loop reports the socket as ready.
        ready = loop.create_future()

        def on_ready():
            # The callback can fire again before it is unregistered.
            if not ready.done():
                ready.set_result(None)

        register(fileobj, on_ready)
        try:
            await ready
        finally:
            unregister(fileobj)

    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        # sock_connect only accepts plain sockets, so connect before wrapping.
        await loop.sock_connect(sock, (hostname, port))
        conn = context.wrap_socket(
            sock,
            server_hostname=hostname,
            # A non-blocking socket cannot handshake synchronously on wrap.
            do_handshake_on_connect=False,
        )
        with conn:
            while True:
                try:
                    conn.do_handshake()
                except ssl.SSLWantReadError:
                    await wait_until_ready(
                        loop.add_reader, loop.remove_reader, conn)
                except ssl.SSLWantWriteError:
                    await wait_until_ready(
                        loop.add_writer, loop.remove_writer, conn)
                else:
                    return conn.getpeercert()
# asyncio.run creates, runs and closes a fresh event loop; calling
# asyncio.get_event_loop() with no loop running is deprecated.
print(asyncio.run(ssl_cert('google.com')))
# USE THIS VERSION
import asyncio
async def ssl_cert(hostname, port=443):
    """Return the verified TLS peer certificate of ``hostname:port``.

    Opens a TLS stream with ``asyncio.open_connection`` (default SSL context,
    so the certificate and hostname are verified), reads the ``'peercert'``
    transport info, and closes the connection before returning.

    Args:
        hostname: Host to connect to; also passed as ``server_hostname``.
        port: TCP port, 443 by default.

    Returns:
        The value of the transport's ``'peercert'`` extra info.
    """
    _, writer = await asyncio.open_connection(
        host=hostname,
        port=port,
        ssl=True,
        server_hostname=hostname,
    )
    try:
        # Public accessor; avoids reaching into the private reader._transport.
        return writer.get_extra_info('peercert')
    finally:
        # Always release the connection instead of leaking the transport.
        writer.close()
        await writer.wait_closed()
# asyncio.run creates, runs and closes a fresh event loop; calling
# asyncio.get_event_loop() with no loop running is deprecated.
print(asyncio.run(ssl_cert('google.com')))
from cryptography import x509
from cryptography.hazmat.backends import default_backend
async def do_handshake(sslsock, loop=None):
    """Complete the TLS handshake on a non-blocking ``SSLSocket``.

    ``sslsock.do_handshake()`` is retried until it succeeds. Whenever it
    raises ``ssl.SSLWantReadError`` / ``ssl.SSLWantWriteError`` the coroutine
    suspends until the event loop reports the socket readable / writable, so
    there is no fixed-interval polling and no callback left registered
    between attempts.

    Args:
        sslsock: Connected, non-blocking socket wrapped with
            ``do_handshake_on_connect=False``.
        loop: Event loop to register with; defaults to the running loop.

    Returns:
        None, once the handshake has completed.

    Raises:
        ssl.SSLError: any handshake failure other than want-read/want-write.
    """
    if loop is None:
        loop = asyncio.get_running_loop()

    async def wait_until_ready(register, unregister):
        # Park this coroutine until the loop reports the socket as ready.
        ready = loop.create_future()

        def on_ready():
            # The callback can fire again before it is unregistered.
            if not ready.done():
                ready.set_result(None)

        register(sslsock, on_ready)
        try:
            await ready
        finally:
            # Never leave a watcher behind: a still-ready fd would otherwise
            # keep invoking the callback on every loop iteration.
            unregister(sslsock)

    while True:
        try:
            sslsock.do_handshake()
        except ssl.SSLWantReadError:
            await wait_until_ready(loop.add_reader, loop.remove_reader)
        except ssl.SSLWantWriteError:
            await wait_until_ready(loop.add_writer, loop.remove_writer)
        else:
            return
async def ssl_cert_no_verify(hostname, port=443):
    """Fetch the peer certificate of ``hostname:port`` without verifying it.

    Connects a non-blocking TCP socket, wraps it with a TLS context that
    performs no certificate or hostname validation, completes the handshake
    via ``do_handshake``, and parses the DER certificate the peer presented.

    Args:
        hostname: Host to connect to; also sent as the SNI server name.
        port: TCP port, 443 by default.

    Returns:
        The object returned by ``x509.load_der_x509_certificate``.
    """
    # Built from public ssl APIs rather than the private
    # ssl._create_unverified_context(). check_hostname has to be switched off
    # before verify_mode may be set to CERT_NONE.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    loop = asyncio.get_running_loop()
    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        await loop.sock_connect(sock, (hostname, port))
        with context.wrap_socket(
            sock,
            server_hostname=hostname,
            do_handshake_on_connect=False,
        ) as sslsock:
            await do_handshake(sslsock)
            # NOTE: sslsock.getpeercert(False) returns an empty dictionary
            # when used with CERT_NONE (i.e. unverified), but getpeercert(True)
            # *does* return the DER blob.
            der = sslsock.getpeercert(True)
    return x509.load_der_x509_certificate(der, default_backend())
@TheTechromancer
Copy link

TheTechromancer commented May 17, 2023

@aconz2 Thanks very much for publishing this, it's helped a lot.

By chance, have you since found a better way to get a cert asynchronously without verification? The works2.py example still works in 2023, but oof I'd be overjoyed if there was a cleaner way to do it.

EDIT: Victory! After much pain and suffering (and help from chatgpt), here is the final working version:

import ssl
import asyncio
from OpenSSL import crypto

async def get_ssl_cert(host, port):
    """Fetch the TLS certificate presented by ``host:port`` without verifying it.

    Args:
        host: Host name or address to connect to.
        port: TCP port to connect to.

    Returns:
        The certificate object produced by ``crypto.load_certificate`` from
        the peer's DER-encoded certificate.
    """
    loop = asyncio.get_running_loop()

    # Create an SSL context that skips hostname and certificate verification
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Connect to the host; a bare Protocol is enough since no data is exchanged
    transport, _ = await loop.create_connection(
        asyncio.Protocol,
        host, port, ssl=ssl_context
    )
    try:
        # Get the SSL object
        ssl_object = transport.get_extra_info('ssl_object')

        # binary_form=True is required: with CERT_NONE the dict form is empty
        der = ssl_object.getpeercert(binary_form=True)
    finally:
        # Close the connection instead of leaking the transport
        transport.close()

    return crypto.load_certificate(crypto.FILETYPE_ASN1, der)

host = 'www.example.com'
port = 443

# asyncio.run creates, runs and closes a fresh event loop; calling
# asyncio.get_event_loop() with no loop running is deprecated.
cert = asyncio.run(get_ssl_cert(host, port))
print('Issuer:', cert.get_issuer())
print('Subject:', cert.get_subject())

@aconz2
Copy link
Author

aconz2 commented May 18, 2023

@TheTechromancer I hadn't found a better way. thanks for sharing your code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment