# nitrd_scraper_all_and_validate.py
import json, re, time, csv
from collections import OrderedDict
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException,
    StaleElementReferenceException,
    ElementClickInterceptedException,
)

URL = "https://www.nitrd.gov/coordination-areas/ai/90-fr-9088-responses/"


def wait(driver, seconds=15):
    return WebDriverWait(driver, seconds)

def parse_detail_table(detail_tr):
    """
    Parse ONLY the 2-col table inside the injected detail <tr>.
    """
    record = {}
    inner_table = detail_tr.find_element(By.CSS_SELECTOR, "td[colspan='2'] table")
    for row in inner_table.find_elements(By.CSS_SELECTOR, "tr"):
        tds = row.find_elements(By.TAG_NAME, "td")
        if len(tds) != 2:
            continue
        label = tds[0].text.strip().rstrip(":")
        value = tds[1].text.strip()
        # Prefer the href for PDF/link-like fields over the anchor text
        if label.lower().startswith("pdf link") or label.lower() in {"pdf", "link"}:
            try:
                a = tds[1].find_element(By.TAG_NAME, "a")
                value = a.get_attribute("href")
            except Exception:
                pass
        record[label] = value
    return record
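
# For reference, parse_detail_table assumes a child-row layout like the one
# DataTables' responsive/details plugins typically inject (labels here are
# illustrative; the live NITRD page may use different ones):
#
#   <tr><td colspan="2"><table>
#     <tr><td>Name:</td><td>Jane Doe</td></tr>
#     <tr><td>PDF Link:</td><td><a href="https://...">View PDF</a></td></tr>
#   </table></td></tr>
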
def expand_and_parse_row(driver, row_el):
    """
    Click the details-control in THIS row and parse the very next sibling
    detail row injected by DataTables.
    """
    btn = row_el.find_element(By.CSS_SELECTOR, ".priority-4.details-control")
    driver.execute_script("arguments[0].scrollIntoView({block:'center'});", btn)
    wait(driver).until(EC.element_to_be_clickable(btn)).click()
    # The following-sibling XPath must be resolved against row_el; a plain
    # (locator-based) wait would evaluate it from the document root and
    # never match, so use a lambda that searches from the clicked row.
    detail_tr = wait(driver).until(
        lambda d: row_el.find_element(
            By.XPATH, "following-sibling::tr[1][td[@colspan='2']]"
        )
    )
    record = parse_detail_table(detail_tr)
    # Optional: collapse again to keep the DOM light
    try:
        btn.click()
        time.sleep(0.03)
    except Exception:
        pass
    return record
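
# Optional fast path (a hedged sketch, not part of the original script): if the
# page's table really is a jQuery DataTable, the row data can often be read in
# bulk from the DataTables JS API instead of clicking every row. The selector
# and the presence of jQuery/DataTables on the page are assumptions.
def fetch_rows_via_datatables_api(driver):
    """Return raw DataTables row data as a list, or None if the API is absent."""
    script = """
        if (!window.jQuery || !jQuery.fn.dataTable) return null;
        var t = jQuery('table.dataTable').first();
        if (!t.length) return null;
        return t.DataTable().rows().data().toArray();
    """
    try:
        return driver.execute_script(script)
    except Exception:
        return None
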
def set_page_length_to_all(driver):
    """
    Switch the DataTables length select to 'All'. In DataTables this is usually value='-1'.
    """
    try:
        select = wait(driver).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "select[name$='_length']"))
        )
        # Try the 'All' option, falling back to value '-1'
        try:
            # If an option's text contains 'All', select it via JS
            options = select.find_elements(By.TAG_NAME, "option")
            all_val = None
            for opt in options:
                if "all" in opt.text.strip().lower():
                    all_val = opt.get_attribute("value")
                    break
            if all_val is None:
                all_val = "-1"
            driver.execute_script(
                "arguments[0].value=arguments[1];"
                " arguments[0].dispatchEvent(new Event('change',{bubbles:true}))",
                select, all_val
            )
        except Exception:
            driver.execute_script(
                "arguments[0].value='-1';"
                " arguments[0].dispatchEvent(new Event('change',{bubbles:true}))",
                select
            )
        # Wait for the redraw (table body updates)
        wait(driver).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "table.dataTable tbody tr[role='row']")
            )
        )
        time.sleep(0.5)
    except TimeoutException:
        print("Could not find length select; proceeding with current view.")
def get_datatables_info_counts(driver):
    """
    Parse the DataTables 'info' element to get the total expected entries and the current shown range.
    Example: 'Showing 1 to 100 of 10878 entries' or 'Showing 1 to 10878 of 10878 entries'.
    """
    total_expected = None
    showing_to = None
    try:
        info_el = wait(driver).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "div.dataTables_info"))
        )
        text = info_el.text.strip()
        # Look for the standard 'of N entries'
        m = re.search(r"of\s+([\d,]+)\s+entries", text, flags=re.I)
        if m:
            total_expected = int(m.group(1).replace(",", ""))
        # Also capture 'to X' (the highest visible index)
        m2 = re.search(r"to\s+([\d,]+)", text, flags=re.I)
        if m2:
            showing_to = int(m2.group(1).replace(",", ""))
    except TimeoutException:
        pass
    return total_expected, showing_to

def parse_all_rows_on_page(driver,
                           link_fields=("PDF Link", "PDF link", "Link"),
                           name_fields=("Name", "Respondent", "Organization")):
    """
    With page length set to All, expand every row and scrape details once.
    Dedupe using a (PDF/link, Name/Respondent/Organization) composite key.
    """
    seen_keys = set()
    results = []
    rows = driver.find_elements(By.CSS_SELECTOR, "table.dataTable tbody tr[role='row']")
    for i, row in enumerate(rows, 1):
        try:
            record = expand_and_parse_row(driver, row)
            # Build a dedupe key from the first non-empty likely-identifying fields
            pdf = next((record[k] for k in link_fields if record.get(k)), "")
            name = next((record[k] for k in name_fields if record.get(k)), "")
            key = (pdf.strip(), name.strip())
            if key not in seen_keys:
                seen_keys.add(key)
                results.append(record)
            if i % 200 == 0:
                print(f"Expanded {i} rows... unique so far: {len(results)}")
        except (StaleElementReferenceException, ElementClickInterceptedException, TimeoutException):
            continue
        except Exception as e:
            print(f"Row {i} parse error: {e}")
            continue
    return results
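
# Rows that go stale mid-loop are skipped above. If completeness matters more
# than speed, a retry wrapper along these lines (a sketch, not in the original)
# can re-locate a row by its index before giving up:
def expand_row_with_retry(driver, index, attempts=3):
    """Re-find row `index` (0-based) after staleness and retry. Sketch only."""
    for _ in range(attempts):
        try:
            rows = driver.find_elements(
                By.CSS_SELECTOR, "table.dataTable tbody tr[role='row']"
            )
            return expand_and_parse_row(driver, rows[index])
        except (StaleElementReferenceException, IndexError):
            time.sleep(0.2)
    return None
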
def export_json(path, records):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2, ensure_ascii=False)


def export_csv(path, records):
    if not records:
        with open(path, "w", newline="", encoding="utf-8") as f:
            f.write("")  # empty file
        return
    # Union of all keys to make a wide header
    header = OrderedDict()
    for r in records:
        for k in r.keys():
            header[k] = True
    fieldnames = list(header.keys())
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        for r in records:
            writer.writerow(r)
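
# Example of the wide-header behavior: records like
#   [{"Name": "A"}, {"Name": "B", "PDF Link": "https://..."}]
# produce the header Name,PDF Link (keys in first-seen order), and DictWriter
# leaves any missing keys blank in that row.
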
def validate_scrape(driver, records):
    """
    Prints a validation report:
      - DataTables expected total (from 'info')
      - Count of visible rows in tbody
      - Unique records scraped
      - Unique PDFs
    Returns a dict with metrics for programmatic checks if desired.
    """
    expected_total, showing_to = get_datatables_info_counts(driver)
    visible_rows = len(driver.find_elements(By.CSS_SELECTOR, "table.dataTable tbody tr[role='row']"))
    scraped_total = len(records)
    unique_pdfs = len({
        (r.get("PDF Link") or r.get("PDF link") or r.get("Link") or "").strip()
        for r in records
    })
    print("\n=== Validation Summary ===")
    print(f"DataTables expected total: {expected_total if expected_total is not None else 'N/A'}")
    print(f"Visible rows (tbody): {visible_rows}")
    print(f"Scraped unique records: {scraped_total}")
    print(f"Unique PDF links: {unique_pdfs}")
    issues = []
    if expected_total is not None and scraped_total < expected_total:
        issues.append(f"Scraped {scraped_total}, but DataTables reports {expected_total}.")
    if expected_total is not None and visible_rows != expected_total:
        issues.append(f"Visible rows ({visible_rows}) != DataTables expected total ({expected_total}).")
    if issues:
        print("Warnings:")
        for msg in issues:
            print(f"  - {msg}")
    else:
        print("Counts look consistent.")
    return {
        "expected_total": expected_total,
        "visible_rows": visible_rows,
        "scraped_total": scraped_total,
        "unique_pdfs": unique_pdfs,
        "showing_to": showing_to,
        "warnings": issues,
    }
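
# Example programmatic check on the returned metrics:
#   metrics = validate_scrape(driver, records)
#   if metrics["warnings"]:
#       raise SystemExit("Scrape incomplete: " + "; ".join(metrics["warnings"]))
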
if __name__ == "__main__":
    from selenium.webdriver.chrome.options import Options

    opts = Options()
    opts.add_argument("--headless=new")
    opts.add_argument("--disable-gpu")
    opts.add_argument("--window-size=1600,1200")
    driver = webdriver.Chrome(options=opts)
    try:
        driver.get(URL)
        wait(driver, 30).until(EC.presence_of_element_located((By.CSS_SELECTOR, "table.dataTable")))
        set_page_length_to_all(driver)  # show ALL entries
        records = parse_all_rows_on_page(driver)
        # Exports
        export_json("2025_ai_plan_responses.json", records)
        export_csv("2025_ai_plan_responses.csv", records)
        # Validation pass (prints summary)
        validate_scrape(driver, records)
        print(f"\nDone. Wrote {len(records)} records to JSON and CSV.")
    finally:
        driver.quit()
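
# Usage notes (assumptions, not from the original script):
# - Requires the `selenium` package; Selenium 4.6+ resolves a matching
#   chromedriver automatically via Selenium Manager, so only Chrome itself
#   needs to be installed.
# - Expanding ~10k rows one by one is slow; the optional DataTables API
#   sketch above can be tried first for a bulk read when jQuery is exposed.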