Skip to content

Instantly share code, notes, and snippets.

@0xZDH
Created November 3, 2022 18:14
Show Gist options
  • Save 0xZDH/a0b5064bd9078fc28c557ceed6942122 to your computer and use it in GitHub Desktop.
Save 0xZDH/a0b5064bd9078fc28c557ceed6942122 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# certwatch.py - SQL and API adapter Python library for crt.sh
#
# from certwatch import CertWatch
#
#
# usage: certwatch.py [-h] -d DOMAIN [-a {api,psql}] [-e] [--proxy PROXY] [--debug]
#
# certwatch | SQL and API adapter Python library for crt.sh -- v0.1.0
#
# options:
# -h, --help show this help message and exit
# -d DOMAIN, --domain DOMAIN
# domain to search against crt.sh
# -a {api,psql}, --adapter {api,psql}
# crt.sh adapter type
# -e, --expired include expired certificates
# --proxy PROXY http request proxy (e.g. http://127.0.0.1:8080)
# --debug enable debug output
import logging
import requests # type: ignore
from itertools import groupby
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
__version__ = "0.1.0"
class CertWatch:
"""PostgreSQL and API adapter for crt.sh"""
def __uniq_tuple_list(
self,
l: List[Tuple[str, Any]],
) -> List[Tuple[str, Any]]:
"""Unique a list of tuples by the first element
:param l: list of tuples
:returns: uniqued list of tuples
"""
f = lambda x: x[0]
return [next(g) for _, g in groupby(sorted(l, key=f), key=f)]
def __psql(
self,
domain: str,
expired: bool = False,
) -> List[Tuple[str, str]]:
"""Query the crt.sh PostgreSQL database directly for a given
domain and retrieve the X509v3 Subject Alternative Name value(s)
:param domain: domain to query for
:param expired: whether to include expired certificates
:returns: list of (dNSName, notAfter) record tuples
"""
# Only import when being used to allow API access without installing
# external dependencies
try:
# pip install psycopg2-binary
import psycopg2 # type: ignore
except ModuleNotFoundError:
logging.error("'psycopg2' is not installed")
logging.error("Could not connect to database")
return None
conn = None
dns_names = None
# Build SQL query
# Based on the API query:
# ?q=example.com&match=ILIKE&deduplicate=Y&exclude=expired&showSQL=Y
query = (
"SELECT cai.NAME_VALUE, x509_notAfter(cai.CERTIFICATE) NOT_AFTER "
"FROM certificate_and_identities cai "
f"WHERE plainto_tsquery('certwatch', '{domain}') @@ identities(cai.CERTIFICATE) "
f"AND cai.NAME_VALUE ILIKE ('%' || '{domain}' || '%') "
"{EXPIRED}"
"AND NOT EXISTS ("
"SELECT 1 "
"FROM certificate c2 "
"WHERE x509_serialNumber(c2.CERTIFICATE) = x509_serialNumber(cai.CERTIFICATE) "
"AND c2.ISSUER_CA_ID = cai.ISSUER_CA_ID "
"AND c2.ID < cai.CERTIFICATE_ID "
"AND x509_tbscert_strip_ct_ext(c2.CERTIFICATE) = x509_tbscert_strip_ct_ext(cai.CERTIFICATE) "
"LIMIT 1) "
"LIMIT 10000;"
)
# Exclude expired certificates
expired_query = (
(
"AND coalesce(x509_notAfter(cai.CERTIFICATE), 'infinity'::timestamp) >= date_trunc('year', now() AT TIME ZONE 'UTC') "
"AND x509_notAfter(cai.CERTIFICATE) >= now() AT TIME ZONE 'UTC' "
)
if not expired
else ""
)
query = query.format(EXPIRED=expired_query)
try:
logging.debug("Connecting to database crt.sh:5432 via PostgreSQL")
# psql -t -h crt.sh -p 5432 -U guest certwatch
conn = psycopg2.connect(
host="crt.sh",
port=5432,
user="guest",
database="certwatch",
)
# Set `default_transaction_read_only` to on
conn.set_session(readonly=True, autocommit=True)
with conn.cursor() as curs:
# Execute the query
curs.execute(query)
# Grab the list of DNS records via a list
# of single item Tuples
dns_names = curs.fetchall()
if not dns_names:
logging.warn("No records found")
except psycopg2.DatabaseError as e:
logging.error(f"Database Exception: {e}")
dns_names = None # Unset
except Exception as e:
logging.error(f"Exception: {e}")
dns_names = None # Unset
finally:
# Close the connection
if conn is not None:
conn.close()
logging.debug("Database connection closed")
return dns_names
def __api(
self,
domain: str,
expired: bool = False,
proxies: Dict[str, str] = None,
) -> List[Tuple[str, str]]:
"""Query the crt.sh API directly for a given domain and
retrieve the X509v3 Subject Alternative Name value(s)
JSON response structure:
{
issuer_ca_id,
issuer_name,
common_name,
name_value,
id,
entry_timestamp,
not_before,
not_after,
serial_number
}
:param domain: domain to query for
:param expired: whether to include expired certificates
:param proxies: http request proxies
:returns: list of (dNSName, notAfter) record tuples
"""
dns_names = None
# Only verify SSL certificates if not through proxy
verify = False if proxies else True
# Build API request URL
url = f"https://crt.sh/?q={domain}&match=ILIKE&deduplicate=Y&output=json"
# Exclude expired certificates
if not expired:
url += "&exclude=expired"
try:
logging.debug("Connecting to API via HTTPS")
response = requests.get(
url,
proxies=proxies,
verify=verify,
)
# Handle invalid responses
if response.status_code != 200:
logging.error("Invalid API response")
return None
records = response.json()
dns_names = [] # Reset to not None
for record in records:
# SAN value(s) are stored as one value split via `\n`
name_values = record["name_value"].split("\n")
for name_value in name_values:
dns_names.append((name_value, record["not_after"]))
if not dns_names:
logging.warn("No records found")
return dns_names
except Exception as e:
logging.error(f"Exception: {e}")
return None
@classmethod
def query_x509(
cls,
domain: str,
adapter: str = "api",
expired: bool = False,
proxies: Dict[str, str] = None,
) -> List[Any]:
"""Query crt.sh for a given domain and retrieve the
X509v3 Subject Alternative Name value(s)
:param domain: domain to query for
:param adapter: adapter type to query [api, psql]
:param expired: whether to include expired certificates
:param proxies: http request proxies
:returns: list of (dNSName, notAfter) record tuples
"""
# Verify adapter
adapter = adapter.lower()
if adapter not in ["api", "psql"]:
logging.error("Invalid certwatch adapter type")
return None
logging.info(f"Retrieving X509v3 Subject Alternative Name(s) for: '{domain}'")
if adapter == "psql":
# Query the PostgreSQL database directly
dns_names = cls.__psql(
cls,
domain=domain,
expired=expired,
)
elif adapter == "api":
# Query the API
dns_names = cls.__api(
cls,
domain=domain,
expired=expired,
proxies=proxies,
)
else:
# Fall-back catch for invalid adapters
logging.error("Invalid certwatch adapter type")
return None
# Check if failed query run
if dns_names is None:
logging.error("Could not retrieve records")
else:
# If records found, display count
if dns_names:
# Sort and unique records
dns_names = cls.__uniq_tuple_list(cls, dns_names)
logging.info(f"{len(dns_names)} records found")
return dns_names
if __name__ == "__main__":
import time
import argparse
parser = argparse.ArgumentParser(
description=(
f"certwatch | SQL and API adapter Python library for crt.sh -- v{__version__}"
)
)
parser.add_argument(
"-d",
"--domain",
type=str,
required=True,
help="domain to search against crt.sh",
)
parser.add_argument(
"-a",
"--adapter",
type=str,
choices=["api", "psql"],
default="api",
help="crt.sh adapter type",
)
parser.add_argument(
"-e",
"--expired",
action="store_true",
help="include expired certificates",
)
parser.add_argument(
"--proxy",
type=str,
help="http request proxy (e.g. http://127.0.0.1:8080)",
)
parser.add_argument("--debug", action="store_true", help="enable debug output")
args = parser.parse_args()
# Initialize logging
logging_level = logging.DEBUG if args.debug else logging.INFO
logging_format = "[%(asctime)s] %(levelname)-5s | %(message)s"
logging.basicConfig(format=logging_format, level=logging_level)
proxies = None
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}
start = time.perf_counter()
dns_names = CertWatch.query_x509(
domain=args.domain,
adapter=args.adapter,
expired=args.expired,
proxies=proxies,
)
if dns_names:
print("\nX509v3 Subject Alternative Name:")
for dns in dns_names:
print(f"\t{dns[1]} | {dns[0]}")
print()
elapsed = time.perf_counter() - start
logging.debug(f"{__name__} executed in {elapsed:0.2f} seconds")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment