Skip to content

Instantly share code, notes, and snippets.

@deton
Last active October 5, 2025 12:34
Show Gist options
  • Save deton/92c78bad902108e1cb3766095778c810 to your computer and use it in GitHub Desktop.
Save deton/92c78bad902108e1cb3766095778c810 to your computer and use it in GitHub Desktop.
Polygonから指定距離以内にあるPointのみを抽出。(駅の近くにある駐車場を抽出)
# Polygonから指定距離以内にあるPointのみを抽出
# cf. https://stackoverflow.com/a/74291617
import argparse
import geopandas as gpd
import pandas as pd
parser = argparse.ArgumentParser()
parser.add_argument("polygon_file", help="polygon(station) list file (GeoJSON or shp)")
parser.add_argument("point_file", help="point(parking) list file (CSV or GeoJSON). The file must have count_column.")
parser.add_argument("-c", "--count_column", help="column name for max_count in point_file", default="parking_lot")
parser.add_argument("-x", "--max_count", help="max count threshold to limit number of points", type=int, default=100)
parser.add_argument("-d", "--distance", help="distance[m] from polygon", type=float, default=500)
parser.add_argument("-o", "--output_file", help="output filename", default="point-near-polygon.geojson")
args = parser.parse_args()
EPSG = "EPSG:6668"
gdfst = gpd.read_file(args.polygon_file, encoding="utf-8")
utmcrs = gdfst.estimate_utm_crs()
gdfstutm = gdfst.to_crs(utmcrs)
if args.point_file.endswith(".csv"):
df = pd.read_csv(args.point_file, encoding="utf-8")
gdfpt = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df["longitude"], df["latitude"])).set_crs(EPSG)
else:
gdfpt = gpd.read_file(args.point_file, encoding="utf-8")
gdfptutm = gdfpt.to_crs(utmcrs)
def filter_upto_max(df_group):
"""各Polygonに近いPointのcountのcumsumがmax_countに達したら、もうPointは追加しない"""
cumsum = 0
filter_flags = []
pt_list = [] # point(parking)
st_list = [] # polygon(station)
for pt, st, count in zip(df_group["index"], df_group["index_right"], df_group[args.count_column]):
pt_list.append(pt)
st_list.append(st)
if cumsum > args.max_count:
filter_flags.append(False)
else:
filter_flags.append(True)
cumsum += count
return pd.DataFrame({"index": pt_list, "index_right": st_list, "keep": filter_flags})
sj = gpd.sjoin_nearest(gdfptutm.reset_index(), gdfstutm, max_distance=args.distance).reset_index(drop=True)
sj = sj.sort_values(["index_right", args.count_column], ascending=False) # countの多いPointを優先して抽出
sjg = sj.groupby("index_right", as_index=False)
sjkeep = sjg[["index", "index_right", args.count_column]].apply(filter_upto_max)
sj = pd.merge(sj, sjkeep, on=["index", "index_right"])
sj = sj[sj["keep"]==True].drop("keep", axis=1)
# XXX: 各Polygonに対しcountが最大のPointのみに
#sj = sj.loc[sjg[args.count_column].idxmax(),:].set_crs(utmcrs).to_crs(EPSG)
sj = sj.groupby("index").first().set_crs(utmcrs).to_crs(EPSG) # drop duplicated Point
sj = sj.sort_values("index_right").drop("index_right", axis=1)
if args.output_file.endswith(".csv"):
sj.drop("geometry", axis=1).to_csv(args.output_file, index=False)
else:
sj.to_file(args.output_file, driver="GeoJSON")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment