-
-
Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
# WARNING: this doesn't work properly | |
import asyncio | |
import ssl | |
import socket | |
# included logging to try to figure out why this wasn't working... | |
import logging | |
# Install the default root handler, then turn the 'asyncio' logger up to DEBUG
# so the event loop's own diagnostics are printed while troubleshooting.
logging.basicConfig()
_asyncio_logger = logging.getLogger('asyncio')
_asyncio_logger.setLevel(logging.DEBUG)
async def ssl_cert(hostname, port=443):
    """Return the verified TLS peer certificate of ``hostname:port``.

    The TCP connection is established on a plain non-blocking socket and only
    afterwards wrapped in TLS. Wrapping the socket *before* connecting and
    then handing the SSL socket to ``loop.sock_connect()`` does not perform a
    handshake, so no certificate is ever available.

    Args:
        hostname: Host to connect to; also used for SNI and hostname checking.
        port: TCP port of the TLS service (default 443).

    Returns:
        The value of ``SSLSocket.getpeercert()`` after a completed handshake.

    Raises:
        ssl.SSLError: If the handshake or certificate verification fails.
        OSError: If the TCP connection cannot be established.
    """
    context = ssl.create_default_context()
    loop = asyncio.get_running_loop()

    async def wait_ready(register, unregister, sslsock):
        # Suspend until the event loop reports the socket readable/writable.
        ready = loop.create_future()
        register(sslsock, lambda: ready.done() or ready.set_result(None))
        try:
            await ready
        finally:
            unregister(sslsock)

    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        # Connect first, on the plain socket; TLS is layered on afterwards.
        await loop.sock_connect(sock, (hostname, port))
        with context.wrap_socket(
            sock,
            server_hostname=hostname,
            do_handshake_on_connect=False,
        ) as conn:
            # Drive the non-blocking handshake: retry each time the socket
            # becomes ready in the direction OpenSSL asked for.
            while True:
                try:
                    conn.do_handshake()
                except ssl.SSLWantReadError:
                    await wait_ready(loop.add_reader, loop.remove_reader, conn)
                except ssl.SSLWantWriteError:
                    await wait_ready(loop.add_writer, loop.remove_writer, conn)
                else:
                    break
            return conn.getpeercert()
# asyncio.run() creates and closes its own event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated.
print(asyncio.run(ssl_cert('google.com')))
# USE THIS VERSION | |
import asyncio | |
async def ssl_cert(hostname, port=443):
    """Return the verified TLS peer certificate of ``hostname:port``.

    Opens a TLS stream connection with the default (verifying) SSL settings,
    reads the ``'peercert'`` transport info and closes the connection again.

    Args:
        hostname: Host to connect to; also passed as ``server_hostname``.
        port: TCP port of the TLS service (default 443).

    Returns:
        The ``'peercert'`` extra info of the connection's transport.
    """
    _, writer = await asyncio.open_connection(
        host=hostname,
        port=port,
        ssl=True,
        server_hostname=hostname,
    )
    try:
        # Public accessor; avoids reaching into the private reader._transport.
        return writer.get_extra_info('peercert')
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # The certificate has already been read; an error while tearing
            # the connection down must not mask the result.
            pass
# asyncio.run() creates and closes its own event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated.
print(asyncio.run(ssl_cert('google.com')))
from cryptography import x509 | |
from cryptography.hazmat.backends import default_backend | |
async def do_handshake(sslsock, loop=None):
    """Complete the TLS handshake on a non-blocking SSL socket.

    ``sslsock.do_handshake()`` is retried each time it reports that it needs
    the socket to become readable or writable. Between attempts the coroutine
    waits for the event loop to signal that readiness, so there is no
    fixed-interval polling and the handshake is only ever driven from this
    coroutine.

    Args:
        sslsock: Object exposing ``do_handshake()`` and usable with
            ``loop.add_reader`` / ``loop.add_writer``.
        loop: Event loop to register readiness callbacks on; defaults to the
            running loop.

    Raises:
        ssl.SSLError: Any handshake failure other than the want-read /
            want-write conditions is propagated to the caller.
    """
    loop = loop or asyncio.get_running_loop()

    async def wait_ready(register, unregister):
        # Suspend until the loop reports readiness, then always unregister so
        # no callback is left behind on success, error or cancellation.
        ready = loop.create_future()
        register(sslsock, lambda: ready.done() or ready.set_result(None))
        try:
            await ready
        finally:
            unregister(sslsock)

    while True:
        try:
            sslsock.do_handshake()
        except ssl.SSLWantReadError:
            await wait_ready(loop.add_reader, loop.remove_reader)
        except ssl.SSLWantWriteError:
            await wait_ready(loop.add_writer, loop.remove_writer)
        else:
            return
async def ssl_cert_no_verify(hostname, port=443):
    """Fetch the peer certificate of ``hostname:port`` without verifying it.

    Verification is deliberately disabled so that certificates which would
    fail validation can still be retrieved and inspected.

    Args:
        hostname: Host to connect to; also sent as ``server_hostname``.
        port: TCP port of the TLS service (default 443).

    Returns:
        The certificate parsed by ``x509.load_der_x509_certificate``.
    """
    # Public-API way to build a non-verifying client context.
    # check_hostname must be disabled before verify_mode can be CERT_NONE.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    loop = asyncio.get_running_loop()
    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        await loop.sock_connect(sock, (hostname, port))
        with context.wrap_socket(
            sock,
            server_hostname=hostname,
            do_handshake_on_connect=False,
        ) as sslsock:
            await do_handshake(sslsock)
            # NOTE: sslsock.getpeercert(False) returns an empty dictionary
            # when used with CERT_NONE (i.e. unverified), but
            # sslsock.getpeercert(True) *does* return the DER blob.
            der = sslsock.getpeercert(True)
    # Parsing needs only the DER bytes, so it happens after the sockets close.
    return x509.load_der_x509_certificate(der, default_backend())
was finally able to get the noworks.py method to "work" in works2.py, though the handshake business is still nasty.
Note that works2.py also purposefully does not verify the cert: if the cert were expired, verification would raise an ssl.SSLError and we would never be able to see the cert (it would be nice if it were attached to the error object...).
@aconz2 Thanks very much for publishing this, it's helped a lot.
By chance, have you since found a better way to get a cert asynchronously without verification? The works2.py example still works in 2023, but oof, I'd be overjoyed if there were a cleaner way to do it.
EDIT: Victory! After much pain and suffering (and help from chatgpt), here is the final working version:
import ssl
import asyncio
from OpenSSL import crypto
async def get_ssl_cert(host, port):
    """Fetch the TLS certificate of ``host:port`` without verifying it.

    Args:
        host: Host to connect to.
        port: TCP port of the TLS service.

    Returns:
        The certificate loaded by ``crypto.load_certificate`` from the DER
        bytes presented by the peer.
    """
    loop = asyncio.get_running_loop()

    # Create an SSL context that accepts any certificate.
    # check_hostname must be disabled before verify_mode can be CERT_NONE.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # Connect to the host; no application data is exchanged, so a bare
    # Protocol is sufficient.
    transport, _ = await loop.create_connection(
        asyncio.Protocol,
        host, port, ssl=ssl_context
    )
    try:
        # Get the SSL object and read the certificate in binary (DER) form.
        ssl_object = transport.get_extra_info('ssl_object')
        der = ssl_object.getpeercert(binary_form=True)
    finally:
        # Always release the connection once the certificate has been read.
        transport.close()

    return crypto.load_certificate(crypto.FILETYPE_ASN1, der)
host = 'www.example.com'
port = 443

# asyncio.run() creates and closes its own event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated.
cert = asyncio.run(get_ssl_cert(host, port))

print('Issuer:', cert.get_issuer())
print('Subject:', cert.get_subject())
@TheTechromancer I hadn't found a better way. thanks for sharing your code
noworks.py is something that I thought should work, but doesn't. The loop.sock_connect call seems not to make an actual connection, so conn._sslobj is None. It does work when the socket is blocking (though that defeats the whole point, of course). The second is slightly "wasteful" since you don't need a reader or writer stream, but it works; so...