-
-
Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
| # WARNING: this doesn't work properly | |
| import asyncio | |
| import ssl | |
| import socket | |
| # included logging to try to figure out why this wasn't working... | |
| import logging | |
| logging.basicConfig() | |
| logging.getLogger('asyncio').setLevel(logging.DEBUG) | |
async def ssl_cert(hostname, port=443):
    """Try to fetch the peer certificate through a hand-wrapped, non-blocking socket.

    This is the variant that is known NOT to work; it is kept as an example
    of the failing approach.  The plain socket is wrapped for TLS *before* it
    is connected, and the connection is then made with ``loop.sock_connect``
    on the wrapped socket.  Per the author's notes, no TLS handshake ends up
    happening that way, so the certificate lookup at the end cannot succeed.
    NOTE(review): newer asyncio releases may reject ``SSLSocket`` objects in
    ``sock_*`` calls outright -- confirm against the Python version in use.

    :param hostname: host to connect to; also used as the TLS server name.
    :param port: TCP port, 443 by default.
    :return: whatever ``SSLSocket.getpeercert()`` yields for the connection.
    """
    event_loop = asyncio.get_event_loop()
    tls_context = ssl.create_default_context()

    raw_sock = socket.socket(socket.AF_INET)
    raw_sock.setblocking(False)

    # Wrapping happens on a socket that is not connected yet.
    tls_sock = tls_context.wrap_socket(raw_sock, server_hostname=hostname)
    await event_loop.sock_connect(tls_sock, (hostname, port))
    return tls_sock.getpeercert()
# asyncio.run() creates a fresh event loop, runs the coroutine to completion
# and closes the loop again; calling asyncio.get_event_loop() outside of a
# running loop is deprecated in current Python releases.
print(asyncio.run(ssl_cert('google.com')))
| # USE THIS VERSION | |
| import asyncio | |
async def ssl_cert(hostname, port=443):
    """Return the verified peer certificate of ``hostname:port``.

    Opens a TLS stream connection with the default (verifying) SSL settings,
    reads the certificate from the connection's transport information and
    closes the connection again.

    :param hostname: host to connect to; also used as the TLS server name.
    :param port: TCP port, 443 by default.
    :return: the value stored under ``'peercert'`` in the transport's extra
        info (the decoded certificate of the peer).
    :raises ssl.SSLError: if the TLS handshake or verification fails.
    :raises OSError: if the TCP connection cannot be established.
    """
    _, writer = await asyncio.open_connection(
        host=hostname,
        port=port,
        ssl=True,
        server_hostname=hostname,
    )
    try:
        # StreamWriter exposes the transport's extra info publicly, so there
        # is no need to reach into the private ``reader._transport``.
        return writer.get_extra_info('peercert')
    finally:
        # The stream is only needed for the handshake; close it so the
        # connection is not left open after the certificate has been read.
        writer.close()
# asyncio.run() creates a fresh event loop, runs the coroutine to completion
# and closes the loop again; calling asyncio.get_event_loop() outside of a
# running loop is deprecated in current Python releases.
print(asyncio.run(ssl_cert('google.com')))
| from cryptography import x509 | |
| from cryptography.hazmat.backends import default_backend | |
async def do_handshake(sslsock, loop=None):
    """Drive the TLS handshake of a non-blocking SSL socket to completion.

    ``sslsock.do_handshake()`` is retried until it stops raising
    ``SSLWantReadError`` / ``SSLWantWriteError``.  Between attempts the
    coroutine waits until the event loop reports the socket as readable or
    writable (whichever the SSL layer asked for) instead of polling on a
    timer, and the reader/writer registration is removed again after every
    wait so no callback is left behind.

    :param sslsock: a non-blocking SSL socket created with
        ``do_handshake_on_connect=False``.
    :param loop: event loop to register the socket with; defaults to
        ``asyncio.get_event_loop()``.
    :return: ``None`` once the handshake has completed.
    :raises ssl.SSLError: any handshake failure other than the two
        "want read/write" conditions is propagated to the caller.
    """
    loop = loop or asyncio.get_event_loop()

    async def wait_until_ready(register, unregister):
        # Suspend until the loop invokes the readiness callback once.
        ready = loop.create_future()

        def wake():
            # The loop may invoke the callback again before it has been
            # unregistered; only the first call may resolve the future.
            if not ready.done():
                ready.set_result(None)

        register(sslsock, wake)
        try:
            await ready
        finally:
            unregister(sslsock)

    while True:
        try:
            sslsock.do_handshake()
        except ssl.SSLWantReadError:
            await wait_until_ready(loop.add_reader, loop.remove_reader)
        except ssl.SSLWantWriteError:
            await wait_until_ready(loop.add_writer, loop.remove_writer)
        else:
            return
async def ssl_cert_no_verify(hostname, port=443):
    """Fetch the peer certificate of ``hostname:port`` WITHOUT verifying it.

    Verification is switched off on purpose so that a certificate can still
    be inspected when it would fail validation (for example because it has
    expired).  Do not use the resulting connection for anything else.

    :param hostname: host to connect to (IPv4); also used as the TLS server
        name.
    :param port: TCP port, 443 by default.
    :return: the certificate parsed by
        ``cryptography.x509.load_der_x509_certificate``.
    :raises OSError: if the TCP connection cannot be established.
    :raises ssl.SSLError: if the TLS handshake itself fails.
    """
    # Public equivalent of the private ssl._create_unverified_context().
    # check_hostname must be turned off before verify_mode may be CERT_NONE.
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    loop = asyncio.get_event_loop()
    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        await loop.sock_connect(sock, (hostname, port))
        with context.wrap_socket(sock, server_hostname=hostname, do_handshake_on_connect=False) as sslsock:
            await do_handshake(sslsock)
            # NOTE: sslsock.getpeercert(False) returns an empty dictionary when used with CERT_NONE (ie. unverified)
            # but *does* return the blob if we use sslsock.getpeercert(True)
            der = sslsock.getpeercert(True)
    return x509.load_der_x509_certificate(der, default_backend())
I was finally able to get the noworks.py method to "work" in works2.py, though the handshake handling is still nasty.
Note that works2.py also purposefully does not verify the cert, because if a cert is expired we would get an ssl.SSLError and never be able to see the cert (it would be nice if the cert were attached to the error object...).
@aconz2 Thanks very much for publishing this, it's helped a lot.
By chance, have you since found a better way to get a cert asynchronously without verification? The works2.py example still works in 2023, but oof I'd be overjoyed if there was a cleaner way to do it.
EDIT: Victory! After much pain and suffering (and help from chatgpt), here is the final working version:
import ssl
import asyncio
from OpenSSL import crypto
async def get_ssl_cert(host, port):
    """Fetch the TLS certificate presented by ``host:port`` without verifying it.

    :param host: host name or address to connect to.
    :param port: TCP port to connect to.
    :return: the peer certificate as loaded by
        ``OpenSSL.crypto.load_certificate`` from its DER encoding.
    :raises OSError: if the connection cannot be established.
    :raises ssl.SSLError: if the TLS handshake itself fails.
    """
    loop = asyncio.get_event_loop()

    # Verification is disabled on purpose: the goal is to look at the
    # certificate even when it would not validate.  check_hostname has to be
    # switched off before verify_mode may be set to CERT_NONE.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

    # A bare Protocol is enough; no application data is exchanged.
    transport, _ = await loop.create_connection(
        asyncio.Protocol,
        host, port, ssl=ssl_context
    )
    try:
        ssl_object = transport.get_extra_info('ssl_object')
        # binary_form=True is required: without verification the decoded
        # (dict) form of the certificate is empty.
        der_bytes = ssl_object.getpeercert(binary_form=True)
    finally:
        # The connection only exists to complete the handshake; close it so
        # the transport does not stay open after the certificate is read.
        transport.close()

    return crypto.load_certificate(crypto.FILETYPE_ASN1, der_bytes)
host = 'www.example.com'
port = 443
# asyncio.run() creates a fresh event loop, runs the coroutine to completion
# and closes the loop again; calling asyncio.get_event_loop() outside of a
# running loop is deprecated in current Python releases.
cert = asyncio.run(get_ssl_cert(host, port))
print('Issuer:', cert.get_issuer())
print('Subject:', cert.get_subject())
@TheTechromancer I hadn't found a better way. Thanks for sharing your code.
noworks.py is something that I thought should work, but doesn't. The loop.sock_connect seems to not make an actual connection, so the conn._sslobj is None. It does work when the socket is blocking (though that defeats the whole point, of course). The second is slightly "wasteful" since you don't need a reader or writer stream, but it works; so...