Created
November 3, 2022 18:14
-
-
Save 0xZDH/a0b5064bd9078fc28c557ceed6942122 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# certwatch.py - SQL and API adapter Python library for crt.sh
#
# Library usage:
#   from certwatch import CertWatch
#
# Command-line usage:
#   usage: certwatch.py [-h] -d DOMAIN [-a {api,psql}] [-e] [--proxy PROXY] [--debug]
#
#   certwatch | SQL and API adapter Python library for crt.sh -- v0.1.0
#
#   options:
#     -h, --help            show this help message and exit
#     -d DOMAIN, --domain DOMAIN
#                           domain to search against crt.sh
#     -a {api,psql}, --adapter {api,psql}
#                           crt.sh adapter type
#     -e, --expired         include expired certificates
#     --proxy PROXY         http request proxy (e.g. http://127.0.0.1:8080)
#     --debug               enable debug output
import logging
from itertools import groupby
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import requests  # type: ignore
# Library version; interpolated into the command-line description string.
__version__ = "0.1.0"
class CertWatch:
    """PostgreSQL and API adapter for crt.sh

    Use `query_x509` to look up the X509v3 Subject Alternative Name value(s)
    that crt.sh has recorded for a domain. The double-underscore methods are
    implementation details: `query_x509` calls them with the class itself
    standing in for `self`, since none of them read instance state.
    """

    # Upper bound (in seconds) on how long the HTTPS adapter waits for
    # crt.sh. Without a timeout `requests` blocks forever on a stalled server.
    _API_TIMEOUT = 120

    def __uniq_tuple_list(
        self,
        l: List[Tuple[str, Any]],
    ) -> List[Tuple[str, Any]]:
        """Unique a list of tuples by the first element

        The result is sorted by the first element. When several tuples share
        a first element, the one that appears earliest in `l` is kept
        (`sorted` is stable, and `groupby` yields items in sorted order).

        :param l: list of tuples
        :returns: sorted, uniqued list of tuples
        """

        def first(item: Tuple[str, Any]) -> str:
            return item[0]

        return [
            next(group)
            for _, group in groupby(sorted(l, key=first), key=first)
        ]

    @staticmethod
    def __build_psql_query(expired: bool) -> str:
        """Build the parameterized SQL statement sent to crt.sh

        The domain is never interpolated into the statement. It is bound at
        execution time through the `%(domain)s` placeholder, so the literal
        `%` wildcards of the ILIKE pattern are written as `%%`.

        Based on the API query:
        ?q=example.com&match=ILIKE&deduplicate=Y&exclude=expired&showSQL=Y

        :param expired: whether to include expired certificates
        :returns: SQL statement expecting a `domain` named parameter
        """
        # Exclude expired certificates unless they were explicitly requested
        expired_filter = (
            ""
            if expired
            else (
                "AND coalesce(x509_notAfter(cai.CERTIFICATE), 'infinity'::timestamp) >= date_trunc('year', now() AT TIME ZONE 'UTC') "
                "AND x509_notAfter(cai.CERTIFICATE) >= now() AT TIME ZONE 'UTC' "
            )
        )
        return (
            "SELECT cai.NAME_VALUE, x509_notAfter(cai.CERTIFICATE) NOT_AFTER "
            "FROM certificate_and_identities cai "
            "WHERE plainto_tsquery('certwatch', %(domain)s) @@ identities(cai.CERTIFICATE) "
            "AND cai.NAME_VALUE ILIKE ('%%' || %(domain)s || '%%') "
            + expired_filter
            + "AND NOT EXISTS ("
            "SELECT 1 "
            "FROM certificate c2 "
            "WHERE x509_serialNumber(c2.CERTIFICATE) = x509_serialNumber(cai.CERTIFICATE) "
            "AND c2.ISSUER_CA_ID = cai.ISSUER_CA_ID "
            "AND c2.ID < cai.CERTIFICATE_ID "
            "AND x509_tbscert_strip_ct_ext(c2.CERTIFICATE) = x509_tbscert_strip_ct_ext(cai.CERTIFICATE) "
            "LIMIT 1) "
            "LIMIT 10000;"
        )

    def __psql(
        self,
        domain: str,
        expired: bool = False,
    ) -> Optional[List[Tuple[str, Any]]]:
        """Query the crt.sh PostgreSQL database directly for a given
        domain and retrieve the X509v3 Subject Alternative Name value(s)

        :param domain: domain to query for
        :param expired: whether to include expired certificates
        :returns: list of (dNSName, notAfter) record tuples (empty when
                  nothing matched), or None when psycopg2 is missing or
                  the query failed
        """
        # Only import when being used to allow API access without installing
        # external dependencies
        try:
            # pip install psycopg2-binary
            import psycopg2  # type: ignore
        except ModuleNotFoundError:
            logging.error("'psycopg2' is not installed")
            logging.error("Could not connect to database")
            return None

        query = CertWatch.__build_psql_query(expired)

        conn = None
        dns_names = None
        try:
            logging.debug("Connecting to database crt.sh:5432 via PostgreSQL")
            # psql -t -h crt.sh -p 5432 -U guest certwatch
            conn = psycopg2.connect(
                host="crt.sh",
                port=5432,
                user="guest",
                database="certwatch",
            )
            # Set `default_transaction_read_only` to on
            conn.set_session(readonly=True, autocommit=True)

            with conn.cursor() as curs:
                # Let the driver quote the domain instead of formatting it
                # into the statement (prevents SQL injection)
                curs.execute(query, {"domain": domain})
                # Rows come back as (NAME_VALUE, NOT_AFTER) tuples
                dns_names = curs.fetchall()

            if not dns_names:
                logging.warning("No records found")

        except psycopg2.DatabaseError as e:
            logging.error(f"Database Exception: {e}")
            dns_names = None  # Unset
        except Exception as e:
            logging.error(f"Exception: {e}")
            dns_names = None  # Unset
        finally:
            # Close the connection
            if conn is not None:
                conn.close()
                logging.debug("Database connection closed")

        return dns_names

    def __api(
        self,
        domain: str,
        expired: bool = False,
        proxies: Optional[Dict[str, str]] = None,
    ) -> Optional[List[Tuple[str, Any]]]:
        """Query the crt.sh API directly for a given domain and
        retrieve the X509v3 Subject Alternative Name value(s)

        JSON response structure:
            {
                issuer_ca_id,
                issuer_name,
                common_name,
                name_value,
                id,
                entry_timestamp,
                not_before,
                not_after,
                serial_number
            }

        :param domain: domain to query for
        :param expired: whether to include expired certificates
        :param proxies: http request proxies
        :returns: list of (dNSName, notAfter) record tuples (empty when
                  nothing matched), or None on a non-200 response or any
                  request/parsing error
        """
        # Only verify SSL certificates if not through proxy
        verify = not proxies

        # Pass the query as `params` so the domain is URL-encoded rather
        # than pasted into the URL verbatim
        params = {
            "q": domain,
            "match": "ILIKE",
            "deduplicate": "Y",
            "output": "json",
        }
        # Exclude expired certificates
        if not expired:
            params["exclude"] = "expired"

        try:
            logging.debug("Connecting to API via HTTPS")
            response = requests.get(
                "https://crt.sh/",
                params=params,
                proxies=proxies,
                verify=verify,
                timeout=CertWatch._API_TIMEOUT,
            )

            # Handle invalid responses
            if response.status_code != 200:
                logging.error("Invalid API response")
                return None

            # SAN value(s) are stored as one value split via `\n`, so each
            # record may expand into several (name, not_after) tuples
            dns_names = [
                (name_value, record["not_after"])
                for record in response.json()
                for name_value in record["name_value"].split("\n")
            ]

            if not dns_names:
                logging.warning("No records found")

            return dns_names

        except Exception as e:
            logging.error(f"Exception: {e}")
            return None

    @classmethod
    def query_x509(
        cls,
        domain: str,
        adapter: str = "api",
        expired: bool = False,
        proxies: Optional[Dict[str, str]] = None,
    ) -> Optional[List[Any]]:
        """Query crt.sh for a given domain and retrieve the
        X509v3 Subject Alternative Name value(s)

        :param domain: domain to query for
        :param adapter: adapter type to query [api, psql] (case-insensitive)
        :param expired: whether to include expired certificates
        :param proxies: http request proxies (only used by the api adapter)
        :returns: sorted, de-duplicated list of (dNSName, notAfter) record
                  tuples, or None when the adapter is invalid or the
                  query failed
        """
        # Verify adapter
        adapter = adapter.lower()
        if adapter not in ("api", "psql"):
            logging.error("Invalid certwatch adapter type")
            return None

        logging.info(f"Retrieving X509v3 Subject Alternative Name(s) for: '{domain}'")

        # The private helpers are plain methods that never touch instance
        # state, so the class is passed in place of `self`
        if adapter == "psql":
            # Query the PostgreSQL database directly
            dns_names = cls.__psql(
                cls,
                domain=domain,
                expired=expired,
            )
        else:
            # Query the API
            dns_names = cls.__api(
                cls,
                domain=domain,
                expired=expired,
                proxies=proxies,
            )

        # Check if failed query run
        if dns_names is None:
            logging.error("Could not retrieve records")
        elif dns_names:
            # Sort and unique records, then display count
            dns_names = cls.__uniq_tuple_list(cls, dns_names)
            logging.info(f"{len(dns_names)} records found")

        return dns_names
if __name__ == "__main__":
    import argparse
    import time

    # Describe the command-line interface
    cli = argparse.ArgumentParser(
        description=(
            f"certwatch | SQL and API adapter Python library for crt.sh -- v{__version__}"
        )
    )
    cli.add_argument(
        "-d",
        "--domain",
        type=str,
        required=True,
        help="domain to search against crt.sh",
    )
    cli.add_argument(
        "-a",
        "--adapter",
        type=str,
        choices=["api", "psql"],
        default="api",
        help="crt.sh adapter type",
    )
    cli.add_argument(
        "-e",
        "--expired",
        action="store_true",
        help="include expired certificates",
    )
    cli.add_argument(
        "--proxy",
        type=str,
        help="http request proxy (e.g. http://127.0.0.1:8080)",
    )
    cli.add_argument("--debug", action="store_true", help="enable debug output")
    opts = cli.parse_args()

    # Configure logging; --debug lowers the threshold from INFO to DEBUG
    logging.basicConfig(
        format="[%(asctime)s] %(levelname)-5s | %(message)s",
        level=logging.DEBUG if opts.debug else logging.INFO,
    )

    # Send both plain and TLS traffic through the same proxy when one is given
    proxy_map = {"http": opts.proxy, "https": opts.proxy} if opts.proxy else None

    # Time the lookup together with the printing of its results
    started_at = time.perf_counter()

    records = CertWatch.query_x509(
        domain=opts.domain,
        adapter=opts.adapter,
        expired=opts.expired,
        proxies=proxy_map,
    )

    # Each record is (dNSName, notAfter); print the expiry first
    if records:
        print("\nX509v3 Subject Alternative Name:")
        for record in records:
            print(f"\t{record[1]} | {record[0]}")
        print()

    duration = time.perf_counter() - started_at
    logging.debug(f"{__name__} executed in {duration:0.2f} seconds")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment