Skip to content

Instantly share code, notes, and snippets.

@AhmedCoolProjects
Created May 16, 2026 15:18
Show Gist options
  • Select an option

  • Save AhmedCoolProjects/d3b372bd89ff35963fce9b3a26ad427b to your computer and use it in GitHub Desktop.

Select an option

Save AhmedCoolProjects/d3b372bd89ff35963fce9b3a26ad427b to your computer and use it in GitHub Desktop.
Domain metadata collector (RDAP/DNS/TLS/IP/CT) — optimized for Google Colab with concurrent module execution
#!/usr/bin/env python3
"""
collect_domains_metadata.py
===========================
Single-file domain metadata collector — optimized for Google Colab.
Key optimisations over the original repo:
• 5 modules (RDAP/DNS/TLS/IP/CT) run *concurrently* per domain via an inner
ThreadPoolExecutor → latency ≈ max(module_times) instead of sum.
• Thread-local requests.Session and dns.Resolver — no per-call construction.
• IANA RDAP bootstrap pre-warmed in the main thread before workers spawn.
• JSONL checkpoint with auto-resume (re-runs only failed / not-yet-done rows).
• Buffered JSONL writes to reduce filesystem pressure.
Usage
-----
# 1. Install deps (run once in Colab):
# !pip install -q dnspython ipwhois cryptography requests pandas pyarrow tqdm
# 2. Run:
python collect_domains_metadata.py \\
--input dev.csv \\
--output dev_meta.csv \\
--workers 50 \\
[--ct] [--dns 8.8.8.8 1.1.1.1]
Output columns: input, class, label, _error, _elapsed_s, + 192 feature columns
matching output_example_after_changes.json exactly.
"""
# ── auto-install (uncomment for Colab) ───────────────────────────────────────
# import subprocess
# subprocess.run(["pip", "install", "-q",
# "dnspython", "ipwhois", "cryptography",
# "requests", "pandas", "pyarrow", "tqdm"], check=True)
import argparse
import ipaddress
import json
import os
import socket
import ssl
import statistics
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import quote, urlparse
import dns.exception
import dns.resolver
import pandas as pd
import requests
from cryptography import x509
from ipwhois import IPWhois
from ipwhois.exceptions import ASNRegistryError, HTTPLookupError, IPDefinedError
from tqdm import tqdm
# ─────────────────────────────────────────────────────────────────────────────
# § 1. GLOBAL CONSTANTS
# ─────────────────────────────────────────────────────────────────────────────
_IANA_CACHE: Optional[Dict[str, Any]] = None
_IANA_LOCK = threading.Lock()
_WRITE_LOCK = threading.Lock()
_BUFFER_SIZE = 64 # flush JSONL every N records
_MAX_IPS_PER_DOMAIN = 5 # cap IPWhois RDAP lookups per domain
FREE_HOSTER_SUFFIXES: frozenset = frozenset({
"github.io", "gitlab.io", "pages.dev", "web.app", "firebaseapp.com",
"netlify.app", "vercel.app", "surge.sh", "000webhostapp.com",
"wordpress.com", "blogspot.com", "wixsite.com", "weebly.com",
})
CDN_SUFFIXES: frozenset = frozenset({
"cloudfront.net", "cloudflare.net", "akamaiedge.net",
"akamai.net", "fastly.net", "azureedge.net",
})
COMMON_2PART: frozenset = frozenset({
"co.uk", "ac.uk", "gov.uk", "org.uk", "net.uk",
"com.au", "net.au", "org.au", "co.jp", "ne.jp", "or.jp",
"com.br", "net.br", "org.br", "co.ma", "ac.ma", "gov.ma",
"com.tr", "net.tr", "org.tr", "co.in", "ac.in", "gov.in",
"com.cn", "net.cn", "org.cn", "co.kr", "or.kr",
"com.sg", "com.mx", "com.sa",
})
PRIVACY_TERMS: frozenset = frozenset([
"redacted", "privacy", "private", "whoisguard",
"domains by proxy", "data protected", "not disclosed", "gdpr",
])
# ─────────────────────────────────────────────────────────────────────────────
# § 2. THREAD-LOCAL RESOURCES
# ─────────────────────────────────────────────────────────────────────────────
_tl = threading.local()
def _session() -> requests.Session:
if not hasattr(_tl, "session"):
s = requests.Session()
s.headers["User-Agent"] = "malicious-domain-research/2.0"
_tl.session = s
return _tl.session
def _resolver(dns_servers: Optional[List[str]] = None) -> dns.resolver.Resolver:
key = "resolver_" + "_".join(dns_servers or ["default"])
if not hasattr(_tl, key):
r = dns.resolver.Resolver()
r.timeout = 3.0
r.lifetime = 5.0
if dns_servers:
r.nameservers = dns_servers
setattr(_tl, key, r)
return getattr(_tl, key)
# ─────────────────────────────────────────────────────────────────────────────
# § 3. SHARED UTILITIES
# ─────────────────────────────────────────────────────────────────────────────
def _is_ip(v: Optional[str]) -> bool:
if not v:
return False
try:
ipaddress.ip_address(v)
return True
except ValueError:
return False
def _normalize(raw: str) -> str:
v = raw.lower().strip().strip(" \t\n\r'\"<>.,;")
parsed = urlparse(v if "://" in v else "//" + v)
host = parsed.hostname
if not host:
raise ValueError(f"Cannot extract hostname from {raw!r}")
host = host.rstrip(".")
if host.startswith("www.") and not _is_ip(host):
host = host[4:]
return host
def _registered(host: str) -> str:
if _is_ip(host):
return host
parts = host.lower().strip(".").split(".")
if len(parts) <= 2:
return host
if ".".join(parts[-2:]) in COMMON_2PART and len(parts) >= 3:
return ".".join(parts[-3:])
return ".".join(parts[-2:])
def _tld(domain: str) -> str:
parts = domain.split(".")
if len(parts) < 2:
raise ValueError(f"Bad domain: {domain!r}")
return parts[-1]
def _parse_dt(v: Any) -> Optional[datetime]:
if v is None:
return None
try:
if isinstance(v, (int, float)):
return datetime.fromtimestamp(v, tz=timezone.utc)
s = str(v).strip()
if not s:
return None
if s.isdigit():
return datetime.fromtimestamp(int(s), tz=timezone.utc)
if s.endswith("Z"):
s = s[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(s)
return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
except ValueError:
return datetime.strptime(s, "%Y-%m-%d").replace(tzinfo=timezone.utc)
except Exception:
return None
def _days(a: Optional[datetime], b: Optional[datetime]) -> Optional[int]:
return None if (a is None or b is None) else (b - a).days
def _sz(v: Any) -> Any:
return v if v is not None else 0
def _ns_provider(ns: str) -> str:
parts = str(ns).lower().rstrip(".").split(".")
return ".".join(parts[-2:]) if len(parts) >= 2 else ns
def _suffix_in(name: str, suffixes: frozenset) -> bool:
n = str(name).lower().rstrip(".")
return any(n == s or n.endswith("." + s) for s in suffixes)
# ─────────────────────────────────────────────────────────────────────────────
# § 4. RDAP MODULE
# ─────────────────────────────────────────────────────────────────────────────
def _get_iana() -> Optional[Dict[str, Any]]:
global _IANA_CACHE
if _IANA_CACHE is not None:
return _IANA_CACHE
with _IANA_LOCK:
if _IANA_CACHE is None:
try:
r = requests.get("https://data.iana.org/rdap/dns.json",
headers={"User-Agent": "malicious-domain-research/2.0"},
timeout=20)
r.raise_for_status()
_IANA_CACHE = r.json()
except Exception:
return None
return _IANA_CACHE
def _rdap_url(reg: str) -> Optional[str]:
data = _get_iana()
if not data:
return None
try:
tld = _tld(reg)
except ValueError:
return None
for svc in data.get("services", []):
if tld in svc[0] and svc[1]:
return f"{svc[1][0].rstrip('/')}/domain/{reg}"
return None
def _null_rdap() -> Dict[str, Any]:
return {
"rdap_found": 0,
"rdap_domain_age_days": 0, "rdap_registration_period_days": 0,
"rdap_days_since_update": 0, "rdap_days_to_expiration": 0,
"rdap_missing_creation_date": 1, "rdap_missing_expiration_date": 1,
"rdap_missing_updated_date": 1,
"rdap_is_new_domain_1_day": 0, "rdap_is_new_domain_7_days": 0,
"rdap_is_new_domain_30_days": 0, "rdap_is_new_domain_90_days": 0,
"rdap_is_new_domain_365_days": 0,
"rdap_registered_for_one_year": 0, "rdap_registered_for_two_years_or_less": 0,
"rdap_registered_for_five_years_or_more": 0,
"rdap_expires_soon_7_days": 0, "rdap_expires_soon_30_days": 0,
"rdap_expires_soon_90_days": 0,
"rdap_recently_updated_7_days": 0, "rdap_recently_updated_30_days": 0,
"rdap_recently_updated_90_days": 0,
"rdap_registrar_missing": 1, "rdap_privacy_protected": 0,
"rdap_num_nameservers": 0, "rdap_nameservers_missing": 1,
"rdap_num_unique_nameserver_domains": 0,
"rdap_uses_cloudflare_ns": 0, "rdap_uses_google_ns": 0,
"rdap_uses_amazon_ns": 0, "rdap_uses_azure_ns": 0,
"rdap_uses_digitalocean_ns": 0, "rdap_uses_registrar_default_ns": 0,
"rdap_num_status_codes": 0,
"rdap_has_client_transfer_prohibited": 0, "rdap_has_server_transfer_prohibited": 0,
"rdap_has_client_hold": 0, "rdap_has_server_hold": 0,
"rdap_has_pending_delete": 0, "rdap_has_redemption_period": 0,
"rdap_has_any_hold_status": 0, "rdap_has_any_delete_status": 0,
"rdap_event_count": 0, "rdap_entity_count": 0,
}
def collect_rdap(reg: str, now: datetime) -> Dict[str, Any]:
url = _rdap_url(reg)
if not url:
return _null_rdap()
try:
r = _session().get(url, headers={"Accept": "application/rdap+json"},
timeout=20, allow_redirects=True)
if r.status_code != 200:
return _null_rdap()
data = r.json()
except Exception:
return _null_rdap()
creation = expiration = updated = None
for ev in data.get("events", []):
action = str(ev.get("eventAction", "")).lower()
dt = _parse_dt(ev.get("eventDate"))
if action in {"registration", "registered"}:
creation = dt
elif action in {"expiration", "expires"}:
expiration = dt
elif "last" in action:
updated = dt
age = _days(creation, now)
reg_period = _days(creation, expiration)
since_update = _days(updated, now)
to_exp = _days(now, expiration)
registrar = None
for ent in data.get("entities", []):
if "registrar" in ent.get("roles", []):
vcard = ent.get("vcardArray", [])
if len(vcard) == 2:
for item in vcard[1]:
if len(item) >= 4 and item[0] == "fn":
registrar = item[3]
break
ns_names = [str(ns.get("ldhName", "")).lower().rstrip(".")
for ns in data.get("nameservers", []) if ns.get("ldhName")]
ns_text = " ".join(ns_names)
ns_providers = {_ns_provider(x) for x in ns_names}
statuses = [str(s).lower() for s in data.get("status", [])]
st = " ".join(statuses)
ch = int("client hold" in st or "clienthold" in st)
sh = int("server hold" in st or "serverhold" in st)
pd_ = int("pending delete" in st or "pendingdelete" in st)
rp = int("redemption period" in st or "redemptionperiod" in st)
raw_j = json.dumps(data).lower()
priv = int(any(t in raw_j for t in PRIVACY_TERMS))
return {
"rdap_found": 1,
"rdap_domain_age_days": _sz(age),
"rdap_registration_period_days": _sz(reg_period),
"rdap_days_since_update": _sz(since_update),
"rdap_days_to_expiration": _sz(to_exp),
"rdap_missing_creation_date": int(creation is None),
"rdap_missing_expiration_date": int(expiration is None),
"rdap_missing_updated_date": int(updated is None),
"rdap_is_new_domain_1_day": int(age is not None and age <= 1),
"rdap_is_new_domain_7_days": int(age is not None and age <= 7),
"rdap_is_new_domain_30_days": int(age is not None and age <= 30),
"rdap_is_new_domain_90_days": int(age is not None and age <= 90),
"rdap_is_new_domain_365_days": int(age is not None and age <= 365),
"rdap_registered_for_one_year": int(reg_period is not None and reg_period <= 370),
"rdap_registered_for_two_years_or_less": int(reg_period is not None and reg_period <= 740),
"rdap_registered_for_five_years_or_more":int(reg_period is not None and reg_period >= 1825),
"rdap_expires_soon_7_days": int(to_exp is not None and 0 <= to_exp <= 7),
"rdap_expires_soon_30_days": int(to_exp is not None and 0 <= to_exp <= 30),
"rdap_expires_soon_90_days": int(to_exp is not None and 0 <= to_exp <= 90),
"rdap_recently_updated_7_days": int(since_update is not None and since_update <= 7),
"rdap_recently_updated_30_days": int(since_update is not None and since_update <= 30),
"rdap_recently_updated_90_days": int(since_update is not None and since_update <= 90),
"rdap_registrar_missing": int(registrar is None),
"rdap_privacy_protected": priv,
"rdap_num_nameservers": len(ns_names),
"rdap_nameservers_missing": int(len(ns_names) == 0),
"rdap_num_unique_nameserver_domains": len(ns_providers),
"rdap_uses_cloudflare_ns": int("cloudflare" in ns_text),
"rdap_uses_google_ns": int("google" in ns_text),
"rdap_uses_amazon_ns": int("awsdns" in ns_text or "amazon" in ns_text),
"rdap_uses_azure_ns": int("azure" in ns_text or "microsoft" in ns_text),
"rdap_uses_digitalocean_ns": int("digitalocean" in ns_text),
"rdap_uses_registrar_default_ns": int(any(x in ns_text for x in
["registrar", "domaincontrol", "namecheap", "sedoparking"])),
"rdap_num_status_codes": len(statuses),
"rdap_has_client_transfer_prohibited": int("client transfer prohibited" in st or "clienttransferprohibited" in st),
"rdap_has_server_transfer_prohibited": int("server transfer prohibited" in st or "servertransferprohibited" in st),
"rdap_has_client_hold": ch,
"rdap_has_server_hold": sh,
"rdap_has_pending_delete": pd_,
"rdap_has_redemption_period": rp,
"rdap_has_any_hold_status": int(bool(ch or sh)),
"rdap_has_any_delete_status": int(bool(pd_ or rp)),
"rdap_event_count": len(data.get("events", [])),
"rdap_entity_count": len(data.get("entities", [])),
}
# ─────────────────────────────────────────────────────────────────────────────
# § 5. DNS MODULE
# ─────────────────────────────────────────────────────────────────────────────
def _dns_q(domain: str, rtype: str, res: dns.resolver.Resolver) -> Tuple[List[str], int]:
try:
ans = res.resolve(domain, rtype)
records = []
for rd in ans:
if rtype == "MX":
records.append(str(rd.exchange).rstrip(".").lower())
elif rtype == "TXT":
records.append("".join(p.decode("utf-8", "ignore") for p in rd.strings))
elif rtype == "SOA":
records.append(str(rd.mname).rstrip(".").lower())
elif rtype == "CNAME":
records.append(str(rd.target).rstrip(".").lower())
else:
records.append(str(rd).rstrip(".").lower())
return records, (ans.rrset.ttl if ans.rrset else 0)
except Exception:
return [], 0
def _null_dns() -> Dict[str, Any]:
return {
"dns_found": 0, "dns_resolves": 0,
"dns_num_a_records": 0, "dns_num_aaaa_records": 0, "dns_num_unique_ips": 0,
"dns_min_ttl": 0, "dns_max_ttl": 0, "dns_mean_ttl": 0,
"dns_has_low_ttl_60": 0, "dns_has_low_ttl_300": 0,
"dns_num_ns_records": 0, "dns_num_unique_ns_domains": 0,
"dns_uses_cloudflare_dns": 0, "dns_uses_google_dns": 0,
"dns_uses_amazon_dns": 0, "dns_uses_azure_dns": 0,
"dns_uses_godaddy_dns": 0, "dns_uses_namecheap_dns": 0,
"dns_num_mx_records": 0, "dns_uses_google_mail": 0,
"dns_uses_microsoft_mail": 0, "dns_uses_protonmail": 0, "dns_uses_zoho_mail": 0,
"dns_num_txt_records": 0, "dns_has_spf_record": 0,
"dns_has_dmarc_record": 0, "dns_dmarc_policy_none": 0,
"dns_dmarc_policy_quarantine": 0, "dns_dmarc_policy_reject": 0,
"dns_num_cname_records": 0, "dns_has_soa_record": 0, "dns_num_caa_records": 0,
"dns_has_dnssec": 0, "dns_num_dnskey_records": 0,
"dns_num_rrsig_records": 0, "dns_num_ds_records": 0,
"dns_dnssec_validation_possible": 0,
"dns_cname_depth": 0, "dns_cname_chain_failed": 0,
"dns_cname_chain_ends_with_a": 0, "dns_cname_chain_ends_with_aaaa": 0,
"dns_cname_ends_in_free_hoster": 0, "dns_cname_ends_in_cdn": 0,
"dns_cname_has_many_hops_3": 0,
}
def collect_dns(reg: str, dns_servers: Optional[List[str]]) -> Dict[str, Any]:
res = _resolver(dns_servers)
a, a_ttl = _dns_q(reg, "A", res)
aaaa, aa_ttl = _dns_q(reg, "AAAA", res)
ns, ns_ttl = _dns_q(reg, "NS", res)
mx, mx_ttl = _dns_q(reg, "MX", res)
txt, txt_ttl = _dns_q(reg, "TXT", res)
cname, cn_ttl = _dns_q(reg, "CNAME", res)
soa, soa_ttl = _dns_q(reg, "SOA", res)
caa, caa_ttl = _dns_q(reg, "CAA", res)
dmarc, _ = _dns_q(f"_dmarc.{reg}", "TXT", res)
dnskey, dk_ttl = _dns_q(reg, "DNSKEY", res)
rrsig, rr_ttl = _dns_q(reg, "RRSIG", res)
ds, ds_ttl = _dns_q(reg, "DS", res)
has_any = bool(a or aaaa or ns or mx or txt or soa)
if not has_any:
return _null_dns()
all_ips = sorted(set(a + aaaa))
ttls = [t for t in [a_ttl, aa_ttl, ns_ttl, mx_ttl, txt_ttl, cn_ttl,
soa_ttl, caa_ttl, dk_ttl, rr_ttl, ds_ttl] if t > 0]
min_ttl = min(ttls) if ttls else 0
max_ttl = max(ttls) if ttls else 0
mean_ttl = int(statistics.mean(ttls)) if ttls else 0
ns_text = " ".join(ns)
ns_providers = {_ns_provider(x) for x in ns}
mx_text = " ".join(mx)
txt_text = " ".join(txt)
dmarc_t = " ".join(dmarc)
# CNAME chain
chain: List[str] = []
current = reg.lower().rstrip(".")
failed = 0
for _ in range(10):
cnr, _ = _dns_q(current, "CNAME", res)
if not cnr:
break
tgt = cnr[0].lower().rstrip(".")
chain.append(tgt)
if tgt == current:
failed = 1
break
current = tgt
else:
failed = 1
final_a, _ = _dns_q(current, "A", res)
final_aaaa, _ = _dns_q(current, "AAAA", res)
return {
"dns_found": 1,
"dns_resolves": int(bool(all_ips)),
"dns_num_a_records": len(a),
"dns_num_aaaa_records": len(aaaa),
"dns_num_unique_ips": len(all_ips),
"dns_min_ttl": min_ttl, "dns_max_ttl": max_ttl, "dns_mean_ttl": mean_ttl,
"dns_has_low_ttl_60": int(0 < min_ttl <= 60),
"dns_has_low_ttl_300": int(0 < min_ttl <= 300),
"dns_num_ns_records": len(ns),
"dns_num_unique_ns_domains": len(ns_providers),
"dns_uses_cloudflare_dns": int("cloudflare" in ns_text),
"dns_uses_google_dns": int("google" in ns_text),
"dns_uses_amazon_dns": int("awsdns" in ns_text or "amazon" in ns_text),
"dns_uses_azure_dns": int("azure" in ns_text or "microsoft" in ns_text),
"dns_uses_godaddy_dns": int("domaincontrol" in ns_text or "godaddy" in ns_text),
"dns_uses_namecheap_dns": int("namecheap" in ns_text),
"dns_num_mx_records": len(mx),
"dns_uses_google_mail": int("google" in mx_text or "aspmx" in mx_text),
"dns_uses_microsoft_mail": int("outlook" in mx_text or "microsoft" in mx_text),
"dns_uses_protonmail": int("protonmail" in mx_text or "proton" in mx_text),
"dns_uses_zoho_mail": int("zoho" in mx_text),
"dns_num_txt_records": len(txt),
"dns_has_spf_record": int("v=spf1" in txt_text),
"dns_has_dmarc_record": int("v=dmarc1" in dmarc_t),
"dns_dmarc_policy_none": int("p=none" in dmarc_t),
"dns_dmarc_policy_quarantine": int("p=quarantine" in dmarc_t),
"dns_dmarc_policy_reject": int("p=reject" in dmarc_t),
"dns_num_cname_records": len(cname),
"dns_has_soa_record": int(bool(soa)),
"dns_num_caa_records": len(caa),
"dns_has_dnssec": int(bool(dnskey) or bool(rrsig) or bool(ds)),
"dns_num_dnskey_records": len(dnskey),
"dns_num_rrsig_records": len(rrsig),
"dns_num_ds_records": len(ds),
"dns_dnssec_validation_possible": int(bool(ds) and bool(dnskey)),
"dns_cname_depth": len(chain),
"dns_cname_chain_failed": failed,
"dns_cname_chain_ends_with_a": int(bool(final_a)),
"dns_cname_chain_ends_with_aaaa": int(bool(final_aaaa)),
"dns_cname_ends_in_free_hoster": int(_suffix_in(current, FREE_HOSTER_SUFFIXES)),
"dns_cname_ends_in_cdn": int(_suffix_in(current, CDN_SUFFIXES)),
"dns_cname_has_many_hops_3": int(len(chain) >= 3),
}
# ─────────────────────────────────────────────────────────────────────────────
# § 6. TLS MODULE
# ─────────────────────────────────────────────────────────────────────────────
def _host_matches(host: str, pattern: str) -> bool:
h = host.lower().rstrip(".")
p = pattern.lower().rstrip(".")
if h == p:
return True
if p.startswith("*."):
return h.endswith(p[1:])
return False
def _null_tls() -> Dict[str, Any]:
return {
"tls_found": 0, "tls_certificate_age_days": 0, "tls_validity_period_days": 0,
"tls_days_to_expiration": 0, "tls_has_https": 0, "tls_handshake_success": 0,
"tls_certificate_valid": 0, "tls_certificate_expired": 0,
"tls_certificate_not_yet_valid": 0, "tls_issuer_missing": 1,
"tls_issuer_is_lets_encrypt": 0, "tls_issuer_is_google": 0,
"tls_issuer_is_cloudflare": 0, "tls_issuer_is_digicert": 0,
"tls_issuer_is_sectigo": 0, "tls_issuer_is_zerossl": 0,
"tls_issuer_is_amazon": 0, "tls_num_san_domains": 0,
"tls_has_wildcard_san": 0, "tls_has_many_sans_10": 0, "tls_has_many_sans_50": 0,
"tls_self_signed": 0, "tls_subject_cn_matches_host": 0,
"tls_any_san_matches_host": 0,
"tls_new_cert_1_day": 0, "tls_new_cert_7_days": 0, "tls_new_cert_30_days": 0,
"tls_expires_soon_7_days": 0, "tls_expires_soon_30_days": 0,
"tls_expires_soon_90_days": 0, "tls_short_validity_90_days": 0,
"tls_short_validity_180_days": 0, "tls_long_validity_398_days": 0,
}
def collect_tls(host: str, now: datetime) -> Dict[str, Any]:
der = None
try:
ctx = ssl._create_unverified_context()
with socket.create_connection((host, 443), timeout=5.0) as sock:
with ctx.wrap_socket(sock, server_hostname=host) as tls:
der = tls.getpeercert(binary_form=True)
except Exception:
return _null_tls()
if not der:
return _null_tls()
try:
cert = x509.load_der_x509_certificate(der)
except Exception:
return _null_tls()
# second pass: verified handshake
cert_valid = 0
try:
ctx2 = ssl.create_default_context()
with socket.create_connection((host, 443), timeout=5.0) as sock:
with ctx2.wrap_socket(sock, server_hostname=host):
cert_valid = 1
except Exception:
pass
def _attr(name_obj, oid):
try:
attrs = name_obj.get_attributes_for_oid(oid)
return attrs[0].value if attrs else None
except Exception:
return None
try:
not_before = cert.not_valid_before_utc
not_after = cert.not_valid_after_utc
except AttributeError:
not_before = cert.not_valid_before.replace(tzinfo=timezone.utc)
not_after = cert.not_valid_after.replace(tzinfo=timezone.utc)
subject_cn = _attr(cert.subject, x509.NameOID.COMMON_NAME)
issuer_cn = _attr(cert.issuer, x509.NameOID.COMMON_NAME)
issuer_org = _attr(cert.issuer, x509.NameOID.ORGANIZATION_NAME)
try:
san_ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
sans = [s.lower() for s in san_ext.value.get_values_for_type(x509.DNSName)]
except Exception:
sans = []
age = _sz(_days(not_before, now))
validity = _sz(_days(not_before, not_after))
to_exp = _sz(_days(now, not_after))
iss_text = f"{issuer_cn or ''} {issuer_org or ''}".lower()
return {
"tls_found": 1,
"tls_certificate_age_days": age,
"tls_validity_period_days": validity,
"tls_days_to_expiration": to_exp,
"tls_has_https": 1, "tls_handshake_success": 1,
"tls_certificate_valid": cert_valid,
"tls_certificate_expired": int(to_exp < 0),
"tls_certificate_not_yet_valid": int(age < 0),
"tls_issuer_missing": int(len(iss_text.strip()) == 0),
"tls_issuer_is_lets_encrypt": int("let's encrypt" in iss_text or "lets encrypt" in iss_text),
"tls_issuer_is_google": int("google" in iss_text),
"tls_issuer_is_cloudflare": int("cloudflare" in iss_text),
"tls_issuer_is_digicert": int("digicert" in iss_text),
"tls_issuer_is_sectigo": int("sectigo" in iss_text or "comodoca" in iss_text or "comodo" in iss_text),
"tls_issuer_is_zerossl": int("zerossl" in iss_text or "zero ssl" in iss_text),
"tls_issuer_is_amazon": int("amazon" in iss_text or "aws" in iss_text),
"tls_num_san_domains": len(sans),
"tls_has_wildcard_san": int(any(s.startswith("*.") for s in sans)),
"tls_has_many_sans_10": int(len(sans) >= 10),
"tls_has_many_sans_50": int(len(sans) >= 50),
"tls_self_signed": int(cert.subject.rfc4514_string() == cert.issuer.rfc4514_string()),
"tls_subject_cn_matches_host": int(subject_cn is not None and _host_matches(host, subject_cn)),
"tls_any_san_matches_host": int(any(_host_matches(host, s) for s in sans)),
"tls_new_cert_1_day": int(0 <= age <= 1),
"tls_new_cert_7_days": int(0 <= age <= 7),
"tls_new_cert_30_days": int(0 <= age <= 30),
"tls_expires_soon_7_days": int(0 <= to_exp <= 7),
"tls_expires_soon_30_days": int(0 <= to_exp <= 30),
"tls_expires_soon_90_days": int(0 <= to_exp <= 90),
"tls_short_validity_90_days": int(0 < validity <= 90),
"tls_short_validity_180_days": int(0 < validity <= 180),
"tls_long_validity_398_days": int(validity > 398),
}
# ─────────────────────────────────────────────────────────────────────────────
# § 7. IP / ASN / HOSTING MODULE
# ─────────────────────────────────────────────────────────────────────────────
def _null_ip() -> Dict[str, Any]:
return {
"ip_found": 0, "ip_num_resolved_ips": 0, "ip_num_ipv4": 0, "ip_num_ipv6": 0,
"ip_has_public_ip": 0, "ip_has_private_ip": 0, "ip_has_reserved_ip": 0,
"ip_has_loopback_ip": 0, "ip_has_multicast_ip": 0, "ip_has_link_local_ip": 0,
"ip_num_unique_asns": 0, "ip_num_unique_countries": 0,
"ip_num_unique_registries": 0, "ip_num_unique_networks": 0, "ip_num_unique_cidrs": 0,
"ip_asn_missing": 1, "ip_country_missing": 1, "ip_network_name_missing": 1,
"ip_network_registration_age_days_min": 0, "ip_network_registration_age_days_mean": 0,
"ip_network_recently_allocated_30d": 0, "ip_network_recently_allocated_90d": 0,
"ip_network_age_missing": 1,
"ip_uses_cloud_hosting": 0, "ip_uses_cdn_or_proxy": 0,
"ip_uses_amazon_hosting": 0, "ip_uses_google_hosting": 0,
"ip_uses_microsoft_hosting": 0, "ip_uses_cloudflare_hosting": 0,
"ip_uses_digitalocean_hosting": 0, "ip_uses_ovh_hosting": 0,
"ip_uses_hetzner_hosting": 0, "ip_uses_godaddy_hosting": 0,
"ip_uses_namecheap_hosting": 0,
"ip_multi_asn_hosting": 0, "ip_multi_country_hosting": 0, "ip_multi_registry_hosting": 0,
}
def collect_ip(host: str, now: datetime) -> Dict[str, Any]:
ips: Set[str] = set()
try:
for result in socket.getaddrinfo(host, None):
ips.add(result[4][0])
except Exception:
pass
if not ips:
return _null_ip()
ip_list = sorted(ips)
n4 = n6 = pub = priv = rsv = loop = mcast = link = 0
for ip in ip_list:
obj = ipaddress.ip_address(ip)
if obj.version == 4:
n4 += 1
else:
n6 += 1
pub |= int(obj.is_global)
priv |= int(obj.is_private)
rsv |= int(obj.is_reserved)
loop |= int(obj.is_loopback)
mcast |= int(obj.is_multicast)
link |= int(obj.is_link_local)
rdap_results = []
for ip in ip_list[:_MAX_IPS_PER_DOMAIN]:
try:
r = IPWhois(ip, timeout=15).lookup_rdap(depth=1)
if r:
rdap_results.append(r)
except Exception:
pass
asns: Set[str] = set(); descs: Set[str] = set()
registries: Set[str] = set(); countries: Set[str] = set()
networks: Set[str] = set(); cidrs: Set[str] = set()
ages: List[int] = []
for r in rdap_results:
for key, target in [("asn", asns), ("asn_description", descs),
("asn_registry", registries), ("asn_country_code", countries)]:
if r.get(key):
target.add(str(r[key]))
net = r.get("network") or {}
if net.get("name"): networks.add(str(net["name"]))
if net.get("cidr"): cidrs.add(str(net["cidr"]))
if net.get("country"): countries.add(str(net["country"]))
for ev in (net.get("events") or []):
action = str(ev.get("action") or ev.get("eventAction") or "").lower()
if any(t in action for t in ["registration", "allocated", "assignment"]):
dt = _parse_dt(ev.get("timestamp") or ev.get("eventDate") or ev.get("date"))
if dt:
age = _days(dt, now)
if age is not None and age >= 0:
ages.append(age)
text = " ".join(list(descs) + list(networks)).lower()
amazon = int("amazon" in text or "aws" in text)
google = int("google" in text)
microsoft = int("microsoft" in text or "azure" in text)
cloudflare = int("cloudflare" in text)
digitalocean= int("digitalocean" in text or "digital ocean" in text)
ovh = int("ovh" in text)
hetzner = int("hetzner" in text)
godaddy = int("godaddy" in text or "go daddy" in text)
namecheap = int("namecheap" in text)
cdn = int(cloudflare or any(x in text for x in ["akamai", "fastly", "cdn77", "imperva"]))
cloud = int(amazon or google or microsoft or digitalocean or ovh or hetzner)
return {
"ip_found": 1,
"ip_num_resolved_ips": len(ip_list),
"ip_num_ipv4": n4, "ip_num_ipv6": n6,
"ip_has_public_ip": pub, "ip_has_private_ip": priv, "ip_has_reserved_ip": rsv,
"ip_has_loopback_ip": loop, "ip_has_multicast_ip": mcast, "ip_has_link_local_ip": link,
"ip_num_unique_asns": len(asns), "ip_num_unique_countries": len(countries),
"ip_num_unique_registries": len(registries), "ip_num_unique_networks": len(networks),
"ip_num_unique_cidrs": len(cidrs),
"ip_asn_missing": int(len(asns) == 0),
"ip_country_missing": int(len(countries) == 0),
"ip_network_name_missing": int(len(networks) == 0),
"ip_network_registration_age_days_min": min(ages) if ages else 0,
"ip_network_registration_age_days_mean": int(sum(ages) / len(ages)) if ages else 0,
"ip_network_recently_allocated_30d": int(bool(ages) and min(ages) <= 30),
"ip_network_recently_allocated_90d": int(bool(ages) and min(ages) <= 90),
"ip_network_age_missing": int(len(ages) == 0),
"ip_uses_cloud_hosting": cloud, "ip_uses_cdn_or_proxy": cdn,
"ip_uses_amazon_hosting": amazon, "ip_uses_google_hosting": google,
"ip_uses_microsoft_hosting": microsoft, "ip_uses_cloudflare_hosting": cloudflare,
"ip_uses_digitalocean_hosting": digitalocean, "ip_uses_ovh_hosting": ovh,
"ip_uses_hetzner_hosting": hetzner, "ip_uses_godaddy_hosting": godaddy,
"ip_uses_namecheap_hosting": namecheap,
"ip_multi_asn_hosting": int(len(asns) > 1),
"ip_multi_country_hosting": int(len(countries) > 1),
"ip_multi_registry_hosting": int(len(registries) > 1),
}
# ─────────────────────────────────────────────────────────────────────────────
# § 8. CERTIFICATE TRANSPARENCY MODULE
# ─────────────────────────────────────────────────────────────────────────────
def _null_ct() -> Dict[str, Any]:
return {
"ct_found": 0, "ct_first_seen_days_ago": 0, "ct_last_seen_days_ago": 0,
"ct_observation_span_days": 0, "ct_num_certs_total": 0,
"ct_num_certs_7d": 0, "ct_num_certs_30d": 0, "ct_num_certs_90d": 0,
"ct_num_unique_issuers": 0, "ct_num_unique_san_domains": 0,
"ct_has_wildcard_cert": 0, "ct_reissue_rate_90d": 0.0,
"ct_min_interarrival_days": 0, "ct_mean_interarrival_days": 0,
"ct_newly_observed_7d": 0, "ct_newly_observed_30d": 0,
}
def collect_ct(reg: str, now: datetime) -> Dict[str, Any]:
try:
r = _session().get(f"https://crt.sh/?q={quote(reg)}&output=json", timeout=30)
if r.status_code != 200 or not r.text.strip():
return _null_ct()
records = r.json()
except Exception:
return _null_ct()
if not records:
return _null_ct()
issuers: Set[str] = set()
sans: Set[str] = set()
cert_ids: Set[str] = set()
dates: List[datetime] = []
has_wildcard = 0
for rec in records:
if rec.get("issuer_name"):
issuers.add(str(rec["issuer_name"]))
for name in str(rec.get("name_value") or "").replace("\r", "\n").split("\n"):
n = name.strip().lower().rstrip(".")
if n:
sans.add(n)
if n.startswith("*."):
has_wildcard = 1
dt = _parse_dt(rec.get("not_before") or rec.get("entry_timestamp"))
if dt:
dates.append(dt)
cid = rec.get("min_cert_id") or rec.get("id") or f"{rec.get('serial_number')}:{rec.get('not_before')}"
cert_ids.add(str(cid))
if not dates:
return _null_ct()
first, last = min(dates), max(dates)
first_days = max(0, _days(first, now) or 0)
last_days = max(0, _days(last, now) or 0)
span = max(0, _days(first, last) or 0)
n7 = sum(1 for d in dates if 0 <= (now - d).days <= 7)
n30 = sum(1 for d in dates if 0 <= (now - d).days <= 30)
n90 = sum(1 for d in dates if 0 <= (now - d).days <= 90)
udates = sorted(set(dates))
if len(udates) >= 2:
diffs = [max(0, (udates[i] - udates[i-1]).days) for i in range(1, len(udates))]
min_inter = min(diffs)
mean_inter = int(sum(diffs) / len(diffs))
else:
min_inter = mean_inter = 0
return {
"ct_found": 1,
"ct_first_seen_days_ago": first_days,
"ct_last_seen_days_ago": last_days,
"ct_observation_span_days": span,
"ct_num_certs_total": len(cert_ids),
"ct_num_certs_7d": n7, "ct_num_certs_30d": n30, "ct_num_certs_90d": n90,
"ct_num_unique_issuers": len(issuers),
"ct_num_unique_san_domains": len(sans),
"ct_has_wildcard_cert": has_wildcard,
"ct_reissue_rate_90d": round(n90 / 90.0, 4),
"ct_min_interarrival_days": min_inter,
"ct_mean_interarrival_days": mean_inter,
"ct_newly_observed_7d": int(first_days <= 7),
"ct_newly_observed_30d": int(first_days <= 30),
}
# ─────────────────────────────────────────────────────────────────────────────
# § 9. CROSS-LAYER FEATURES
# ─────────────────────────────────────────────────────────────────────────────
def _x(f: Dict[str, Any], k: str, d: float = 0.0) -> float:
try:
v = f.get(k, d)
return float(v) if v is not None else d
except Exception:
return d
def compute_cross(f: Dict[str, Any]) -> Dict[str, Any]:
rdap_age = _x(f, "rdap_domain_age_days")
tls_age = _x(f, "tls_certificate_age_days")
min_ttl = _x(f, "dns_min_ttl")
rdap_ns = _x(f, "rdap_num_nameservers")
dns_ns = _x(f, "dns_num_ns_records")
dns_ips = _x(f, "dns_num_unique_ips")
ip_ips = _x(f, "ip_num_resolved_ips")
return {
"x_age_cert_vs_domain_ratio": round(tls_age / (rdap_age + 1.0), 6),
"x_ttl_to_domain_age_ratio": round(min_ttl / (rdap_age + 1.0), 6),
"x_domain_and_cert_both_new_7d": int(_x(f,"rdap_is_new_domain_7_days")==1 and _x(f,"tls_new_cert_7_days")==1),
"x_domain_new_but_no_dns": int(_x(f,"rdap_is_new_domain_30_days")==1 and _x(f,"dns_resolves")==0),
"x_low_ttl_on_old_domain": int(_x(f,"dns_has_low_ttl_300")==1 and rdap_age > 365),
"x_dns_ip_count_vs_hosting_ip_count_diff": abs(dns_ips - ip_ips),
"x_rdap_dns_ns_count_diff": abs(rdap_ns - dns_ns),
"x_ns_provider_consistency_cloudflare": int(_x(f,"rdap_uses_cloudflare_ns")==_x(f,"dns_uses_cloudflare_dns")),
"x_ns_provider_consistency_google": int(_x(f,"rdap_uses_google_ns")==_x(f,"dns_uses_google_dns")),
"x_ns_provider_consistency_amazon": int(_x(f,"rdap_uses_amazon_ns")==_x(f,"dns_uses_amazon_dns")),
"x_rdap_dns_ns_mismatch": int(abs(rdap_ns - dns_ns) >= 2),
"x_tls_valid_but_no_dns": int(_x(f,"tls_certificate_valid")==1 and _x(f,"dns_resolves")==0),
"x_mx_without_dmarc_and_new_domain":int(_x(f,"dns_num_mx_records")>=1 and _x(f,"dns_has_dmarc_record")==0 and _x(f,"rdap_is_new_domain_30_days")==1),
"x_ct_new_but_domain_old": int(_x(f,"ct_newly_observed_7d")==1 and rdap_age > 365),
"x_ct_burst_and_new_domain": int(_x(f,"ct_num_certs_7d")>=3 and _x(f,"rdap_is_new_domain_30_days")==1),
"x_dnssec_and_old_domain": int(_x(f,"dns_has_dnssec")==1 and rdap_age > 365),
}
# ─────────────────────────────────────────────────────────────────────────────
# § 10. PER-DOMAIN EXTRACTION — 5 modules concurrent via inner executor
# ─────────────────────────────────────────────────────────────────────────────
def extract_one(
domain: str,
include_ct: bool = True,
dns_servers: Optional[List[str]] = None,
) -> Dict[str, Any]:
now = datetime.now(timezone.utc)
try:
host = _normalize(domain)
except ValueError:
base: Dict[str, Any] = {"input_valid": 0, "input_is_ip_address": 0}
base.update(_null_rdap()); base.update(_null_dns())
base.update(_null_ip()); base.update(_null_tls())
if include_ct:
base.update(_null_ct())
base.update(compute_cross(base))
return base
ip_input = _is_ip(host)
reg = _registered(host) if not ip_input else host
n_inner = 5 if include_ct else 4
with ThreadPoolExecutor(max_workers=n_inner) as inner:
f_rdap = inner.submit(collect_rdap, reg, now)
f_dns = inner.submit(collect_dns, reg, dns_servers)
f_tls = inner.submit(collect_tls, host, now)
f_ip = inner.submit(collect_ip, host, now)
f_ct = inner.submit(collect_ct, reg, now) if include_ct else None
out: Dict[str, Any] = {
"input_valid": 1,
"input_is_ip_address": int(ip_input),
}
def _safe_result(future, null_fn, timeout=60):
if future is None:
return null_fn()
try:
return future.result(timeout=timeout) or null_fn()
except Exception:
return null_fn()
out.update(_safe_result(f_rdap, _null_rdap))
out.update(_safe_result(f_dns, _null_dns))
out.update(_safe_result(f_tls, _null_tls))
out.update(_safe_result(f_ip, _null_ip))
if include_ct:
out.update(_safe_result(f_ct, _null_ct))
out.update(compute_cross(out))
return out
# ─────────────────────────────────────────────────────────────────────────────
# § 11. BATCH COLLECTION — JSONL checkpoint + auto-resume
# ─────────────────────────────────────────────────────────────────────────────
def _load_checkpoint(jsonl_path: Path) -> Dict[str, Dict]:
done: Dict[str, Dict] = {}
if not jsonl_path.exists():
return done
with open(jsonl_path) as fh:
for line in fh:
line = line.strip()
if not line:
continue
try:
rec = json.loads(line)
domain = rec.get("input", "")
if domain and rec.get("_error") is None:
done[domain] = rec
except Exception:
pass
return done
def _flush_buffer(buf: List[Dict], jsonl_path: Path) -> None:
with _WRITE_LOCK:
with open(jsonl_path, "a") as fh:
for rec in buf:
fh.write(json.dumps(rec, default=str) + "\n")
def _save_final(done: Dict[str, Dict], rows: List[Dict], output_path: Path) -> None:
records = []
for row in rows:
domain = str(row.get("input", ""))
rec = done.get(domain) or {
"input": domain,
"class": row.get("class", ""),
"label": row.get("label", ""),
"_error": "not_collected",
}
records.append(rec)
out_df = pd.DataFrame(records)
# coerce feature columns to numeric
skip = {"input", "class", "label", "_error"}
for col in out_df.columns:
if col not in skip and col != "_elapsed_s":
out_df[col] = pd.to_numeric(out_df[col], errors="coerce").fillna(0)
ext = output_path.suffix.lower()
if ext == ".parquet":
out_df.to_parquet(output_path, index=False)
elif ext in (".tsv", ".txt"):
out_df.to_csv(output_path, sep="\t", index=False)
else:
out_df.to_csv(output_path, index=False)
n_err = int(out_df["_error"].notna().sum()) if "_error" in out_df.columns else 0
print(f"Saved {len(out_df):,} rows → {output_path}")
print(f" Success: {len(out_df) - n_err:,} | Errors/timeouts: {n_err:,}")
def process_csv(
csv_path: Path,
output_path: Path,
workers: int = 50,
domain_timeout: float = 120.0,
include_ct: bool = False,
dns_servers: Optional[List[str]] = None,
) -> None:
df = pd.read_csv(csv_path)
df.columns = [c.strip() for c in df.columns]
if "input" not in df.columns:
raise ValueError(f"CSV must have an 'input' column. Found: {list(df.columns)}")
jsonl_path = output_path.with_suffix(".jsonl")
done = _load_checkpoint(jsonl_path)
rows = df.to_dict("records")
todo = [r for r in rows if str(r.get("input", "")) not in done]
print(f"Total: {len(rows):,} | Already done: {len(done):,} | Remaining: {len(todo):,}")
if not todo:
print("All rows collected. Writing final output…")
_save_final(done, rows, output_path)
return
# pre-warm IANA bootstrap (one HTTP request, then cached for all threads)
print("Pre-warming IANA RDAP bootstrap…")
_get_iana()
def _worker(row: Dict) -> Dict:
domain = str(row.get("input", ""))
t0 = time.monotonic()
try:
feat = extract_one(domain, include_ct=include_ct, dns_servers=dns_servers)
elapsed = time.monotonic() - t0
rec = {
"input": domain,
"class": row.get("class", ""),
"label": row.get("label", ""),
"_error": None,
"_elapsed_s": round(elapsed, 2),
}
rec.update(feat)
return rec
except Exception as e:
return {
"input": domain,
"class": row.get("class", ""),
"label": row.get("label", ""),
"_error": type(e).__name__,
"_elapsed_s": round(time.monotonic() - t0, 2),
}
pbar = tqdm(total=len(todo), desc="Collecting", unit="domain",
dynamic_ncols=True, smoothing=0.1)
buf: List[Dict] = []
with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_row = {executor.submit(_worker, row): row for row in todo}
for future in as_completed(future_to_row):
try:
rec = future.result(timeout=domain_timeout + 15)
except Exception:
row = future_to_row[future]
rec = {
"input": str(row.get("input", "")),
"class": row.get("class", ""),
"label": row.get("label", ""),
"_error": "timeout",
"_elapsed_s": domain_timeout,
}
done[rec["input"]] = rec
buf.append(rec)
pbar.update(1)
if len(buf) >= _BUFFER_SIZE:
_flush_buffer(buf, jsonl_path)
buf.clear()
if buf:
_flush_buffer(buf, jsonl_path)
pbar.close()
print("\nWriting final output…")
_save_final(done, rows, output_path)
# ─────────────────────────────────────────────────────────────────────────────
# § 12. CLI
# ─────────────────────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(
description="Collect DNS/RDAP/TLS/IP/CT metadata for a domain CSV.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
ap.add_argument("--input", required=True, help="Input CSV (must have 'input' column)")
ap.add_argument("--output", required=True, help="Output file (.csv / .parquet / .tsv)")
ap.add_argument("--workers", type=int, default=50,
help="Parallel domain workers (30-100 recommended for Colab)")
ap.add_argument("--timeout", type=float, default=120.0,
help="Per-domain wall-clock timeout in seconds")
ap.add_argument("--ct", action="store_true",
help="Include Certificate Transparency (crt.sh) — ~10-30 s extra per domain")
ap.add_argument("--dns", nargs="+", default=["8.8.8.8", "1.1.1.1"],
metavar="IP", help="DNS resolver IPs")
args = ap.parse_args()
csv_path = Path(args.input)
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
if not csv_path.exists():
raise FileNotFoundError(f"Input CSV not found: {csv_path}")
print(f"Input : {csv_path}")
print(f"Output : {output_path}")
print(f"Workers : {args.workers} (×5 inner module threads = "
f"{args.workers * (5 if args.ct else 4)} max concurrent requests)")
print(f"Timeout : {args.timeout}s per domain")
print(f"CT logs : {'yes' if args.ct else 'no'}")
print(f"DNS : {args.dns}\n")
process_csv(
csv_path=csv_path,
output_path=output_path,
workers=args.workers,
domain_timeout=args.timeout,
include_ct=args.ct,
dns_servers=args.dns,
)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment