Created
April 19, 2026 20:43
-
-
Save bixb0012/95ec027e232897dda8a598520277f6a7 to your computer and use it in GitHub Desktop.
ArcPy: Generate FGDB Collection
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| generate_fgdb_collection.py | |
| =========================== | |
| Build a deterministic collection of synthetic file geodatabases for timing | |
| arcpy.da.Walk and comparing traversal strategies (sequential filtered | |
| walks vs. list-datatype walks vs. threaded walks). | |
| Each profile produces one .gdb directory plus a <name>.manifest.json sidecar | |
| listing the expected catalog items, so later timing runs can also double as | |
| correctness checks (set-equality against Walk output, not just counts). | |
| Requires: arcpy (ArcGIS Pro 3.x). Close ArcGIS Pro before running so | |
| shutil.rmtree can remove existing .gdb directories without lock conflicts. | |
| Usage | |
| ----- | |
| python generate_fgdb_collection.py --list-profiles | |
| python generate_fgdb_collection.py --output-dir ./collection | |
| python generate_fgdb_collection.py --output-dir ./collection --profiles tiny,flat_medium | |
| python generate_fgdb_collection.py --output-dir ./collection --overwrite | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import random | |
| import shutil | |
| import sys | |
| import time | |
| from dataclasses import asdict, dataclass, field | |
| from pathlib import Path | |
| from typing import Sequence | |
| import arcpy # type: ignore | |
| # --------------------------------------------------------------------------- | |
| # Profile definitions | |
| # --------------------------------------------------------------------------- | |
# Alias for a mapping of geometry-type keyword -> relative selection weight.
GeometryWeights = dict[str, float]

# Geometry blend used by any profile that does not supply its own mix.
# Insertion order matters: the weighted picker walks the items in this order.
DEFAULT_GEOM_MIX: GeometryWeights = dict(
    POINT=0.30,
    POLYLINE=0.25,
    POLYGON=0.35,
    MULTIPOINT=0.05,
    MULTIPATCH=0.05,
)
@dataclass(frozen=True)
class GdbProfile:
    """Immutable recipe for one synthetic file geodatabase.

    Every count is a plain integer rather than a range, so generating the
    same profile with the same seed always requests the same catalog. To get
    a different shape, define another profile instead of randomizing counts.
    """

    # Profile identifier; also the stem of the .gdb and manifest file names.
    name: str
    # Number of feature classes created directly at the geodatabase root.
    root_feature_classes: int = 0
    # Number of standalone tables created at the geodatabase root.
    root_tables: int = 0
    # Number of feature datasets to create.
    feature_datasets: int = 0
    # Number of feature classes created inside each feature dataset.
    fcs_per_dataset: int = 0
    # Number of relationship classes requested between root-level items.
    relationship_classes: int = 0
    # Number of coded-value domains to create.
    domains: int = 0
    # Count of randomly typed EXTRA_nn fields added to every FC and table.
    extra_fields_per_fc: int = 3
    # Geometry-type weights; each instance gets its own copy of the default.
    geometry_mix: GeometryWeights = field(
        default_factory=lambda: DEFAULT_GEOM_MIX.copy()
    )
    # Seed for the per-profile random.Random instance.
    seed: int = 0
    # Human-readable summary shown by --list-profiles.
    description: str = ""
# Built-in benchmark profiles, keyed by profile name in declaration order.
# Each entry's dictionary key is taken from its own ``name`` so the two can
# never drift apart.
PROFILE_CATALOG: dict[str, GdbProfile] = {
    profile.name: profile
    for profile in (
        GdbProfile(
            name="empty",
            seed=101,
            description="Empty gdb - zero of everything. Edge case.",
        ),
        GdbProfile(
            name="tiny",
            root_feature_classes=2,
            root_tables=2,
            feature_datasets=1,
            fcs_per_dataset=2,
            relationship_classes=1,
            domains=1,
            seed=102,
            description="Minimal gdb with one of each primary datatype.",
        ),
        GdbProfile(
            name="flat_small",
            root_feature_classes=20,
            root_tables=10,
            domains=2,
            seed=103,
            description="20 FCs + 10 tables at root. Isolates root enumeration.",
        ),
        GdbProfile(
            name="flat_medium",
            root_feature_classes=100,
            root_tables=50,
            relationship_classes=5,
            domains=3,
            seed=104,
            description="100 FCs + 50 tables at root. Matches forum baseline.",
        ),
        GdbProfile(
            name="nested_medium",
            root_feature_classes=50,
            root_tables=50,
            feature_datasets=5,
            fcs_per_dataset=10,
            relationship_classes=5,
            domains=3,
            seed=105,
            description="Half the FCs inside 5 FDs. Tests FD recursion cost.",
        ),
        GdbProfile(
            name="wide_datasets",
            feature_datasets=20,
            fcs_per_dataset=5,
            seed=106,
            description="20 FDs * 5 FCs, no root content. Stresses FD dimension.",
        ),
        GdbProfile(
            name="deep_only",
            feature_datasets=1,
            fcs_per_dataset=40,
            seed=107,
            description="Single FD holding everything. Degenerate nesting shape.",
        ),
        GdbProfile(
            name="rc_heavy",
            root_feature_classes=20,
            root_tables=20,
            relationship_classes=50,
            seed=108,
            description="Moderate FCs/tables with 50 RCs. Stresses RC walks.",
        ),
        GdbProfile(
            name="xl",
            root_feature_classes=300,
            root_tables=200,
            feature_datasets=10,
            fcs_per_dataset=20,
            relationship_classes=30,
            domains=5,
            seed=110,
            description="Stress tier. Local SSD unless you're patient.",
        ),
    )
}
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
# Field types that the randomly generated EXTRA_nn columns are drawn from.
FIELD_TYPES: Sequence[str] = (
    "TEXT",
    "LONG",
    "SHORT",
    "DOUBLE",
    "FLOAT",
    "DATE",
)
def _weighted_choice(rng: random.Random, weights: GeometryWeights) -> str:
    """Pick one key from *weights* with probability proportional to its weight.

    Exactly one ``rng.uniform`` call is made per selection, and items are
    walked in the mapping's insertion order, so a seeded generator yields a
    reproducible sequence of picks.

    Args:
        rng: Random source; only its ``uniform`` method is used.
        weights: Mapping of candidate name to non-negative weight. Weights do
            not need to sum to 1; they are compared against their own total.

    Returns:
        The selected key. Entries with zero weight are never returned.

    Raises:
        ValueError: If *weights* is empty, contains a negative weight, or has
            no positive total.
    """
    if not weights:
        raise ValueError("weights must not be empty")
    if any(w < 0 for w in weights.values()):
        raise ValueError(f"weights must be non-negative: {weights!r}")
    total = sum(weights.values())
    if total <= 0:
        raise ValueError(f"weights must have a positive total: {weights!r}")

    threshold = rng.uniform(0.0, total)
    running = 0.0
    last_positive = ""
    for name, weight in weights.items():
        if weight <= 0:
            # A zero-weight entry must not win, even when the draw is 0.0.
            continue
        running += weight
        last_positive = name
        if running >= threshold:
            return name
    # Float rounding can leave the running sum a hair below the draw; fall
    # back to the last entry that actually carries weight.
    return last_positive
def _spatial_ref() -> "arcpy.SpatialReference":
    """Return a fresh spatial reference built from factory code 4326 (WGS 84)."""
    wkid = 4326
    return arcpy.SpatialReference(wkid)
def _stamp_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    now_utc = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", now_utc)
def _geom_suffix(geom: str) -> str:
    """Map a geometry-type keyword to the short tag used in generated names.

    Unrecognized keywords map to the generic tag ``"fc"``.
    """
    tags = {
        "POINT": "pt",
        "MULTIPOINT": "mpt",
        "POLYLINE": "ln",
        "POLYGON": "pg",
        "MULTIPATCH": "mp",
    }
    if geom in tags:
        return tags[geom]
    return "fc"
| # --------------------------------------------------------------------------- | |
| # Per-object creation | |
| # --------------------------------------------------------------------------- | |
def _add_standard_fields(table_path: str, rng: random.Random,
                         extra_count: int) -> None:
    """Add the fixed baseline schema, then *extra_count* seeded-random fields.

    Args:
        table_path: Path of the table or feature class to extend.
        rng: Random source; one draw picks each extra field's type and, for
            TEXT fields only, a second draw picks its length.
        extra_count: Number of ``EXTRA_nn`` fields to append.
    """
    add_field = arcpy.management.AddField

    # Baseline columns shared by every generated table and feature class.
    baseline = (
        ("NAME", "TEXT", {"field_length": 100}),
        ("VALUE", "DOUBLE", {}),
        ("CATEGORY", "SHORT", {}),
        ("CREATED", "DATE", {}),
        ("FOREIGN_KEY", "LONG", {}),
    )
    for column, column_type, options in baseline:
        add_field(table_path, column, column_type, **options)

    for index in range(extra_count):
        # Draw order (type first, then length) must stay fixed so a given
        # seed keeps producing the same schema.
        column_type = rng.choice(FIELD_TYPES)
        options = {}
        if column_type == "TEXT":
            options["field_length"] = rng.choice([25, 50, 100, 255])
        add_field(table_path, f"EXTRA_{index:02d}", column_type, **options)
def _create_domains(gdb: str, count: int, rng: random.Random) -> list[str]:
    """Create *count* SHORT coded-value domains and return their names.

    Each domain is named ``DOM_nnn`` and receives between 3 and 6 coded
    values (``Option 0`` .. ``Option k``), the number being drawn from *rng*.
    """
    names = [f"DOM_{index:03d}" for index in range(count)]
    for index, domain in enumerate(names):
        arcpy.management.CreateDomain(
            gdb, domain, f"Auto-generated domain {index}", "SHORT", "CODED"
        )
        code_total = rng.randint(3, 6)
        for code in range(code_total):
            arcpy.management.AddCodedValueToDomain(
                gdb, domain, code, f"Option {code}"
            )
    return names
def _create_feature_class(out_path: str, name: str, geom: str,
                          rng: random.Random, extra_fields: int) -> str:
    """Create an empty feature class, add the standard fields, return its path.

    Args:
        out_path: Geodatabase or feature-dataset path that will hold it.
        name: Feature class name.
        geom: Geometry-type keyword passed to CreateFeatureclass.
        rng: Random source forwarded to the field generator.
        extra_fields: Number of random extra fields to add.
    """
    create_options = {
        "out_path": out_path,
        "out_name": name,
        "geometry_type": geom,
        "spatial_reference": _spatial_ref(),
    }
    arcpy.management.CreateFeatureclass(**create_options)
    fc_path = f"{out_path}/{name}"
    _add_standard_fields(fc_path, rng, extra_fields)
    return fc_path
def _create_table(gdb: str, name: str, rng: random.Random,
                  extra_fields: int) -> str:
    """Create a standalone table in *gdb*, add the standard fields, return its path."""
    table_path = f"{gdb}/{name}"
    arcpy.management.CreateTable(gdb, name)
    _add_standard_fields(table_path, rng, extra_fields)
    return table_path
def _create_feature_dataset(gdb: str, name: str) -> str:
    """Create feature dataset *name* in *gdb* and return its path."""
    dataset_path = f"{gdb}/{name}"
    arcpy.management.CreateFeatureDataset(gdb, name, _spatial_ref())
    return dataset_path
def _create_relationship_class(gdb: str, origin: str, dest: str,
                               rc_name: str) -> str:
    """Create a simple one-to-many relationship class and return its path.

    The relationship joins the origin's ``OBJECTID`` to the destination's
    ``FOREIGN_KEY`` field, with labels derived from *rc_name*.

    Args:
        gdb: Geodatabase path that will hold the relationship class.
        origin: Path of the origin table or feature class.
        dest: Path of the destination table or feature class.
        rc_name: Name of the relationship class to create.
    """
    rc_path = f"{gdb}/{rc_name}"
    settings = {
        "origin_table": origin,
        "destination_table": dest,
        "out_relationship_class": rc_path,
        "relationship_type": "SIMPLE",
        "forward_label": f"{rc_name}_fwd",
        "backward_label": f"{rc_name}_back",
        "message_direction": "NONE",
        "cardinality": "ONE_TO_MANY",
        "attributed": "NONE",
        "origin_primary_key": "OBJECTID",
        "origin_foreign_key": "FOREIGN_KEY",
    }
    arcpy.management.CreateRelationshipClass(**settings)
    return rc_path
| # --------------------------------------------------------------------------- | |
| # Top-level generation | |
| # --------------------------------------------------------------------------- | |
def generate_gdb(profile: GdbProfile, out_dir: Path,
                 overwrite: bool = False) -> dict:
    """Build the .gdb described by *profile* in *out_dir* and write its manifest.

    All randomness comes from ``random.Random(profile.seed)`` and is consumed
    in a fixed order: domains, root feature classes, root tables, feature
    classes inside feature datasets, then relationship classes.

    Args:
        profile: Shape of the geodatabase to create.
        out_dir: Directory receiving ``<name>.gdb`` and
            ``<name>.manifest.json``; created if missing.
        overwrite: When true, an existing .gdb directory and manifest are
            deleted before generation.

    Returns:
        The manifest dict that was serialized to the sidecar JSON file.

    Raises:
        FileExistsError: The .gdb already exists and *overwrite* is false.
    """
    rng = random.Random(profile.seed)
    extra_fields = profile.extra_fields_per_fc
    gdb_name = f"{profile.name}.gdb"
    gdb_path = out_dir / gdb_name
    manifest_path = out_dir / f"{profile.name}.manifest.json"

    if gdb_path.exists():
        if overwrite:
            shutil.rmtree(gdb_path)
        else:
            raise FileExistsError(
                f"{gdb_path} exists. Re-run with --overwrite to replace."
            )
    if overwrite and manifest_path.exists():
        manifest_path.unlink()
    out_dir.mkdir(parents=True, exist_ok=True)

    t_start = time.perf_counter()
    arcpy.management.CreateFileGDB(str(out_dir), gdb_name)
    gdb = str(gdb_path)

    # Domains come first so later field additions could reference them if the
    # generator is extended. They are not part of the Walk expectations.
    domain_names = _create_domains(gdb, profile.domains, rng)

    # Feature classes at the geodatabase root.
    root_fcs: list[str] = []
    for index in range(profile.root_feature_classes):
        geom = _weighted_choice(rng, profile.geometry_mix)
        fc_name = f"fc_{_geom_suffix(geom)}_{index:04d}"
        _create_feature_class(gdb, fc_name, geom, rng, extra_fields)
        root_fcs.append(fc_name)

    # Standalone tables at the geodatabase root.
    tables: list[str] = []
    for index in range(profile.root_tables):
        table_name = f"tbl_{index:04d}"
        _create_table(gdb, table_name, rng, extra_fields)
        tables.append(table_name)

    # Feature datasets, each populated with its own feature classes. Nested
    # feature classes are recorded as "<dataset>/<feature class>".
    datasets: list[str] = []
    nested_fcs: list[str] = []
    for ds_index in range(profile.feature_datasets):
        ds_name = f"ds_{ds_index:03d}"
        _create_feature_dataset(gdb, ds_name)
        datasets.append(ds_name)
        ds_path = f"{gdb}/{ds_name}"
        for fc_index in range(profile.fcs_per_dataset):
            geom = _weighted_choice(rng, profile.geometry_mix)
            fc_name = (
                f"ds{ds_index:03d}_fc_{_geom_suffix(geom)}_{fc_index:04d}"
            )
            _create_feature_class(ds_path, fc_name, geom, rng, extra_fields)
            nested_fcs.append(f"{ds_name}/{fc_name}")

    # Relationship classes only link root-level items (cross-feature-dataset
    # relationships add constraints that are not worth modeling here).
    relationship_names: list[str] = []
    root_candidates = root_fcs + tables
    if profile.relationship_classes:
        if len(root_candidates) >= 2:
            for index in range(profile.relationship_classes):
                origin, dest = rng.sample(root_candidates, 2)
                rc_name = f"rel_{index:04d}"
                _create_relationship_class(
                    gdb, f"{gdb}/{origin}", f"{gdb}/{dest}", rc_name,
                )
                relationship_names.append(rc_name)
        else:
            print(f" [warn] profile {profile.name!r} asks for "
                  f"{profile.relationship_classes} RCs but has fewer than 2 "
                  f"root FCs/tables; skipping RC creation.", file=sys.stderr)

    elapsed = time.perf_counter() - t_start

    expected: dict[str, list[str]] = {
        "FeatureClass": root_fcs + nested_fcs,
        "Table": tables,
        "FeatureDataset": datasets,
        "RelationshipClass": relationship_names,
    }
    manifest = {
        "profile": profile.name,
        "profile_details": asdict(profile),
        "seed": profile.seed,
        "gdb_path": gdb_name,
        "gdb_abspath": str(gdb_path.resolve()),
        "expected": expected,
        "counts": {kind: len(items) for kind, items in expected.items()},
        "domains": domain_names,
        "generated_at": _stamp_iso(),
        "arcpy_version": arcpy.GetInstallInfo().get("Version", "unknown"),
        "generation_seconds": round(elapsed, 2),
    }
    manifest_path.write_text(json.dumps(manifest, indent=2))
    return manifest
| # --------------------------------------------------------------------------- | |
| # CLI | |
| # --------------------------------------------------------------------------- | |
def _parse_args(argv: Sequence[str]) -> argparse.Namespace:
    """Parse the generator's command-line options from *argv*.

    Recognized options: ``--output-dir``, ``--profiles``, ``--list-profiles``
    and ``--overwrite``.
    """
    parser = argparse.ArgumentParser(
        description="Generate synthetic .gdb collection for da.Walk benchmarking.",
    )
    # (flag, add_argument keyword options) in the order they appear in --help.
    option_table = (
        ("--output-dir", {
            "type": Path,
            "default": Path("./collection"),
            "help": "Directory for .gdb files and manifests (default: ./collection)",
        }),
        ("--profiles", {
            "type": str,
            "default": "all",
            "help": "Comma-separated profile names, or 'all' (default: all)",
        }),
        ("--list-profiles", {
            "action": "store_true",
            "help": "Print available profiles and exit",
        }),
        ("--overwrite", {
            "action": "store_true",
            "help": "Replace existing .gdb directories",
        }),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args(argv)
def _list_profiles() -> None:
    """Print every catalog profile with its counts and description to stdout."""
    print("Available profiles:\n")
    for name, prof in PROFILE_CATALOG.items():
        columns = (
            f"{name:<16s}",
            f"fcs={prof.root_feature_classes:>3d}",
            f"tbls={prof.root_tables:>3d}",
            f"fds={prof.feature_datasets:>2d}x{prof.fcs_per_dataset:<2d}",
            f"rcs={prof.relationship_classes:>2d}",
        )
        print(" " + " ".join(columns))
        # The description is indented to line up under the counts column.
        print(" " * 18 + prof.description)
        print()
def main(argv: Sequence[str] | None = None) -> int:
    """Command-line entry point: generate the requested profiles.

    Args:
        argv: Argument list without the program name; defaults to
            ``sys.argv[1:]`` when ``None``.

    Returns:
        0 on success (including profiles skipped because their .gdb already
        exists), 1 if any profile failed to generate, 2 if an unknown
        profile name was requested.
    """
    args = _parse_args(sys.argv[1:] if argv is None else argv)
    if args.list_profiles:
        _list_profiles()
        return 0

    if args.profiles == "all":
        names = list(PROFILE_CATALOG)
    else:
        requested = (part.strip() for part in args.profiles.split(","))
        # Drop repeats while keeping first-seen order. Otherwise "tiny,tiny"
        # would report a bogus skip, or rebuild the same gdb twice when
        # --overwrite is given.
        names = list(dict.fromkeys(part for part in requested if part))

    unknown = [n for n in names if n not in PROFILE_CATALOG]
    if unknown:
        print(f"Unknown profiles: {unknown}", file=sys.stderr)
        print(f"Available: {list(PROFILE_CATALOG)}", file=sys.stderr)
        return 2

    args.output_dir.mkdir(parents=True, exist_ok=True)
    failed: list[str] = []
    for name in names:
        prof = PROFILE_CATALOG[name]
        print(f"[{name}] generating...", flush=True)
        try:
            manifest = generate_gdb(
                prof, args.output_dir, overwrite=args.overwrite
            )
        except FileExistsError as e:
            # An existing gdb without --overwrite is a skip, not a failure.
            print(f"[{name}] skipped: {e}", file=sys.stderr)
        except Exception as e:
            # Top-level boundary: record the failure and continue with the
            # remaining profiles instead of aborting the whole run.
            print(f"[{name}] FAILED: {e}", file=sys.stderr)
            failed.append(name)
        else:
            counts = manifest["counts"]
            print(
                f"[{name}] done in {manifest['generation_seconds']}s - "
                f"FCs={counts['FeatureClass']} Tbls={counts['Table']} "
                f"FDs={counts['FeatureDataset']} "
                f"RCs={counts['RelationshipClass']}"
            )

    print(f"\nCollection written to: {args.output_dir.resolve()}")
    if failed:
        print(f"Failed profiles: {failed}", file=sys.stderr)
        return 1
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment