This notebook converts India Open Buildings data from large compressed CSV files (.csv.gz) into spatial formats such as GeoPackage or GeoJSON.
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "id": "d2019e83-d3a3-4ccd-bdb7-4a2c72b2b2f7", | |
| "metadata": {}, | |
| "source": [ | |
| "### India Open Buildings Conversion Script\n", | |
| "\n", | |
| "This notebook converts India Open Buildings data — available at [https://gobs.aeee.in/downloads](https://gobs.aeee.in/downloads) — from large compressed CSV files (`.csv.gz`) into spatial formats such as GeoPackage (`.gpkg`) or GeoJSON. \n", | |
| "Each file contains building footprint polygons in Well-Known Text (WKT) format, along with attributes like area, height, floors, and land use. \n", | |
| "The script reads each CSV in memory-efficient chunks, repairs invalid geometries, and writes them incrementally to per-file GeoPackages suitable for GIS analysis.\n", | |
| "\n", | |
| "**Contact:**</br>\n", | |
| "Benny Istanto, GOST/DEC Data Group/The World Bank, [email protected]" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "id": "a62fcf9b-f5f4-4a00-9b07-df72a6d05cc8", | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "[19:40:49] INFO - PROCESS ANDAMAN___NICOBAR.gpkg [no_existing_output]\n", | |
| "[19:40:49] INFO - Processing: ANDAMAN___NICOBAR.csv.gz -> ANDAMAN___NICOBAR.gpkg\n", | |
| "[19:41:07] WARNING - Could not create spatial index for 'buildings' in ANDAMAN___NICOBAR.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[19:41:07] INFO - Done: rows_in=65,864, features_out=65,864, skipped=0\n", | |
| "[19:41:54] INFO - PROCESS ANDHRA_PRADESH.gpkg [auto_process_default]\n", | |
| "[19:41:54] INFO - Processing: ANDHRA_PRADESH.csv.gz -> ANDHRA_PRADESH.gpkg\n", | |
| "[20:22:46] WARNING - Could not create spatial index for 'buildings' in ANDHRA_PRADESH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[20:22:46] INFO - Done: rows_in=6,691,174, features_out=6,691,174, skipped=0\n", | |
| "[20:22:46] INFO - PROCESS ARUNACHAL_PRADESH.gpkg [no_existing_output]\n", | |
| "[20:22:46] INFO - Processing: ARUNACHAL_PRADESH.csv.gz -> ARUNACHAL_PRADESH.gpkg\n", | |
| "[20:23:39] WARNING - Could not create spatial index for 'buildings' in ARUNACHAL_PRADESH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[20:23:39] INFO - Done: rows_in=264,846, features_out=264,846, skipped=0\n", | |
| "[20:23:39] INFO - PROCESS ASSAM.gpkg [no_existing_output]\n", | |
| "[20:23:39] INFO - Processing: ASSAM.csv.gz -> ASSAM.gpkg\n", | |
| "[20:44:34] WARNING - Could not create spatial index for 'buildings' in ASSAM.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[20:44:34] INFO - Done: rows_in=5,002,139, features_out=5,002,139, skipped=0\n", | |
| "[20:44:34] INFO - PROCESS BIHAR.gpkg [no_existing_output]\n", | |
| "[20:44:34] INFO - Processing: BIHAR.csv.gz -> BIHAR.gpkg\n", | |
| "[21:54:15] WARNING - Could not create spatial index for 'buildings' in BIHAR.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[21:54:15] INFO - Done: rows_in=12,659,292, features_out=12,659,292, skipped=0\n", | |
| "[21:54:16] INFO - PROCESS CHANDIGARH.gpkg [no_existing_output]\n", | |
| "[21:54:16] INFO - Processing: CHANDIGARH.csv.gz -> CHANDIGARH.gpkg\n", | |
| "[21:54:35] WARNING - Could not create spatial index for 'buildings' in CHANDIGARH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[21:54:35] INFO - Done: rows_in=70,467, features_out=70,467, skipped=0\n", | |
| "[21:54:35] INFO - PROCESS CHHATTISGARH.gpkg [no_existing_output]\n", | |
| "[21:54:35] INFO - Processing: CHHATTISGARH.csv.gz -> CHHATTISGARH.gpkg\n", | |
| "[22:14:46] WARNING - Could not create spatial index for 'buildings' in CHHATTISGARH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[22:14:46] INFO - Done: rows_in=4,338,207, features_out=4,338,207, skipped=0\n", | |
| "[22:14:46] INFO - PROCESS DADRA___NAGAR_HAVELI.gpkg [no_existing_output]\n", | |
| "[22:14:46] INFO - Processing: DADRA___NAGAR_HAVELI.csv.gz -> DADRA___NAGAR_HAVELI.gpkg\n", | |
| "[22:14:57] WARNING - Could not create spatial index for 'buildings' in DADRA___NAGAR_HAVELI.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[22:14:57] INFO - Done: rows_in=44,266, features_out=44,266, skipped=0\n", | |
| "[22:14:57] INFO - PROCESS DAMAN___DIU.gpkg [no_existing_output]\n", | |
| "[22:14:57] INFO - Processing: DAMAN___DIU.csv.gz -> DAMAN___DIU.gpkg\n", | |
| "[22:15:04] WARNING - Could not create spatial index for 'buildings' in DAMAN___DIU.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[22:15:04] INFO - Done: rows_in=27,573, features_out=27,573, skipped=0\n", | |
| "[22:15:04] INFO - PROCESS DELHI.gpkg [no_existing_output]\n", | |
| "[22:15:04] INFO - Processing: DELHI.csv.gz -> DELHI.gpkg\n", | |
| "[22:18:56] WARNING - Could not create spatial index for 'buildings' in DELHI.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[22:18:56] INFO - Done: rows_in=1,090,195, features_out=1,090,195, skipped=0\n", | |
| "[22:18:56] INFO - PROCESS GOA.gpkg [no_existing_output]\n", | |
| "[22:18:56] INFO - Processing: GOA.csv.gz -> GOA.gpkg\n", | |
| "[22:19:51] WARNING - Could not create spatial index for 'buildings' in GOA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[22:19:51] INFO - Done: rows_in=274,485, features_out=274,485, skipped=0\n", | |
| "[22:19:52] INFO - PROCESS GUJARAT.gpkg [no_existing_output]\n", | |
| "[22:19:52] INFO - Processing: GUJARAT.csv.gz -> GUJARAT.gpkg\n", | |
| "[23:22:15] WARNING - Could not create spatial index for 'buildings' in GUJARAT.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[23:22:15] INFO - Done: rows_in=10,459,549, features_out=10,459,549, skipped=0\n", | |
| "[23:22:15] INFO - PROCESS HARYANA.gpkg [no_existing_output]\n", | |
| "[23:22:15] INFO - Processing: HARYANA.csv.gz -> HARYANA.gpkg\n", | |
| "[23:46:45] WARNING - Could not create spatial index for 'buildings' in HARYANA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[23:46:45] INFO - Done: rows_in=5,151,376, features_out=5,151,376, skipped=0\n", | |
| "[23:46:45] INFO - PROCESS HIMACHAL_PRADESH.gpkg [no_existing_output]\n", | |
| "[23:46:45] INFO - Processing: HIMACHAL_PRADESH.csv.gz -> HIMACHAL_PRADESH.gpkg\n", | |
| "[23:54:26] WARNING - Could not create spatial index for 'buildings' in HIMACHAL_PRADESH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[23:54:26] INFO - Done: rows_in=1,792,695, features_out=1,792,695, skipped=0\n", | |
| "[23:54:26] INFO - PROCESS JAMMU___KASHMIR.gpkg [no_existing_output]\n", | |
| "[23:54:26] INFO - Processing: JAMMU___KASHMIR.csv.gz -> JAMMU___KASHMIR.gpkg\n", | |
| "[00:10:42] WARNING - Could not create spatial index for 'buildings' in JAMMU___KASHMIR.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[00:10:42] INFO - Done: rows_in=3,451,582, features_out=3,451,582, skipped=0\n", | |
| "[00:10:42] INFO - PROCESS JHARKHAND.gpkg [no_existing_output]\n", | |
| "[00:10:42] INFO - Processing: JHARKHAND.csv.gz -> JHARKHAND.gpkg\n", | |
| "[00:35:09] WARNING - Could not create spatial index for 'buildings' in JHARKHAND.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[00:35:09] INFO - Done: rows_in=4,964,965, features_out=4,964,965, skipped=0\n", | |
| "[00:35:09] INFO - PROCESS KARNATAKA.gpkg [no_existing_output]\n", | |
| "[00:35:09] INFO - Processing: KARNATAKA.csv.gz -> KARNATAKA.gpkg\n", | |
| "[01:40:52] WARNING - Could not create spatial index for 'buildings' in KARNATAKA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[01:40:52] INFO - Done: rows_in=10,120,194, features_out=10,120,194, skipped=0\n", | |
| "[01:40:52] INFO - PROCESS KERALA.gpkg [no_existing_output]\n", | |
| "[01:40:52] INFO - Processing: KERALA.csv.gz -> KERALA.gpkg\n", | |
| "[02:48:11] WARNING - Could not create spatial index for 'buildings' in KERALA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[02:48:11] INFO - Done: rows_in=7,104,382, features_out=7,104,382, skipped=0\n", | |
| "[02:48:11] INFO - PROCESS LADAKH.gpkg [no_existing_output]\n", | |
| "[02:48:11] INFO - Processing: LADAKH.csv.gz -> LADAKH.gpkg\n", | |
| "[02:49:08] WARNING - Could not create spatial index for 'buildings' in LADAKH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[02:49:08] INFO - Done: rows_in=259,623, features_out=259,623, skipped=0\n", | |
| "[02:49:08] INFO - PROCESS LAKSHADWEEP.gpkg [no_existing_output]\n", | |
| "[02:49:08] INFO - Processing: LAKSHADWEEP.csv.gz -> LAKSHADWEEP.gpkg\n", | |
| "[02:49:09] WARNING - Could not create spatial index for 'buildings' in LAKSHADWEEP.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[02:49:09] INFO - Done: rows_in=3,027, features_out=3,027, skipped=0\n", | |
| "[02:49:09] INFO - PROCESS MADHYA_PRADESH.gpkg [no_existing_output]\n", | |
| "[02:49:09] INFO - Processing: MADHYA_PRADESH.csv.gz -> MADHYA_PRADESH.gpkg\n", | |
| "[03:41:58] WARNING - Could not create spatial index for 'buildings' in MADHYA_PRADESH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[03:41:58] INFO - Done: rows_in=11,146,663, features_out=11,146,663, skipped=0\n", | |
| "[03:41:58] INFO - PROCESS MAHARASHTRA.gpkg [no_existing_output]\n", | |
| "[03:41:58] INFO - Processing: MAHARASHTRA.csv.gz -> MAHARASHTRA.gpkg\n", | |
| "[05:41:38] WARNING - Could not create spatial index for 'buildings' in MAHARASHTRA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[05:41:38] INFO - Done: rows_in=15,682,841, features_out=15,682,841, skipped=0\n", | |
| "[05:41:38] INFO - PROCESS MANIPUR.gpkg [no_existing_output]\n", | |
| "[05:41:38] INFO - Processing: MANIPUR.csv.gz -> MANIPUR.gpkg\n", | |
| "[05:44:12] WARNING - Could not create spatial index for 'buildings' in MANIPUR.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[05:44:12] INFO - Done: rows_in=726,241, features_out=726,241, skipped=0\n", | |
| "[05:44:13] INFO - PROCESS MEGHALAYA.gpkg [no_existing_output]\n", | |
| "[05:44:13] INFO - Processing: MEGHALAYA.csv.gz -> MEGHALAYA.gpkg\n", | |
| "[05:45:39] WARNING - Could not create spatial index for 'buildings' in MEGHALAYA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[05:45:39] INFO - Done: rows_in=442,447, features_out=442,447, skipped=0\n", | |
| "[05:45:39] INFO - PROCESS MIZORAM.gpkg [no_existing_output]\n", | |
| "[05:45:39] INFO - Processing: MIZORAM.csv.gz -> MIZORAM.gpkg\n", | |
| "[05:46:10] WARNING - Could not create spatial index for 'buildings' in MIZORAM.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[05:46:10] INFO - Done: rows_in=159,545, features_out=159,545, skipped=0\n", | |
| "[05:46:10] INFO - PROCESS NAGALAND.gpkg [no_existing_output]\n", | |
| "[05:46:10] INFO - Processing: NAGALAND.csv.gz -> NAGALAND.gpkg\n", | |
| "[05:47:18] WARNING - Could not create spatial index for 'buildings' in NAGALAND.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[05:47:18] INFO - Done: rows_in=348,473, features_out=348,473, skipped=0\n", | |
| "[05:47:18] INFO - PROCESS ODISHA.gpkg [no_existing_output]\n", | |
| "[05:47:18] INFO - Processing: ODISHA.csv.gz -> ODISHA.gpkg\n", | |
| "[05:59:49] ERROR - Failed to process /mnt/c/Users/benny/OneDrive/Personal/wbg/data/ind/gobs-csv/ODISHA.csv.gz: Failed to commit transaction\n", | |
| "Traceback (most recent call last):\n", | |
| " File \"/tmp/ipykernel_24959/1708217288.py\", line 588, in run_batch\n", | |
| " stats = write_csv_to_vector(\n", | |
| " input_csv=in_csv,\n", | |
| " ...<7 lines>...\n", | |
| " selected_columns=selected_columns,\n", | |
| " )\n", | |
| " File \"/tmp/ipykernel_24959/1708217288.py\", line 481, in write_csv_to_vector\n", | |
| " process_df(df)\n", | |
| " ~~~~~~~~~~^^^^\n", | |
| " File \"/tmp/ipykernel_24959/1708217288.py\", line 475, in process_df\n", | |
| " sink.writerecords(features)\n", | |
| " ~~~~~~~~~~~~~~~~~^^^^^^^^^^\n", | |
| " File \"/home/bennyistanto/miniforge3/envs/climate/lib/python3.13/site-packages/fiona/collection.py\", line 541, in writerecords\n", | |
| " self.session.writerecs(records, self)\n", | |
| " ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^\n", | |
| " File \"fiona/ogrext.pyx\", line 1685, in fiona.ogrext.WritingSession.writerecs\n", | |
| "fiona.errors.TransactionError: Failed to commit transaction\n", | |
| "[05:59:50] INFO - PROCESS PUDUCHERRY.gpkg [no_existing_output]\n", | |
| "[05:59:50] INFO - Processing: PUDUCHERRY.csv.gz -> PUDUCHERRY.gpkg\n", | |
| "[06:00:29] WARNING - Could not create spatial index for 'buildings' in PUDUCHERRY.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[06:00:29] INFO - Done: rows_in=203,021, features_out=203,021, skipped=0\n", | |
| "[06:00:29] INFO - PROCESS PUNJAB.gpkg [no_existing_output]\n", | |
| "[06:00:29] INFO - Processing: PUNJAB.csv.gz -> PUNJAB.gpkg\n", | |
| "[06:38:21] WARNING - Could not create spatial index for 'buildings' in PUNJAB.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[06:38:21] INFO - Done: rows_in=6,418,114, features_out=6,418,114, skipped=0\n", | |
| "[06:38:21] INFO - PROCESS RAJASTHAN.gpkg [no_existing_output]\n", | |
| "[06:38:21] INFO - Processing: RAJASTHAN.csv.gz -> RAJASTHAN.gpkg\n", | |
| "[08:22:14] WARNING - Could not create spatial index for 'buildings' in RAJASTHAN.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[08:22:14] INFO - Done: rows_in=14,186,486, features_out=14,186,486, skipped=0\n", | |
| "[08:22:14] INFO - PROCESS SIKKIM.gpkg [no_existing_output]\n", | |
| "[08:22:14] INFO - Processing: SIKKIM.csv.gz -> SIKKIM.gpkg\n", | |
| "[08:22:36] WARNING - Could not create spatial index for 'buildings' in SIKKIM.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[08:22:36] INFO - Done: rows_in=110,161, features_out=110,161, skipped=0\n", | |
| "[08:22:36] INFO - PROCESS TAMIL_NADU.gpkg [no_existing_output]\n", | |
| "[08:22:36] INFO - Processing: TAMIL_NADU.csv.gz -> TAMIL_NADU.gpkg\n", | |
| "[10:03:27] WARNING - Could not create spatial index for 'buildings' in TAMIL_NADU.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[10:03:27] INFO - Done: rows_in=13,558,688, features_out=13,558,688, skipped=0\n", | |
| "[10:03:27] INFO - PROCESS TELANGANA.gpkg [no_existing_output]\n", | |
| "[10:03:27] INFO - Processing: TELANGANA.csv.gz -> TELANGANA.gpkg\n", | |
| "[10:35:27] WARNING - Could not create spatial index for 'buildings' in TELANGANA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[10:35:27] INFO - Done: rows_in=5,658,553, features_out=5,658,553, skipped=0\n", | |
| "[10:35:27] INFO - PROCESS TRIPURA.gpkg [no_existing_output]\n", | |
| "[10:35:27] INFO - Processing: TRIPURA.csv.gz -> TRIPURA.gpkg\n", | |
| "[10:37:40] WARNING - Could not create spatial index for 'buildings' in TRIPURA.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[10:37:40] INFO - Done: rows_in=678,346, features_out=678,346, skipped=0\n", | |
| "[10:37:40] INFO - PROCESS UTTARAKHAND.gpkg [no_existing_output]\n", | |
| "[10:37:40] INFO - Processing: UTTARAKHAND.csv.gz -> UTTARAKHAND.gpkg\n", | |
| "[10:44:58] WARNING - Could not create spatial index for 'buildings' in UTTARAKHAND.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[10:44:58] INFO - Done: rows_in=1,920,135, features_out=1,920,135, skipped=0\n", | |
| "[10:44:59] INFO - PROCESS UTTAR_PRADESH.gpkg [no_existing_output]\n", | |
| "[10:44:59] INFO - Processing: UTTAR_PRADESH.csv.gz -> UTTAR_PRADESH.gpkg\n", | |
| "[13:52:08] WARNING - Could not create spatial index for 'buildings' in UTTAR_PRADESH.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[13:52:08] INFO - Done: rows_in=28,076,052, features_out=28,076,052, skipped=0\n", | |
| "[13:52:09] INFO - PROCESS WEST_BENGAL.gpkg [no_existing_output]\n", | |
| "[13:52:09] INFO - Processing: WEST_BENGAL.csv.gz -> WEST_BENGAL.gpkg\n", | |
| "[15:22:32] WARNING - Could not create spatial index for 'buildings' in WEST_BENGAL.gpkg (GDAL Python/ogrinfo unavailable?).\n", | |
| "[15:22:32] INFO - Done: rows_in=11,051,484, features_out=11,051,484, skipped=0\n", | |
| "[15:22:32] INFO - ALL DONE. rows_in=184,203,151, features_out=184,203,151, skipped=0\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "# !pip -q install pandas fiona shapely pyproj # ← uncomment if needed\n", | |
| "# If GDAL Python bindings are unavailable, we’ll fall back to ogrinfo CLI for CreateSpatialIndex.\n", | |
| "# We also detect pre-existing R*Tree via sqlite3 to avoid false warnings.\n", | |
| "\n", | |
| "# =============================================================================\n", | |
| "# csv_buildings_to_geopkg (Notebook Single-Cell: fast + robust + index detection & creation)\n", | |
| "# =============================================================================\n", | |
| "\"\"\"\n", | |
| "Stream-convert India Open Buildings CSV/.csv.gz files into per-file GeoPackages\n", | |
| "(or GeoJSON), parsing WKT polygons from the 'geometry' column.\n", | |
| "\n", | |
| "Why this notebook cell?\n", | |
| "- Many India Open Buildings CSVs are huge (GB-scale). This code reads in chunks\n", | |
| " and writes features incrementally so memory stays bounded.\n", | |
| "- Uses Fiona directly for streaming writes (no need to hold an entire GeoDataFrame).\n", | |
| "- Repairs invalid polygons (self-intersections, rings) only when needed.\n", | |
| "- Logs progress, errors, and per-file statistics.\n", | |
| "\n", | |
| "Speed features:\n", | |
| "- Vectorized WKT parsing via shapely.from_wkt (Shapely ≥2.0).\n", | |
| "- Batch writes with sink.writerecords(...) to minimize Python↔GDAL overhead.\n", | |
| "- Disables spatial index during write for speed & to avoid rtree UNIQUE errors,\n", | |
| " then creates the spatial index immediately afterwards (in this process), unless it already exists.\n", | |
| "\n", | |
| "Robustness:\n", | |
| "- Existing-output policy: \"auto\" | \"replace\" | \"skip\" | \"abort\".\n", | |
| "- Schema declared as MultiPolygon and Polygons are promoted to MultiPolygon to avoid type errors.\n", | |
| "- Spatial index builder first checks if the R*Tree already exists (sqlite3), then tries GDAL Python or ogrinfo.\n", | |
| "\n", | |
| "Requirements:\n", | |
| " - Python 3.9+\n", | |
| " - pandas\n", | |
| " - fiona\n", | |
| " - shapely >= 2.0 preferred (for from_wkt & make_valid). If <2.0, fallbacks are used.\n", | |
| "\"\"\"\n", | |
| "\n", | |
| "from __future__ import annotations\n", | |
| "\n", | |
| "import os\n", | |
| "import sys\n", | |
| "import logging\n", | |
| "import shutil\n", | |
| "import subprocess\n", | |
| "import sqlite3\n", | |
| "from glob import glob\n", | |
| "from typing import Dict, Iterable, List, Optional, Tuple\n", | |
| "\n", | |
| "import pandas as pd\n", | |
| "import fiona\n", | |
| "from fiona.crs import CRS\n", | |
| "from shapely import wkt as shapely_wkt\n", | |
| "from shapely.geometry import mapping, MultiPolygon # ← MultiPolygon for schema + casting\n", | |
| "\n", | |
| "# Prefer Shapely ≥2.0 fast-paths: from_wkt + make_valid\n", | |
| "try:\n", | |
| " from shapely import from_wkt as shapely_from_wkt\n", | |
| " SHAPELY_HAS_FROM_WKT = True\n", | |
| "except Exception:\n", | |
| " SHAPELY_HAS_FROM_WKT = False\n", | |
| "\n", | |
| "try:\n", | |
| " from shapely.validation import make_valid as shapely_make_valid\n", | |
| " SHAPELY_HAS_MAKE_VALID = True\n", | |
| "except Exception:\n", | |
| " SHAPELY_HAS_MAKE_VALID = False\n", | |
| "\n", | |
| "# Try GDAL Python bindings for in-process spatial index creation\n", | |
| "try:\n", | |
| " from osgeo import ogr # type: ignore\n", | |
| " HAS_GDAL_PY = True\n", | |
| "except Exception:\n", | |
| " HAS_GDAL_PY = False\n", | |
| "\n", | |
| "\n", | |
| "# ============================== USER PARAMETERS ============================== #\n", | |
| "INPUT_DIR = \"/mnt/c/Users/benny/OneDrive/Personal/wbg/data/ind/gobs-csv\"\n", | |
| "OUTPUT_DIR = \"/mnt/c/Users/benny/OneDrive/Personal/wbg/data/ind/gobs-gpkg\"\n", | |
| "PATTERN = \"*.csv.gz\" # or \"*.csv\"\n", | |
| "OUTPUT_FORMAT = \"gpkg\" # \"gpkg\" or \"geojson\"\n", | |
| "GPKG_LAYER = \"buildings\"\n", | |
| "GEOM_NAME = \"geom\" # GeoPackage geometry column name (fixed for deterministic SQL)\n", | |
| "GEOM_SCHEMA_TYPE = \"MultiPolygon\" # Layer schema geometry; Polygon rows will be promoted to MultiPolygon\n", | |
| "\n", | |
| "CHUNK_SIZE = 250_000 # speed vs RAM (try 200k–500k)\n", | |
| "WRITE_BATCH_SIZE = 25_000 # batch size for sink.writerecords; adaptive backoff on failure\n", | |
| "CRS_EPSG = 4326 # WGS84\n", | |
| "GEOM_COL = \"geometry\" # CSV WKT column\n", | |
| "SELECTED_COLUMNS = None # e.g. [\"latitude\",\"longitude\",\"area_in_meters\",\"landuse\",\"state_name\",\"district_name\",\"geometry\"]\n", | |
| "\n", | |
| "# During write, disable spatial index for speed & to avoid rtree UNIQUE errors; build it right after\n", | |
| "WRITE_WITH_SPATIAL_INDEX = False\n", | |
| "BUILD_SPATIAL_INDEX_AFTER = True\n", | |
| "\n", | |
| "# Existing-output behavior: \"auto\" | \"replace\" | \"skip\" | \"abort\"\n", | |
| "EXISTING_BEHAVIOR = \"skip\"\n", | |
| "ALLOW_SKIPS = 1000 # tolerance for 'auto' completeness check\n", | |
| "\n", | |
| "# Geometry repair\n", | |
| "REPAIR_INVALID = True\n", | |
| "\n", | |
| "# Logging\n", | |
| "VERBOSITY = 1 # 0=WARNING, 1=INFO, 2=DEBUG\n", | |
| "FID64 = True # safer on huge datasets\n", | |
| "\n", | |
| "\n", | |
| "# ============================== Logging setup ================================ #\n", | |
| "def setup_logger(verbosity: int = 1) -> logging.Logger:\n", | |
| " \"\"\"\n", | |
| " Configure and return a module-level logger.\n", | |
| "\n", | |
| " Parameters\n", | |
| " ----------\n", | |
| " verbosity : int\n", | |
| " 0 = WARNING, 1 = INFO, 2+ = DEBUG\n", | |
| "\n", | |
| " Returns\n", | |
| " -------\n", | |
| " logging.Logger\n", | |
| " \"\"\"\n", | |
| " logger = logging.getLogger(\"csv2gpkg\")\n", | |
| " if logger.handlers:\n", | |
| " for h in list(logger.handlers):\n", | |
| " logger.removeHandler(h)\n", | |
| " level = logging.WARNING if verbosity <= 0 else logging.INFO if verbosity == 1 else logging.DEBUG\n", | |
| " logger.setLevel(level)\n", | |
| " ch = logging.StreamHandler(stream=sys.stderr)\n", | |
| " ch.setLevel(level)\n", | |
| " fmt = logging.Formatter(\"[%(asctime)s] %(levelname)s - %(message)s\", \"%H:%M:%S\")\n", | |
| " ch.setFormatter(fmt)\n", | |
| " logger.addHandler(ch)\n", | |
| " return logger\n", | |
| "\n", | |
| "LOGGER = setup_logger(VERBOSITY)\n", | |
| "\n", | |
| "\n", | |
| "# ============================ Utility functions ============================== #\n", | |
| "def infer_fiona_type(pd_dtype: str) -> str:\n", | |
| " \"\"\"\n", | |
| " Map a pandas dtype string to a Fiona schema field type.\n", | |
| "\n", | |
| " Parameters\n", | |
| " ----------\n", | |
| " pd_dtype : str\n", | |
| " pandas dtype string (e.g. 'int64', 'float64', 'object', 'bool')\n", | |
| "\n", | |
| " Returns\n", | |
| " -------\n", | |
| " str\n", | |
| " Fiona type string (e.g. 'int', 'float', 'str', 'bool').\n", | |
| " \"\"\"\n", | |
| " dt = str(pd_dtype)\n", | |
| " if \"int\" in dt:\n", | |
| " return \"int\"\n", | |
| " if \"float\" in dt:\n", | |
| " return \"float\"\n", | |
| " if \"bool\" in dt:\n", | |
| " return \"bool\"\n", | |
| " return \"str\" # object/category/datetime -> store as string\n", | |
| "\n", | |
| "\n", | |
| "def build_schema_from_df(df: pd.DataFrame, geometry_type: str = GEOM_SCHEMA_TYPE) -> Dict:\n", | |
| " \"\"\"\n", | |
| " Build a Fiona schema from a DataFrame, excluding the 'geometry' column.\n", | |
| "\n", | |
| " Parameters\n", | |
| " ----------\n", | |
| " df : pd.DataFrame\n", | |
| " First chunk to infer attribute types.\n", | |
| " geometry_type : str\n", | |
| " Geometry type for the layer schema (\"MultiPolygon\" by default).\n", | |
| "\n", | |
| " Returns\n", | |
| " -------\n", | |
| " dict\n", | |
| " Fiona schema dict with declared geometry type and attribute field types.\n", | |
| " \"\"\"\n", | |
| " properties = {}\n", | |
| " for col, dtype in df.dtypes.items():\n", | |
| " if col == \"geometry\":\n", | |
| " continue\n", | |
| " properties[col] = infer_fiona_type(str(dtype))\n", | |
| " return {\"geometry\": geometry_type, \"properties\": properties}\n", | |
| "\n", | |
| "\n", | |
| "def _fast_parse_wkts(series: pd.Series):\n", | |
| " \"\"\"\n", | |
| " Vectorized WKT -> shapely geometry if available (Shapely ≥2.0),\n", | |
| " else per-row fallback using shapely.wkt.loads.\n", | |
| " \"\"\"\n", | |
| " if SHAPELY_HAS_FROM_WKT:\n", | |
| " return shapely_from_wkt(series.to_numpy(copy=False))\n", | |
| " return series.map(shapely_wkt.loads)\n", | |
| "\n", | |
| "\n", | |
| "def repair_geometry(geom):\n", | |
| " \"\"\"\n", | |
| " Attempt to repair invalid geometries:\n", | |
| " - Prefer Shapely make_valid (>=2.0).\n", | |
| " - Fallback to buffer(0) for minor self-intersections.\n", | |
| " Returns a valid geometry or None.\n", | |
| " \"\"\"\n", | |
| " if geom is None:\n", | |
| " return None\n", | |
| " if geom.is_valid:\n", | |
| " return geom\n", | |
| " try:\n", | |
| " if REPAIR_INVALID and SHAPELY_HAS_MAKE_VALID:\n", | |
| " fixed = shapely_make_valid(geom)\n", | |
| " else:\n", | |
| " fixed = geom.buffer(0) if REPAIR_INVALID else geom\n", | |
| " if fixed is not None and not fixed.is_empty:\n", | |
| " return fixed\n", | |
| " except Exception:\n", | |
| " pass\n", | |
| " return None\n", | |
| "\n", | |
| "\n", | |
| "def _as_multipolygon(geom):\n", | |
| " \"\"\"\n", | |
| " Ensure the geometry is a MultiPolygon to match the layer schema.\n", | |
| " Returns None if geometry is missing/empty or of an unexpected type.\n", | |
| " \"\"\"\n", | |
| " if geom is None or getattr(geom, \"is_empty\", True):\n", | |
| " return None\n", | |
| " gt = geom.geom_type\n", | |
| " if gt == \"MultiPolygon\":\n", | |
| " return geom\n", | |
| " if gt == \"Polygon\":\n", | |
| " return MultiPolygon([geom])\n", | |
| " # Unexpected geometry types (Point/LineString/etc.) are skipped\n", | |
| " return None\n", | |
| "\n", | |
| "\n", | |
| "def iter_csv_chunks(path: str, chunk_size: int, usecols: Optional[List[str]] = None) -> Iterable[pd.DataFrame]:\n", | |
| " \"\"\"\n", | |
| " Yield chunks of a CSV/CSV.GZ using pandas to keep memory bounded.\n", | |
| " \"\"\"\n", | |
| " return pd.read_csv(\n", | |
| " path,\n", | |
| " compression=\"infer\",\n", | |
| " chunksize=chunk_size,\n", | |
| " usecols=usecols,\n", | |
| " low_memory=True,\n", | |
| " )\n", | |
| "\n", | |
| "\n", | |
| "def gpkg_feature_count(path: str, layer: str) -> Optional[int]:\n", | |
| " \"\"\"\n", | |
| " Return feature count of a GPKG layer, or None if not readable.\n", | |
| " \"\"\"\n", | |
| " try:\n", | |
| " with fiona.open(path, layer=layer) as src:\n", | |
| " return len(src)\n", | |
| " except Exception:\n", | |
| " return None\n", | |
| "\n", | |
| "\n", | |
| "def csv_row_count_quick(path: str, geom_col: str, chunk_size: int = 500_000) -> int:\n", | |
| " \"\"\"\n", | |
| " Quick-ish row count via pandas (reads only one column; supports gzip).\n", | |
| " \"\"\"\n", | |
| " total = 0\n", | |
| " for df in pd.read_csv(path, compression=\"infer\", chunksize=chunk_size, usecols=[geom_col], low_memory=True):\n", | |
| " total += len(df)\n", | |
| " return total\n", | |
| "\n", | |
| "\n", | |
| "def open_fiona_sink(\n", | |
| " output_path: str,\n", | |
| " schema: Dict,\n", | |
| " crs_epsg: int,\n", | |
| " driver: str,\n", | |
| " layer: str = \"buildings\",\n", | |
| " overwrite: bool = False\n", | |
| "):\n", | |
| " \"\"\"\n", | |
| " Open a Fiona collection for writing (GPKG or GeoJSON).\n", | |
| "\n", | |
| " - For GPKG, fix geometry column name (GEOMETRY_NAME=geom) to make CreateSpatialIndex SQL deterministic.\n", | |
| " - Optionally disable spatial index during write for speed & to avoid rtree UNIQUE errors; we'll build it after.\n", | |
| " \"\"\"\n", | |
| " if overwrite and os.path.exists(output_path):\n", | |
| " os.remove(output_path)\n", | |
| "\n", | |
| " crs = CRS.from_epsg(crs_epsg)\n", | |
| "\n", | |
| " if driver.upper() == \"GPKG\":\n", | |
| " layer_options = [f\"GEOMETRY_NAME={GEOM_NAME}\"]\n", | |
| " if not WRITE_WITH_SPATIAL_INDEX:\n", | |
| " layer_options.append(\"SPATIAL_INDEX=NO\")\n", | |
| " if FID64:\n", | |
| " layer_options.append(\"FID64=YES\")\n", | |
| " return fiona.open(\n", | |
| " output_path,\n", | |
| " mode=\"w\",\n", | |
| " driver=\"GPKG\",\n", | |
| " schema=schema,\n", | |
| " crs=crs,\n", | |
| " layer=layer,\n", | |
| " layer_options=layer_options\n", | |
| " )\n", | |
| " elif driver.upper() == \"GEOJSON\":\n", | |
| " return fiona.open(\n", | |
| " output_path,\n", | |
| " mode=\"w\",\n", | |
| " driver=\"GeoJSON\",\n", | |
| " schema=schema,\n", | |
| " crs=crs,\n", | |
| " )\n", | |
| " else:\n", | |
| " raise ValueError(f\"Unsupported driver: {driver}\")\n", | |
| "\n", | |
| "\n", | |
| "def _props_records(df: pd.DataFrame, geom_field: str) -> List[dict]:\n", | |
| " \"\"\"\n", | |
| " Convert attributes to list[dict] with NaNs -> None for OGR compatibility.\n", | |
| " \"\"\"\n", | |
| " props_df = df.drop(columns=[geom_field])\n", | |
| " records = props_df.to_dict(orient=\"records\")\n", | |
| " for rec in records:\n", | |
| " for k, v in rec.items():\n", | |
| " if pd.isna(v):\n", | |
| " rec[k] = None\n", | |
| " return records\n", | |
| "\n", | |
| "\n", | |
| "# ----------------------- Spatial index helpers (robust) ---------------------- #\n", | |
| "def _rtree_exists_sqlite(path: str, layer: str, geom_name: str) -> bool:\n", | |
| " \"\"\"\n", | |
| " Check via sqlite3 whether the GeoPackage R*Tree for (layer, geom_name) exists.\n", | |
| " \"\"\"\n", | |
| " try:\n", | |
| " tbl = f\"rtree_{layer}_{geom_name}_rowid\"\n", | |
| " with sqlite3.connect(path) as con:\n", | |
| " cur = con.execute(\n", | |
| " \"SELECT name FROM sqlite_master WHERE type='table' AND name=?\",\n", | |
| " (tbl,)\n", | |
| " )\n", | |
| " return cur.fetchone() is not None\n", | |
| " except Exception:\n", | |
| " return False\n", | |
| "\n", | |
| "\n", | |
| "def _build_spatial_index_inprocess_gdal(path: str, layer: str, geom_name: str) -> bool:\n", | |
| " \"\"\"\n", | |
| " Build spatial index using GDAL Python bindings.\n", | |
| " Returns True on success, False otherwise.\n", | |
| " \"\"\"\n", | |
| " if not HAS_GDAL_PY:\n", | |
| " return False\n", | |
| " try:\n", | |
| " ds = ogr.Open(path, update=1)\n", | |
| " if ds is None:\n", | |
| " return False\n", | |
| " sql = f\"SELECT CreateSpatialIndex('{layer}','{geom_name}')\"\n", | |
| " res = ds.ExecuteSQL(sql)\n", | |
| " if res is not None and hasattr(ds, \"ReleaseResultSet\"):\n", | |
| " ds.ReleaseResultSet(res)\n", | |
| " ds = None\n", | |
| " return True\n", | |
| " except Exception as e:\n", | |
| " LOGGER.debug(f\"GDAL Python CreateSpatialIndex failed: {e}\")\n", | |
| " return False\n", | |
| "\n", | |
| "\n", | |
| "def _build_spatial_index_via_cli(path: str, layer: str, geom_name: str) -> bool:\n", | |
| " \"\"\"\n", | |
| " Build spatial index using ogrinfo CLI as fallback.\n", | |
| " Returns True on success, False otherwise.\n", | |
| " \"\"\"\n", | |
| " ogrinfo = shutil.which(\"ogrinfo\")\n", | |
| " if ogrinfo is None:\n", | |
| " return False\n", | |
| " try:\n", | |
| " sql = f\"SELECT CreateSpatialIndex('{layer}','{geom_name}')\"\n", | |
| " cmd = [ogrinfo, path, \"-sql\", sql]\n", | |
| " proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)\n", | |
| " if proc.returncode == 0:\n", | |
| " return True\n", | |
| " LOGGER.debug(f\"ogrinfo stderr: {proc.stderr}\")\n", | |
| " return False\n", | |
| " except Exception as e:\n", | |
| " LOGGER.debug(f\"ogrinfo CreateSpatialIndex failed: {e}\")\n", | |
| " return False\n", | |
| "\n", | |
| "\n", | |
| "def build_spatial_index(path: str, layer: str, geom_name: str = \"geom\") -> None:\n", | |
| " \"\"\"\n", | |
| " Ensure a GeoPackage spatial index exists by running CreateSpatialIndex.\n", | |
| " First detect if it already exists; if not, try GDAL Python, then ogrinfo CLI.\n", | |
| " \"\"\"\n", | |
| " if not BUILD_SPATIAL_INDEX_AFTER:\n", | |
| " return\n", | |
| " if not os.path.exists(path):\n", | |
| " LOGGER.warning(f\"Cannot build spatial index; file not found: {path}\")\n", | |
| " return\n", | |
| "\n", | |
| " # If already present, report and exit quietly\n", | |
| " if _rtree_exists_sqlite(path, layer, geom_name):\n", | |
| " LOGGER.info(f\"Spatial index already present for '{layer}' in {os.path.basename(path)}.\")\n", | |
| " return\n", | |
| "\n", | |
| " ok = _build_spatial_index_inprocess_gdal(path, layer, geom_name)\n", | |
| " if not ok:\n", | |
| " ok = _build_spatial_index_via_cli(path, layer, geom_name)\n", | |
| "\n", | |
| " # Re-check if it now exists\n", | |
| " if ok or _rtree_exists_sqlite(path, layer, geom_name):\n", | |
| " LOGGER.info(f\"Spatial index created for layer '{layer}' in {os.path.basename(path)}.\")\n", | |
| " else:\n", | |
| " LOGGER.warning(f\"Could not create spatial index for '{layer}' in {os.path.basename(path)} \"\n", | |
| " f\"(GDAL Python/ogrinfo unavailable?).\")\n", | |
| "\n", | |
| "\n", | |
| "# ---------------------------- Writer (with backoff) -------------------------- #\n", | |
| "def _write_with_backoff(sink, features: List[dict], initial_batch: int) -> None:\n", | |
| " \"\"\"\n", | |
| " Write features to a Fiona sink using batch 'writerecords' with adaptive backoff.\n", | |
| " If a large batch fails (TransactionError), the batch size is halved until it succeeds.\n", | |
| " Falls back to per-feature writes as a last resort.\n", | |
| "\n", | |
| " Parameters\n", | |
| " ----------\n", | |
| " sink : fiona.Collection in write mode\n", | |
| " features : list of GeoJSON-like feature dicts\n", | |
| " initial_batch : int\n", | |
| " Starting batch size (e.g., WRITE_BATCH_SIZE)\n", | |
| " \"\"\"\n", | |
| " n = len(features)\n", | |
| " if n == 0:\n", | |
| " return\n", | |
| "\n", | |
| " start = max(1, int(initial_batch))\n", | |
| " i = 0\n", | |
| " while i < n:\n", | |
| " size = min(start, n - i)\n", | |
| " # Try progressively smaller batches upon failure\n", | |
| " while True:\n", | |
| " try:\n", | |
| " batch = features[i:i+size]\n", | |
| " sink.writerecords(batch)\n", | |
| " i += size\n", | |
| " break # proceed to next segment\n", | |
| " except Exception as ex:\n", | |
| " # If batch too big for a single transaction, halve it\n", | |
| " if size <= 1:\n", | |
| " # Last resort: per-feature write; if still fails, re-raise\n", | |
| " try:\n", | |
| " sink.write(features[i])\n", | |
| " i += 1\n", | |
| " break\n", | |
| " except Exception as ex2:\n", | |
| " raise ex2\n", | |
| " size = size // 2 # back off and retry\n", | |
| "\n", | |
| "\n", | |
| "def write_csv_to_vector(\n", | |
| " input_csv: str,\n", | |
| " output_path: str,\n", | |
| " driver: str = \"GPKG\",\n", | |
| " layer: str = \"buildings\",\n", | |
| " chunk_size: int = 100_000,\n", | |
| " crs_epsg: int = 4326,\n", | |
| " overwrite: bool = False,\n", | |
| " expected_geom_col: str = \"geometry\",\n", | |
| " selected_columns: Optional[List[str]] = None,\n", | |
| ") -> Dict[str, int]:\n", | |
| " \"\"\"\n", | |
| " Convert one CSV (or .csv.gz) into a vector file (GPKG/GeoJSON) by streaming in chunks.\n", | |
| "\n", | |
| " Returns\n", | |
| " -------\n", | |
| " dict with counts: {\"rows_in\": X, \"features_out\": Y, \"skipped\": Z}\n", | |
| " \"\"\"\n", | |
| " LOGGER.info(f\"Processing: {os.path.basename(input_csv)} -> {os.path.basename(output_path)}\")\n", | |
| "\n", | |
| " # Ensure geometry column is included\n", | |
| " usecols = None\n", | |
| " if selected_columns is not None:\n", | |
| " cols = list(selected_columns)\n", | |
| " if expected_geom_col not in cols:\n", | |
| " cols.append(expected_geom_col)\n", | |
| " usecols = cols\n", | |
| "\n", | |
| " # Prime iterator for schema inference\n", | |
| " chunk_iter = iter_csv_chunks(input_csv, chunk_size=chunk_size, usecols=usecols)\n", | |
| " try:\n", | |
| " first_chunk = next(chunk_iter)\n", | |
| " except StopIteration:\n", | |
| " LOGGER.warning(f\"No data found in {input_csv}\")\n", | |
| " return {\"rows_in\": 0, \"features_out\": 0, \"skipped\": 0}\n", | |
| "\n", | |
| " if expected_geom_col not in first_chunk.columns:\n", | |
| " raise ValueError(f\"Geometry column '{expected_geom_col}' not found in {input_csv}\")\n", | |
| "\n", | |
| " # Declare schema as MultiPolygon (Polygons will be promoted)\n", | |
| " schema = build_schema_from_df(first_chunk, geometry_type=GEOM_SCHEMA_TYPE)\n", | |
| "\n", | |
| " rows_in = 0\n", | |
| " features_out = 0\n", | |
| " skipped = 0\n", | |
| "\n", | |
| " with open_fiona_sink(\n", | |
| " output_path=output_path,\n", | |
| " schema=schema,\n", | |
| " crs_epsg=crs_epsg,\n", | |
| " driver=driver,\n", | |
| " layer=layer,\n", | |
| " overwrite=True, # always clean write when we commit to processing\n", | |
| " ) as sink:\n", | |
| "\n", | |
| " def process_df(df: pd.DataFrame):\n", | |
| " \"\"\"\n", | |
| " Process a dataframe chunk using fast vectorized WKT parsing and batched writing\n", | |
| " with adaptive backoff. Repairs invalid geometries only where needed, then casts\n", | |
| " Polygon→MultiPolygon.\n", | |
| " \"\"\"\n", | |
| " nonlocal rows_in, features_out, skipped\n", | |
| " rows_in += len(df)\n", | |
| "\n", | |
| " geoms = _fast_parse_wkts(df[expected_geom_col])\n", | |
| "\n", | |
| " # Repair only invalid geometries\n", | |
| " if REPAIR_INVALID:\n", | |
| " try:\n", | |
| " invalid_mask = ~geoms.is_valid\n", | |
| " except Exception:\n", | |
| " invalid_mask = [not g.is_valid for g in geoms]\n", | |
| " if getattr(invalid_mask, \"any\", lambda: any(invalid_mask))():\n", | |
| " for i, is_bad in enumerate(invalid_mask):\n", | |
| " if is_bad:\n", | |
| " geoms[i] = repair_geometry(geoms[i])\n", | |
| "\n", | |
| " # Convert to features (skip None/empty; cast to MultiPolygon)\n", | |
| " props_list = _props_records(df, expected_geom_col)\n", | |
| " features = []\n", | |
| " for geom, props in zip(geoms, props_list):\n", | |
| " geom = _as_multipolygon(geom)\n", | |
| " if geom is None:\n", | |
| " skipped += 1\n", | |
| " continue\n", | |
| " try:\n", | |
| " features.append({\"type\": \"Feature\", \"properties\": props, \"geometry\": mapping(geom)})\n", | |
| " except Exception:\n", | |
| " skipped += 1\n", | |
| "\n", | |
| " if features:\n", | |
| " _write_with_backoff(sink, features, WRITE_BATCH_SIZE)\n", | |
| " features_out += len(features)\n", | |
| "\n", | |
| " # First + remaining chunks\n", | |
| " process_df(first_chunk)\n", | |
| " for df in chunk_iter:\n", | |
| " process_df(df)\n", | |
| "\n", | |
| " # Build spatial index right after closing the sink (GeoPackage only)\n", | |
| " if driver.upper() == \"GPKG\" and BUILD_SPATIAL_INDEX_AFTER:\n", | |
| " build_spatial_index(output_path, layer, GEOM_NAME)\n", | |
| "\n", | |
| " LOGGER.info(f\"Done: rows_in={rows_in:,}, features_out={features_out:,}, skipped={skipped:,}\")\n", | |
| " return {\"rows_in\": rows_in, \"features_out\": features_out, \"skipped\": skipped}\n", | |
| "\n", | |
| "\n", | |
| "def discover_inputs(input_dir: str, pattern: str) -> List[str]:\n", | |
| " \"\"\"Find input CSV files using a glob pattern.\"\"\"\n", | |
| " return sorted(glob(os.path.join(input_dir, pattern)))\n", | |
| "\n", | |
| "\n", | |
| "def derive_output_path(input_path: str, output_dir: str, fmt: str) -> str:\n", | |
| " \"\"\"\n", | |
| " Compute the output path for a given input (handles .csv.gz).\n", | |
| " \"\"\"\n", | |
| " base = os.path.splitext(os.path.basename(input_path))[0]\n", | |
| " if base.lower().endswith(\".csv\"):\n", | |
| " base = base[:-4]\n", | |
| " ext = \".gpkg\" if fmt.lower() == \"gpkg\" else \".geojson\"\n", | |
| " return os.path.join(output_dir, f\"{base}{ext}\")\n", | |
| "\n", | |
| "\n", | |
| "def should_process_file(\n", | |
| " input_csv: str,\n", | |
| " output_path: str,\n", | |
| " layer: str,\n", | |
| " behavior: str = \"auto\",\n", | |
| " geom_col: str = \"geometry\",\n", | |
| ") -> Tuple[bool, str]:\n", | |
| " \"\"\"\n", | |
| " Decide whether to process this file based on EXISTING_BEHAVIOR.\n", | |
| " Returns (should_process, decision_reason).\n", | |
| " \"\"\"\n", | |
| " if not os.path.exists(output_path):\n", | |
| " return True, \"no_existing_output\"\n", | |
| "\n", | |
| " if behavior == \"replace\":\n", | |
| " return True, \"replace_existing\"\n", | |
| " if behavior == \"skip\":\n", | |
| " return False, \"skip_existing\"\n", | |
| " if behavior == \"abort\":\n", | |
| " raise RuntimeError(f\"Output already exists: {output_path}. EXISTING_BEHAVIOR=abort.\")\n", | |
| "\n", | |
| " # behavior == \"auto\"\n", | |
| " try:\n", | |
| " out_count = gpkg_feature_count(output_path, layer)\n", | |
| " in_rows = csv_row_count_quick(input_csv, geom_col)\n", | |
| " if out_count is not None and in_rows is not None:\n", | |
| " if out_count >= max(0, in_rows - ALLOW_SKIPS):\n", | |
| " return False, f\"auto_skip_complete (out={out_count}, in={in_rows})\"\n", | |
| " else:\n", | |
| " return True, f\"auto_replace_incomplete (out={out_count}, in={in_rows})\"\n", | |
| " except Exception:\n", | |
| " return True, \"auto_replace_fallback\"\n", | |
| " return True, \"auto_process_default\"\n", | |
| "\n", | |
| "\n", | |
| "def run_batch(\n", | |
| " input_dir: str,\n", | |
| " output_dir: str,\n", | |
| " pattern: str = \"*.csv.gz\",\n", | |
| " output_format: str = \"gpkg\",\n", | |
| " layer: str = \"buildings\",\n", | |
| " chunk_size: int = 100_000,\n", | |
| " crs_epsg: int = 4326,\n", | |
| " geom_col: str = \"geometry\",\n", | |
| " selected_columns: Optional[List[str]] = None,\n", | |
| ") -> Dict[str, int]:\n", | |
| " \"\"\"\n", | |
| " Discover inputs and convert each to its own output vector file.\n", | |
| " Also triggers in-process spatial index creation for GeoPackage outputs.\n", | |
| " \"\"\"\n", | |
| " os.makedirs(output_dir, exist_ok=True)\n", | |
| " inputs = discover_inputs(input_dir, pattern)\n", | |
| " if not inputs:\n", | |
| " LOGGER.error(\"No input files found. Check INPUT_DIR and PATTERN.\")\n", | |
| " return {\"rows_in\": 0, \"features_out\": 0, \"skipped\": 0}\n", | |
| "\n", | |
| " driver = \"GPKG\" if output_format.lower() == \"gpkg\" else \"GeoJSON\"\n", | |
| "\n", | |
| " total_in = total_out = total_skip = 0\n", | |
| " for in_csv in inputs:\n", | |
| " out_path = derive_output_path(in_csv, output_dir, output_format)\n", | |
| "\n", | |
| " try:\n", | |
| " process, reason = should_process_file(\n", | |
| " input_csv=in_csv,\n", | |
| " output_path=out_path,\n", | |
| " layer=layer,\n", | |
| " behavior=EXISTING_BEHAVIOR,\n", | |
| " geom_col=geom_col,\n", | |
| " )\n", | |
| " except RuntimeError as e:\n", | |
| " LOGGER.error(str(e))\n", | |
| " break\n", | |
| "\n", | |
| " if not process:\n", | |
| " LOGGER.info(f\"SKIP {os.path.basename(out_path)} [{reason}]\")\n", | |
| " continue\n", | |
| " else:\n", | |
| " LOGGER.info(f\"PROCESS {os.path.basename(out_path)} [{reason}]\")\n", | |
| "\n", | |
| " try:\n", | |
| " stats = write_csv_to_vector(\n", | |
| " input_csv=in_csv,\n", | |
| " output_path=out_path,\n", | |
| " driver=driver,\n", | |
| " layer=layer,\n", | |
| " chunk_size=chunk_size,\n", | |
| " crs_epsg=crs_epsg,\n", | |
| " overwrite=True,\n", | |
| " expected_geom_col=geom_col,\n", | |
| " selected_columns=selected_columns,\n", | |
| " )\n", | |
| " total_in += stats[\"rows_in\"]\n", | |
| " total_out += stats[\"features_out\"]\n", | |
| " total_skip+= stats[\"skipped\"]\n", | |
| " except Exception as ex:\n", | |
| " LOGGER.exception(f\"Failed to process {in_csv}: {ex}\")\n", | |
| " # Continue to next file\n", | |
| "\n", | |
| " LOGGER.info(f\"ALL DONE. rows_in={total_in:,}, features_out={total_out:,}, skipped={total_skip:,}\")\n", | |
| " return {\"rows_in\": total_in, \"features_out\": total_out, \"skipped\": total_skip}\n", | |
| "\n", | |
| "\n", | |
| "# ================================ Execute =================================== #\n", | |
| "os.makedirs(OUTPUT_DIR, exist_ok=True)\n", | |
| "_ = run_batch(\n", | |
| " input_dir=INPUT_DIR,\n", | |
| " output_dir=OUTPUT_DIR,\n", | |
| " pattern=PATTERN,\n", | |
| " output_format=OUTPUT_FORMAT,\n", | |
| " layer=GPKG_LAYER,\n", | |
| " chunk_size=CHUNK_SIZE,\n", | |
| " crs_epsg=CRS_EPSG,\n", | |
| " geom_col=GEOM_COL,\n", | |
| " selected_columns=SELECTED_COLUMNS,\n", | |
| ")\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "id": "223f4379-b0d0-4517-a014-f11320a5759e", | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3 (ipykernel)", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.12.0" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 5 | |
| } |