Skip to content

Instantly share code, notes, and snippets.

@aconz2
Last active May 18, 2023 12:31
Show Gist options
  • Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
asyncio ssl get peercert
# WARNING: this doesn't work properly
import asyncio
import ssl
import socket
# included logging to try to figure out why this wasn't working...
import logging
logging.basicConfig()
logging.getLogger('asyncio').setLevel(logging.DEBUG)
async def ssl_cert(hostname, port=443):
context = ssl.create_default_context()
sock = socket.socket(socket.AF_INET)
sock.setblocking(False)
conn = context.wrap_socket(sock, server_hostname=hostname)
loop = asyncio.get_event_loop()
await loop.sock_connect(conn, (hostname, port))
return conn.getpeercert()
print(asyncio.get_event_loop().run_until_complete(ssl_cert('google.com')))
# USE THIS VERSION
import asyncio
async def ssl_cert(hostname, port=443):
reader, _ = await asyncio.open_connection(
host=hostname,
port=port,
ssl=True,
server_hostname=hostname,
)
return reader._transport.get_extra_info('peercert')
print(asyncio.get_event_loop().run_until_complete(ssl_cert('google.com')))
from cryptography import x509
from cryptography.hazmat.backends import default_backend
async def do_handshake(sslsock, loop=None):
loop = loop or asyncio.get_event_loop()
def handshake():
try:
sslsock.do_handshake()
return True
except ssl.SSLWantReadError:
loop.add_reader(sslsock, handshake)
return False
except ssl.SSLWantWriteError:
loop.add_writer(sslsock, handshake)
return False
try:
while not handshake():
await asyncio.sleep(0.1)
finally:
loop.remove_reader(sslsock)
loop.remove_writer(sslsock)
async def ssl_cert_no_verify(hostname, port=443):
context = ssl._create_unverified_context()
loop = asyncio.get_event_loop()
with socket.socket(socket.AF_INET) as sock:
sock.setblocking(False)
await loop.sock_connect(sock, (hostname, port))
with context.wrap_socket(sock, server_hostname=hostname, do_handshake_on_connect=False) as sslsock:
await do_handshake(sslsock)
der = sslsock.getpeercert(True)
# NOTE: sslsock.getpeercert(False) returns an empty dictionary when used with CERT_NONE (ie. unverified)
# but *does* return the blob if we use sslsock.getpeercert(True)
cert = x509.load_der_x509_certificate(der, default_backend())
return cert
@aconz2
Copy link
Author

aconz2 commented May 18, 2023

@TheTechromancer I hadn't found a better way. thanks for sharing your code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment