Last active: June 8, 2023 16:01
-
-
Save cholmes/d9ad24efe53fa5e094055ffb3e41979c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import time

import dask.dataframe
import dask_geopandas as dgd
import geopandas as gpd
import pandas as pd
import sqlalchemy
from shapely import wkb
# Number of rows to page through: main() walks offsets 0..TOTAL_ROWS in
# CHUNKSIZE steps (150M / 2.5M = 60 pages).
TOTAL_ROWS = 150000000
# Rows fetched per SQL page; each page becomes one dask partition.
CHUNKSIZE = 2500000
# Postgres connection settings. Each value can be overridden with the
# standard libpq environment variable, so real credentials never have to be
# committed to source. The fallbacks are the original hard-coded values.
params = {
    'database': os.environ.get('PGDATABASE', 'buildings'),
    'user': os.environ.get('PGUSER', 'cholmes'),
    # Prefer setting PGPASSWORD over editing this placeholder.
    'password': os.environ.get('PGPASSWORD', 'your_password'),
    'host': os.environ.get('PGHOST', 'localhost'),
    # Default Postgres port; kept as a string to match the original value.
    'port': os.environ.get('PGPORT', '5432'),
}
def load_chunk(engine, offset):
    """Read one page of ``buildings_unique`` and wrap it as a dask frame.

    Parameters
    ----------
    engine
        SQLAlchemy engine (or connection) passed to ``pd.read_sql_query``.
    offset : int
        Number of rows, in ``id`` order, to skip before reading up to
        CHUNKSIZE rows.

    Returns
    -------
    A single-partition dask-geopandas GeoDataFrame with columns ``id`` and
    ``geometry``, where the geometries are shapely objects decoded from EWKB.
    The frame is empty when ``offset`` is past the end of the table.
    """
    # ORDER BY makes LIMIT/OFFSET paging deterministic. Without it Postgres
    # may return rows in a different order for each query, so consecutive
    # pages can overlap or skip rows. Assumes ``id`` is unique (TODO confirm).
    # NOTE(review): OFFSET still walks past the skipped rows, so later pages
    # get slower. Keyset paging (WHERE id > last_id) would avoid that.
    chunk = pd.read_sql_query(
        f"""
        SELECT id, ST_AsEWKB(geometry) AS geometry
        FROM buildings_unique
        ORDER BY id
        LIMIT {int(CHUNKSIZE)} OFFSET {int(offset)};
        """,
        engine,
    )
    # The driver may hand back bytea values as memoryview; wkb.loads wants bytes.
    geoms = chunk['geometry'].apply(bytes).apply(wkb.loads)
    chunk['geometry'] = gpd.GeoSeries(geoms)
    # Promote to a real GeoDataFrame so the dask frame carries geometry metadata.
    gdf = gpd.GeoDataFrame(chunk, geometry='geometry')
    return dgd.from_geopandas(gdf, npartitions=1)
def main():
    """Export ``buildings_unique`` from Postgres to a parquet output.

    Pages through the table CHUNKSIZE rows at a time via ``load_chunk``.
    Paging stops after TOTAL_ROWS rows, or earlier once a page comes back
    short or empty. The chunks are concatenated into one dask-geopandas
    frame, tagged EPSG:4326, and written with ``to_parquet``.
    """
    # URL.create escapes special characters (e.g. '@', '/', ':') in the
    # credentials. Interpolating them into an f-string URL does not.
    url = sqlalchemy.engine.URL.create(
        "postgresql",
        username=params['user'],
        password=params['password'],
        host=params['host'],
        port=int(params['port']),
        database=params['database'],
    )
    engine = sqlalchemy.create_engine(url)
    print("Starting processing at " + str(pd.Timestamp.now()))

    partitions = []
    rows_processed = 0
    try:
        for offset in range(0, TOTAL_ROWS, CHUNKSIZE):
            chunk_dgd = load_chunk(engine, offset)
            n_rows = len(chunk_dgd)
            if n_rows == 0:
                break
            partitions.append(chunk_dgd)
            rows_processed += n_rows
            print(f"Time: {time.ctime()}, Rows Processed: {rows_processed}")
            if n_rows < CHUNKSIZE:
                # A short page means the table is exhausted.
                break
    finally:
        # Each chunk is read eagerly by pd.read_sql_query, so the pool is no
        # longer needed once paging finishes.
        engine.dispose()

    if not partitions:
        print("No rows returned from buildings_unique; nothing written.")
        return

    # NOTE(review): every chunk is already materialised in pandas, so the
    # whole table must fit in memory before the write starts.
    # A single concat keeps the task graph flat. Nesting a new concat around
    # the previous result on every iteration does not.
    ddf = dask.dataframe.concat(partitions)
    geometry = ddf.geometry.map_partitions(
        gpd.GeoSeries, meta=gpd.GeoSeries([])
    )
    gdf = dgd.from_dask_dataframe(ddf, geometry=geometry).set_crs("EPSG:4326")
    gdf.to_parquet('/Users/cholmes/Downloads/geo-data/google-buildings.parquet')
    print("Finished processing at " + str(pd.Timestamp.now()))
if __name__ == "__main__":
    # Run the export only when executed as a script, not when imported.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.