Skip to content

Instantly share code, notes, and snippets.

@nniiicc
Created September 26, 2025 23:11
Show Gist options
  • Save nniiicc/b9085d510b6edc661eb9a92cb6f915a9 to your computer and use it in GitHub Desktop.
Save nniiicc/b9085d510b6edc661eb9a92cb6f915a9 to your computer and use it in GitHub Desktop.
# ntrid_scraper_all_and_validate.py
import json, re, time, csv
from collections import OrderedDict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException, ElementClickInterceptedException
# Page that the script opens and whose DataTables listing it scrapes.
URL = "https://www.nitrd.gov/coordination-areas/ai/90-fr-9088-responses/"
def wait(driver, seconds=15):
    """Build an explicit-wait helper bound to *driver*.

    Args:
        driver: Object handed to ``WebDriverWait`` as its search context.
        seconds: Timeout, in seconds, handed to ``WebDriverWait``.

    Returns:
        A freshly constructed ``WebDriverWait``.
    """
    waiter = WebDriverWait(driver, seconds)
    return waiter
def parse_detail_table(detail_tr):
    """Parse ONLY the 2-col table inside the injected detail ``<tr>``.

    Every inner row that has exactly two ``<td>`` cells yields one entry: the
    first cell's text (surrounding whitespace and a trailing colon removed) is
    the label, the second cell's stripped text is the value. For PDF/link-like
    labels the ``href`` of the first anchor in the value cell is used instead
    of the visible text, when such an anchor with a non-empty ``href`` exists.

    Args:
        detail_tr: Element for the detail row; searched with ``find_element``.

    Returns:
        dict mapping label -> value. A label that occurs more than once keeps
        the value from its last occurrence.
    """
    record = {}
    inner_table = detail_tr.find_element(By.CSS_SELECTOR, "td[colspan='2'] table")
    for row in inner_table.find_elements(By.CSS_SELECTOR, "tr"):
        cells = row.find_elements(By.TAG_NAME, "td")
        if len(cells) != 2:
            continue
        label_cell, value_cell = cells
        # Strip again after removing the colon so "Name :" becomes "Name",
        # not "Name " (callers look records up by exact label).
        label = label_cell.text.strip().rstrip(":").rstrip()
        value = value_cell.text.strip()
        lowered = label.lower()
        # Prefer href for PDF/link-like fields
        if lowered.startswith("pdf link") or lowered in {"pdf", "link"}:
            # find_elements returns an empty list when there is no anchor, so
            # no exception has to be swallowed for the "no link" case.
            anchors = value_cell.find_elements(By.TAG_NAME, "a")
            href = anchors[0].get_attribute("href") if anchors else None
            # Keep the visible text when the anchor has no usable href rather
            # than replacing it with None.
            if href:
                value = href
        record[label] = value
    return record
def expand_and_parse_row(driver, row_el):
    """Expand one table row and parse the detail row that follows it.

    Clicks the details-control in THIS row, waits for the very next sibling
    ``<tr>`` (the one holding a ``td[colspan='2']``) and parses it with
    ``parse_detail_table``. Afterwards the row is collapsed again on a
    best-effort basis.

    Args:
        driver: WebDriver used for scrolling and for the clickability wait.
        row_el: The summary ``<tr>`` element to expand.

    Returns:
        dict produced by ``parse_detail_table`` for the detail row.

    Raises:
        TimeoutException: if the control never becomes clickable or the detail
            row does not appear within the wait timeout.
    """
    btn = row_el.find_element(By.CSS_SELECTOR, ".priority-4.details-control")
    driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn)
    wait(driver).until(EC.element_to_be_clickable(btn)).click()
    # The XPath is relative, so it must be resolved from row_el. Waiting on
    # the driver would evaluate it from the document root, which has no
    # following siblings, and the wait would always time out.
    detail_tr = wait(row_el).until(
        EC.presence_of_element_located((
            By.XPATH, "following-sibling::tr[1][td[@colspan='2']]"
        ))
    )
    record = parse_detail_table(detail_tr)
    # Optional: collapse again to keep DOM light
    try:
        btn.click()
        time.sleep(0.03)
    except Exception:
        # Deliberately broad: the record is already parsed, and a failed
        # collapse must not cause the caller to discard it.
        pass
    return record
def set_page_length_to_all(driver):
    """Ask the table's page-length ``<select>`` to show every entry.

    The option whose text contains "all" supplies the value to set; when no
    such option exists (or it has no value) ``'-1'`` is used, which is usually
    the DataTables value for 'All'. The value is assigned through JavaScript
    and a bubbling ``change`` event is dispatched. If that attempt raises, a
    second script that hard-codes ``'-1'`` is run instead.

    A ``TimeoutException`` raised anywhere in the sequence (finding the select
    or waiting for table rows afterwards) is reported with a printed message
    and otherwise ignored, leaving the current view in place.
    """
    try:
        length_select = wait(driver).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "select[name$='_length']"))
        )
        try:
            # First option whose label mentions "all", if any.
            all_option = next(
                (
                    option
                    for option in length_select.find_elements(By.TAG_NAME, "option")
                    if "all" in option.text.strip().lower()
                ),
                None,
            )
            chosen = None if all_option is None else all_option.get_attribute("value")
            target_value = "-1" if chosen is None else chosen
            driver.execute_script(
                "arguments[0].value=arguments[1]; arguments[0].dispatchEvent(new Event('change',{bubbles:true}))",
                length_select,
                target_value,
            )
        except Exception:
            # Fallback path: set the conventional '-1' directly.
            driver.execute_script(
                "arguments[0].value='-1'; arguments[0].dispatchEvent(new Event('change',{bubbles:true}))",
                length_select,
            )
        # Give the table body a moment to be (re)populated.
        row_locator = (By.CSS_SELECTOR, "table.dataTable tbody tr[role='row']")
        wait(driver).until(EC.presence_of_element_located(row_locator))
        time.sleep(0.5)
    except TimeoutException:
        print("Could not find length select; proceeding with current view.")
def get_datatables_info_counts(driver):
    """Read the entry counts out of the DataTables 'info' element.

    The element text is expected to look like
    'Showing 1 to 100 of 10878 entries'; thousands separators are tolerated.

    Args:
        driver: WebDriver used to locate ``div.dataTables_info``.

    Returns:
        ``(total_expected, showing_to)``: the number after 'of ... entries'
        and the number after 'to'. Either is ``None`` when its pattern is not
        found; both are ``None`` when the info element never appears.
    """

    def _captured_int(match):
        # Convert the first capture group (digits and commas) to an int.
        return int(match.group(1).replace(",", "")) if match else None

    counts = [None, None]
    patterns = (r"of\s+([\d,]+)\s+entries", r"to\s+([\d,]+)")
    try:
        info_el = wait(driver).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "div.dataTables_info"))
        )
        info_text = info_el.text.strip()
        for slot, pattern in enumerate(patterns):
            counts[slot] = _captured_int(re.search(pattern, info_text, flags=re.I))
    except TimeoutException:
        pass
    return counts[0], counts[1]
def parse_all_rows_on_page(driver, key_fields=("PDF Link", "PDF link", "Link", "Name", "Respondent", "Organization")):
    """Expand every row currently in the table body and collect its details.

    Records are de-duplicated on a composite key: the first non-empty value
    among "PDF Link" / "PDF link" / "Link", paired with the first non-empty
    value among "Name" / "Respondent" / "Organization". When a record has
    none of those fields the key falls back to the record's full content, so
    that distinct records lacking the expected labels are not all collapsed
    into one.

    Args:
        driver: WebDriver positioned on the page with the table rendered.
        key_fields: Accepted for interface compatibility.
            NOTE(review): this argument is not consulted; the fields used for
            de-duplication are fixed in the body.

    Returns:
        list of record dicts in table order, duplicates removed.
    """
    seen_keys = set()
    results = []
    skipped = 0
    rows = driver.find_elements(By.CSS_SELECTOR, "table.dataTable tbody tr[role='row']")
    for i, row in enumerate(rows, 1):
        try:
            record = expand_and_parse_row(driver, row)
        except (StaleElementReferenceException, ElementClickInterceptedException, TimeoutException):
            # Expected, transient UI failures: skip the row but keep count so
            # the loss is visible in the summary below.
            skipped += 1
            continue
        except Exception as e:
            skipped += 1
            print(f"Row {i} parse error: {e}")
            continue
        # Build a dedupe key from likely-identifying fields
        pdf = record.get("PDF Link") or record.get("PDF link") or record.get("Link") or ""
        name = record.get("Name") or record.get("Respondent") or record.get("Organization") or ""
        key = (pdf.strip(), name.strip())
        if not any(key):
            # No identifying field present: ("", "") would make every such
            # record look like a duplicate of the first one. Use the whole
            # record as the key instead.
            key = tuple(sorted((k, v or "") for k, v in record.items()))
        if key not in seen_keys:
            seen_keys.add(key)
            results.append(record)
        if i % 200 == 0:
            print(f"Expanded {i} rows... unique so far: {len(results)}")
    if skipped:
        print(f"Skipped {skipped} of {len(rows)} rows that could not be expanded or parsed.")
    return results
def export_json(path, records):
    """Write *records* to *path* as UTF-8 JSON.

    The output is indented by two spaces and non-ASCII characters are written
    as-is rather than as ``\\u`` escapes. An existing file is overwritten.
    """
    with open(path, mode="w", encoding="utf-8") as sink:
        json.dump(records, sink, ensure_ascii=False, indent=2)
def export_csv(path, records):
    """Write *records* to *path* as a UTF-8 CSV file.

    The header is the union of all record keys in first-seen order; a record
    missing a column gets an empty cell for it. When *records* is empty the
    file is created (or truncated) and left empty, with no header row.
    """
    # First-seen ordering of every key that appears in any record.
    columns = list(OrderedDict.fromkeys(col for rec in (records or ()) for col in rec))
    with open(path, "w", newline="", encoding="utf-8") as out_file:
        if not records:
            return
        sheet = csv.DictWriter(out_file, fieldnames=columns, extrasaction="ignore")
        sheet.writeheader()
        sheet.writerows(records)
def validate_scrape(driver, records):
    """Print a validation report and return its metrics.

    The report covers:
      - DataTables expected total (from the 'info' element)
      - Count of visible rows in tbody
      - Unique records scraped
      - Unique PDF links (records without a link are not counted)

    Args:
        driver: WebDriver still positioned on the scraped page.
        records: The scraped record dicts.

    Returns:
        dict with keys ``expected_total``, ``visible_rows``, ``scraped_total``,
        ``unique_pdfs``, ``showing_to`` and ``warnings`` (list of messages).
    """
    expected_total, showing_to = get_datatables_info_counts(driver)
    visible_rows = len(driver.find_elements(By.CSS_SELECTOR, "table.dataTable tbody tr[role='row']"))
    scraped_total = len(records)
    links = (
        (r.get("PDF Link") or r.get("PDF link") or r.get("Link") or "").strip()
        for r in records
    )
    # Drop empty strings: a record with no link must not count as one more
    # "unique PDF".
    unique_pdfs = len({link for link in links if link})
    print("\n=== Validation Summary ===")
    print(f"DataTables expected total: {expected_total if expected_total is not None else 'N/A'}")
    print(f"Visible rows (tbody): {visible_rows}")
    print(f"Scraped unique records: {scraped_total}")
    print(f"Unique PDF links: {unique_pdfs}")
    issues = []
    # Both checks need the expected total; without it nothing can be compared.
    if expected_total is not None and scraped_total < expected_total:
        issues.append(f"Scraped {scraped_total}, but DataTables reports {expected_total}.")
    if expected_total is not None and visible_rows != expected_total:
        issues.append(f"Visible rows ({visible_rows}) != DataTables expected total ({expected_total}).")
    if issues:
        print("Warnings:")
        for msg in issues:
            print(f" - {msg}")
    else:
        print("Counts look consistent.")
    return {
        "expected_total": expected_total,
        "visible_rows": visible_rows,
        "scraped_total": scraped_total,
        "unique_pdfs": unique_pdfs,
        "showing_to": showing_to,
        "warnings": issues,
    }
if __name__ == "__main__":
    from selenium.webdriver.chrome.options import Options

    # Headless Chrome with a fixed window size.
    chrome_options = Options()
    for flag in ("--headless=new", "--disable-gpu", "--window-size=1600,1200"):
        chrome_options.add_argument(flag)

    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get(URL)
        table_locator = (By.CSS_SELECTOR, "table.dataTable")
        wait(driver, 30).until(EC.presence_of_element_located(table_locator))

        # Show ALL entries, then expand and scrape each row.
        set_page_length_to_all(driver)
        records = parse_all_rows_on_page(driver)

        # Write both export formats.
        export_json("2025_ai_plan_responses.json", records)
        export_csv("2025_ai_plan_responses.csv", records)

        # Validation pass (prints a summary).
        validate_scrape(driver, records)
        print(f"\nDone. Wrote {len(records)} records to JSON and CSV.")
    finally:
        # Always release the browser, even when scraping or export fails.
        driver.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment