Last active
November 19, 2023 16:50
-
-
Save JBirdVegas/a10b56bc2293dc46393e54b8125e327d to your computer and use it in GitHub Desktop.
Example of how to perform certificate pinning in python without host or chain validation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import hashlib | |
import io | |
import json | |
import socket | |
import ssl | |
from base64 import b64encode | |
from json import JSONDecodeError | |
from typing import Any, Dict, Optional | |
from urllib import parse | |
_STR_ENCODING = 'utf-8' | |
_CRLF = '\r\n' | |
_DEBUG = False | |
__all__ = [ | |
'PinnedResponse', | |
'FingerprintValidationError', | |
'HttpError', | |
'HttpStatusError', | |
'RedirectError', | |
'put', | |
'delete', | |
'patch', | |
'post', | |
'get', | |
'pinned_request', | |
'make_auth_header_value' | |
] | |
class PinnedResponse(object): | |
__slots__ = ('_raw_body', '_head', '_headers', 'body', 'protocol', | |
'status', 'reason', 'headers', 'endpoint_url', 'json_body') | |
def __init__(self, raw_body: str, endpoint_url): | |
self._raw_body = raw_body | |
self._head, self.body = self._raw_body.split('\r\n\r\n', 1) | |
self._headers = self._head.split('\r\n') | |
self.protocol, self.status, self.reason = self._headers.pop(0).split(' ', maxsplit=2) | |
self.headers = { | |
line[:line.index(":")]: line[line.index(":") + 1:].strip() | |
for line in self._headers | |
if line and line.strip() | |
} | |
self.endpoint_url = endpoint_url | |
try: | |
self.json_body = json.loads(self.body) | |
except (JSONDecodeError, TypeError): | |
self.json_body = None | |
def __repr__(self): | |
return f"PinnedResponse(raw_body={repr(self._raw_body)}, {repr(self.endpoint_url)})" | |
def __str__(self): | |
return f"""STATUS_LINE: {self.status} {self.reason} | |
HEADERS: | |
{self.headers} | |
BODY: | |
{self.body} | |
""" | |
class HttpError(ValueError): | |
__slots__ = () | |
class HttpStatusError(HttpError): | |
__slots__ = ('response', 'endpoint', 'status_code', 'reason') | |
def __init__(self, response: PinnedResponse): | |
self.response = response | |
self.endpoint = response.endpoint_url | |
self.status_code = response.status | |
self.reason = response.reason | |
def __str__(self): | |
return f"Endpoint ({self.endpoint}) returned an error code:" \ | |
f" {self.status_code}, reason: {self.reason}.\n{self.response}" | |
class RedirectError(HttpError): | |
__slots__ = () | |
class HttpsValidationError(HttpError): | |
__slots__ = () | |
class FingerprintValidationError(HttpsValidationError): | |
__slots__ = ('expected_fingerprint', 'actual_fingerprint', 'domain', 'endpoint') | |
def __init__(self, expected_fingerprint: str, actual_fingerprint: str, domain: str, endpoint: str): | |
self.expected_fingerprint = expected_fingerprint | |
self.actual_fingerprint = actual_fingerprint | |
self.domain = domain | |
self.endpoint = endpoint | |
def make_auth_header_value(user_name: str, password: str) -> str: | |
bytesraw = b64encode( | |
b':'.join((bytes(user_name, 'utf-8'), bytes(password, 'utf-8')))).strip() | |
return f'Basic {bytesraw.decode("utf-8")}' | |
def _format_protocol(method: str, endpoint: str): | |
upper_method = method.upper() | |
return f"{upper_method} {endpoint if endpoint else '/'} HTTP/1.1" | |
def _format_headers(headers: Dict[str, str], content_length, host) -> str: | |
headers = headers or {} | |
headers.update({ | |
'Content-Length': str(content_length), | |
'Host': host, | |
}) | |
combined_headers = _CRLF.join([f"{key}: {value}" for key, value in headers.items() if key and value]) | |
return f"{combined_headers}" | |
def _assert_status(pinned_response: PinnedResponse): | |
code = int(pinned_response.status) | |
if code > 400: | |
raise HttpStatusError(pinned_response) | |
if code > 300: | |
raise RedirectError(pinned_response) | |
def _validate_certificate(tls_socket, expected_fingerprint, host, endpoint): | |
# grab host's certificate | |
cert_bin = tls_socket.getpeercert(True) | |
# get the sha256 hash of the cert | |
fingerprint = hashlib.sha256(cert_bin).hexdigest() | |
# format the hash as we expect, ie AB:CD:ED:... | |
formatted_fingerprint = ':'.join([fingerprint[i:i + 2] for i in range(0, len(fingerprint), 2)]).upper() | |
# validate the certificate fingerprints match; fail if not | |
expected = expected_fingerprint.get(host) | |
if not expected == formatted_fingerprint: | |
raise FingerprintValidationError(expected_fingerprint=expected, | |
actual_fingerprint=formatted_fingerprint, | |
domain=host, | |
endpoint=endpoint) | |
if _DEBUG: | |
print(f"Certificate fingerprint validation matched: {formatted_fingerprint}") | |
def put(expected_fingerprint: Dict[str, str], | |
endpoint_url: str, | |
headers: Optional[Dict[str, Any]] = None, | |
body: Optional[str] = None) -> PinnedResponse: | |
return pinned_request(expected_fingerprint, 'PUT', endpoint_url, headers, body) | |
def delete(expected_fingerprint: Dict[str, str], | |
endpoint_url: str, | |
headers: Optional[Dict[str, Any]] = None, | |
body: Optional[str] = None) -> PinnedResponse: | |
return pinned_request(expected_fingerprint, 'DELETE', endpoint_url, headers, body) | |
def patch(expected_fingerprint: Dict[str, str], | |
endpoint_url: str, | |
headers: Optional[Dict[str, Any]] = None, | |
body: Optional[str] = None) -> PinnedResponse: | |
return pinned_request(expected_fingerprint, 'PATCH', endpoint_url, headers, body) | |
def post(expected_fingerprint: Dict[str, str], | |
endpoint_url: str, | |
headers: Optional[Dict[str, Any]] = None, | |
body: Optional[str] = None) -> PinnedResponse: | |
return pinned_request(expected_fingerprint, 'POST', endpoint_url, headers, body) | |
def get(expected_fingerprint: Dict[str, str], | |
endpoint_url: str, | |
headers: Optional[Dict[str, Any]] = None) -> PinnedResponse: | |
return pinned_request(expected_fingerprint, 'GET', endpoint_url, headers, None) | |
def pinned_request(expected_fingerprints: Dict[str, str], | |
method: str, | |
endpoint_url: str, | |
headers: Dict[str, str], | |
body: Optional[str], | |
follow_redirects: bool = True, | |
redirects_left: int = 10) -> PinnedResponse: | |
_url = parse.urlparse(endpoint_url) | |
_port = _url.port or 443 | |
# noinspection PyProtectedMember | |
_insecure_context = ssl._create_unverified_context() | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as _raw_socket: # type: socket.socket | |
# noinspection PyArgumentList | |
with ssl.SSLSocket(sock=_raw_socket, | |
do_handshake_on_connect=False, | |
cert_reqs=False, | |
_context=_insecure_context) as _ssl_socket: # type: ssl.SSLSocket | |
# connect to remote host | |
_ssl_socket.connect((_url.hostname, _port)) | |
# setup TLS | |
_ssl_socket.do_handshake() | |
# before transferring any data; check the certificate | |
_validate_certificate(tls_socket=_ssl_socket, | |
expected_fingerprint=expected_fingerprints, | |
host=_url.hostname, | |
endpoint=endpoint_url) | |
# if your still here the fingerprint on the cert is valid, lets send the data. | |
# First up send the method and protocol the headers then body | |
_protocol = _format_protocol(method, _url.path) | |
_headers = _format_headers(headers, len(body) if body else 0, f'{_url.hostname}:{_port}') | |
_body = f"""{_protocol}{_CRLF}{_headers}{_CRLF * 2}""" | |
if body: | |
_body += f"{body}" | |
if _DEBUG: | |
print(f"SENDING: {_url.hostname}:{_port}{_url.path}\n{_body}") | |
# send the data as utf-8 encoded bytes | |
_send_response = _ssl_socket.sendall(_body.encode(_STR_ENCODING), socket.SHUT_RD) | |
if _send_response is not None: | |
raise HttpsValidationError( | |
f"Failed to send body to endpoint:" | |
f" {_url.hostname}:{_port}{_url.path if _url.path else '/'}\n\n{_send_response}") | |
_all = [] | |
while True: | |
_chunk = _ssl_socket.recv(io.DEFAULT_BUFFER_SIZE) | |
_all.append(_chunk) | |
if len(_chunk) != io.DEFAULT_BUFFER_SIZE: | |
break | |
_data = b''.join(_all) | |
_response = PinnedResponse(_data.decode(_STR_ENCODING), endpoint_url) | |
try: | |
_assert_status(_response) | |
except RedirectError: | |
redirects_left -= 1 | |
if follow_redirects and redirects_left > 0: | |
location = _response.headers.get('Location') | |
if location: | |
new_location = parse.urlparse(location) | |
if not expected_fingerprints.get(new_location.hostname): | |
raise RedirectError("Unable to pin certificate as the expected fingerprint is missing") | |
if _DEBUG: | |
print(f"Redirecting to: {location}") | |
return pinned_request(expected_fingerprints, method, location, headers, body, | |
redirects_left=redirects_left) | |
return _response | |
######## | |
# Example usage | |
# | |
# >>> from pinned_request import * | |
# >>> pinned_domains = { | |
# 'google.com': '35:71:F0:71:31:B4:32:B5:D2:A5:34:9B:40:F0:EC:12:8C:96:02:4C:C1:CC:EA:F2:6E:26:22:51:8D:B6:01:63', | |
# 'www.google.com': '30:10:7F:E2:66:DA:43:F5:86:3C:E2:76:9C:08:96:5D:86:56:DB:8B:A3:B7:41:1B:F5:D2:D2:3D:BA:B1:F1:B0', | |
# } | |
# >>> google = get(pinned_domains, 'https://google.com') | |
# >>> google.status | |
# '200' | |
######## | |
# Get sha256 hash of a domain's certificate | |
# $ echo | openssl s_client -connect google.com:443 | openssl x509 -fingerprint -noout -sha256 | |
# depth=1 C = US, O = Google Trust Services, CN = Google Internet Authority G3 | |
# verify error:num=20:unable to get local issuer certificate | |
# verify return:0 | |
# DONE | |
# SHA256 Fingerprint=35:71:F0:71:31:B4:32:B5:D2:A5:34:9B:40:F0:EC:12:8C:96:02:4C:C1:CC:EA:F2:6E:26:22:51:8D:B6:01:63 | |
# if you want to support redirects then you will need the redirected domain fingerprint as well. | |
# $ echo | openssl s_client -connect www.google.com:443 | openssl x509 -fingerprint -noout -sha256 | |
# depth=1 C = US, O = Google Trust Services, CN = Google Internet Authority G3 | |
# verify error:num=20:unable to get local issuer certificate | |
# verify return:0 | |
# DONE | |
# SHA256 Fingerprint=30:10:7F:E2:66:DA:43:F5:86:3C:E2:76:9C:08:96:5D:86:56:DB:8B:A3:B7:41:1B:F5:D2:D2:3D:BA:B1:F1:B0 | |
if __name__ == '__main__': | |
pinned_domains = { | |
'google.com': '35:71:F0:71:31:B4:32:B5:D2:A5:34:9B:40:F0:EC:12:8C:96:02:4C:C1:CC:EA:F2:6E:26:22:51:8D:B6:01:63', | |
'www.google.com': '30:10:7F:E2:66:DA:43:F5:86:3C:E2:76:9C:08:96:5D:86:56:DB:8B:A3:B7:41:1B:F5:D2:D2:3D:BA:B1:F1:B0', | |
'untrusted-root.badssl.com': 'D0:73:B3:89:43:B3:6B:D9:70:EC:8F:61:B3:A1:AE:A6:6E:58:EF:F1:60:DA:EE:14:3B:CB:9D:99:67:86:78:13', | |
'wrong.host.badssl.com': 'D0:73:B3:89:43:B3:6B:D9:70:EC:8F:61:B3:A1:AE:A6:6E:58:EF:F1:60:DA:EE:14:3B:CB:9D:99:67:86:78:13', | |
'expired.badssl.com': 'Not a valid fingerprint', | |
'apple.com': 'A3:3C:EF:BD:CF:4E:E6:55:6E:74:44:31:B8:F0:EB:45:12:F2:0F:E8:3B:31:C1:A7:55:62:0D:29:2F:6B:E4:6F' | |
} | |
print("Validating a correct certificate") | |
response = get(pinned_domains, 'https://google.com') | |
if _DEBUG: | |
print(response) | |
print("Validating untrusted root but correctly pinned certificate") | |
response = get(pinned_domains, 'https://untrusted-root.badssl.com') | |
if _DEBUG: | |
print(response) | |
print("Validating host name of certificate is ignore but the certificate is still correctly pinned") | |
response = get(pinned_domains, 'https://wrong.host.badssl.com') | |
if _DEBUG: | |
print(response) | |
try: | |
print("Validating an incorrect fingerprint will trigger a failure") | |
response = get(pinned_domains, 'https://expired.badssl.com') | |
if _DEBUG: | |
print(response) | |
raise AssertionError("You should not here... if you are something has gone very wrong") | |
except FingerprintValidationError: | |
print(f"Correctly identified incorrect certificate") | |
try: | |
print("Validating redirected url will fail if missing a certificate from the redirect path") | |
response = get(pinned_domains, 'https://apple.com') | |
raise AssertionError(f"You should not here... if you are something has gone very wrong: {response}") | |
except RedirectError: | |
print("Correctly identified a missing fingerprint from a redirect path") | |
print("Tests passed") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment