Last active
May 18, 2023 12:31
-
-
Save aconz2/ed23256e05518a21512f20c4c1fdfa3f to your computer and use it in GitHub Desktop.
asyncio ssl get peercert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # WARNING: this doesn't work properly | |
| import asyncio | |
| import ssl | |
| import socket | |
| # included logging to try to figure out why this wasn't working... | |
| import logging | |
# Debug aid: apply the default logging configuration, then raise the verbosity
# of asyncio's own logger so the event loop reports what it is doing.
logging.basicConfig()
_asyncio_logger = logging.getLogger('asyncio')
_asyncio_logger.setLevel(logging.DEBUG)
async def ssl_cert(hostname, port=443):
    """Return the verified TLS peer certificate of ``hostname:port``.

    The TCP connection is made on a plain non-blocking socket, which is then
    wrapped for TLS and handshaken without blocking the event loop.

    Historical note: this function used to wrap the socket *before*
    connecting and passed the resulting ``SSLSocket`` to
    ``loop.sock_connect()``. asyncio's ``sock_*`` helpers do not support
    ``SSLSocket`` objects and no handshake was ever performed, so
    ``getpeercert()`` could not succeed. That is the "doesn't work" this
    file was originally warning about.

    Args:
        hostname: Server name to connect to; also used for SNI and for
            certificate hostname verification.
        port: TCP port of the TLS service.

    Returns:
        The certificate as the dict produced by ``SSLSocket.getpeercert()``.

    Raises:
        ssl.SSLError: If the handshake or certificate verification fails.
        OSError: If the TCP connection cannot be established.
    """
    context = ssl.create_default_context()
    loop = asyncio.get_event_loop()

    def wake(waiter):
        # The fd may be reported ready more than once before the watcher is
        # removed; only the first notification should resolve the future.
        if not waiter.done():
            waiter.set_result(None)

    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        # Connect the *plain* socket; sock_connect() cannot drive an SSLSocket.
        await loop.sock_connect(sock, (hostname, port))
        conn = context.wrap_socket(
            sock,
            server_hostname=hostname,
            do_handshake_on_connect=False,
        )
        with conn:
            fd = conn.fileno()
            while True:
                try:
                    conn.do_handshake()
                    break
                except ssl.SSLWantReadError:
                    watch, unwatch = loop.add_reader, loop.remove_reader
                except ssl.SSLWantWriteError:
                    watch, unwatch = loop.add_writer, loop.remove_writer
                # Sleep until the socket is ready in the direction OpenSSL
                # asked for, then retry the handshake.
                waiter = loop.create_future()
                watch(fd, wake, waiter)
                try:
                    await waiter
                finally:
                    unwatch(fd)
            return conn.getpeercert()
# asyncio.run() creates, runs and closes its own event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated and fails on
# recent Python versions.
print(asyncio.run(ssl_cert('google.com')))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # USE THIS VERSION | |
| import asyncio | |
async def ssl_cert(hostname, port=443):
    """Return the verified TLS peer certificate of ``hostname:port``.

    Opens a TLS stream with asyncio's default SSL context, reads the
    certificate from the transport's extra info, and closes the connection
    again before returning.

    Args:
        hostname: Server name to connect to; also used for SNI and for
            certificate hostname verification.
        port: TCP port of the TLS service.

    Returns:
        The value of the transport's ``'peercert'`` extra info (the dict form
        of the peer certificate).

    Raises:
        ssl.SSLError: If the handshake or certificate verification fails.
        OSError: If the connection cannot be established.
    """
    _, writer = await asyncio.open_connection(
        host=hostname,
        port=port,
        ssl=True,
        server_hostname=hostname,
    )
    try:
        # StreamWriter exposes the transport's extra info publicly, so there
        # is no need to reach into the private reader._transport attribute.
        return writer.get_extra_info('peercert')
    finally:
        writer.close()
        try:
            # Best-effort wait for the TLS shutdown; bounded so that a peer
            # that never answers cannot stall the caller.
            await asyncio.wait_for(writer.wait_closed(), timeout=5)
        except (asyncio.TimeoutError, OSError):
            # The certificate is already in hand; errors while tearing the
            # connection down are not interesting to the caller.
            pass
# asyncio.run() creates, runs and closes its own event loop; calling
# asyncio.get_event_loop() with no running loop is deprecated and fails on
# recent Python versions.
print(asyncio.run(ssl_cert('google.com')))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from cryptography import x509 | |
| from cryptography.hazmat.backends import default_backend | |
async def do_handshake(sslsock, loop=None):
    """Complete the TLS handshake on a non-blocking SSL socket.

    ``sslsock.do_handshake()`` is retried each time the event loop reports
    the socket ready in the direction OpenSSL asked for (read or write). The
    wait is purely event-driven: no polling interval, and no watcher is left
    registered once the coroutine finishes.

    Args:
        sslsock: A non-blocking ``ssl.SSLSocket`` (anything exposing
            ``fileno()`` and ``do_handshake()``) created with
            ``do_handshake_on_connect=False``.
        loop: Event loop to register readiness callbacks on; defaults to the
            current event loop.

    Raises:
        ssl.SSLError: Any handshake failure other than "want read" /
            "want write" propagates to the caller.
    """
    loop = loop or asyncio.get_event_loop()
    fd = sslsock.fileno()

    def wake(waiter):
        # The fd may be reported ready more than once before the watcher is
        # removed; only the first notification should resolve the future.
        if not waiter.done():
            waiter.set_result(None)

    while True:
        try:
            sslsock.do_handshake()
            return
        except ssl.SSLWantReadError:
            watch, unwatch = loop.add_reader, loop.remove_reader
        except ssl.SSLWantWriteError:
            watch, unwatch = loop.add_writer, loop.remove_writer

        waiter = loop.create_future()
        watch(fd, wake, waiter)
        try:
            await waiter
        finally:
            # Always unregister, including on cancellation, so no callback
            # outlives this coroutine.
            unwatch(fd)
async def ssl_cert_no_verify(hostname, port=443):
    """Fetch the peer certificate of ``hostname:port`` without verifying it.

    Useful for inspecting self-signed, expired or otherwise invalid
    certificates. Because nothing is verified, the result must not be used
    to make trust decisions.

    Args:
        hostname: Server name to connect to; also sent as SNI.
        port: TCP port of the TLS service.

    Returns:
        The certificate parsed by ``cryptography``'s
        ``x509.load_der_x509_certificate``.

    Raises:
        ssl.SSLError: If the TLS handshake fails.
        OSError: If the TCP connection cannot be established.
    """
    # Public-API equivalent of the private ssl._create_unverified_context().
    # check_hostname has to be switched off first: CERT_NONE is rejected
    # while hostname checking is still enabled.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    loop = asyncio.get_event_loop()
    with socket.socket(socket.AF_INET) as sock:
        sock.setblocking(False)
        await loop.sock_connect(sock, (hostname, port))
        with context.wrap_socket(
            sock,
            server_hostname=hostname,
            do_handshake_on_connect=False,
        ) as sslsock:
            await do_handshake(sslsock)
            # NOTE: getpeercert(binary_form=False) returns an empty dict when
            # used with CERT_NONE (i.e. unverified), but binary_form=True
            # *does* return the DER blob.
            der = sslsock.getpeercert(binary_form=True)
            return x509.load_der_x509_certificate(der, default_backend())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@TheTechromancer I hadn't found a better way. thanks for sharing your code