Created
February 19, 2019 19:05
-
-
Save jwilson8767/62151d486a8c6dbf899854ddaff94468 to your computer and use it in GitHub Desktop.
Geopandas concurrent sjoin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import as_completed, ProcessPoolExecutor | |
import geopandas as gpd | |
import pandas as pd | |
import numpy as np | |
from collections.abc import Sequence | |
from shapely import prepared | |
def sjoin(left_df, right_df, op='intersects', how='inner', lsuffix='left', rsuffix='right', fail_crs_mismatch: bool = True, fail_missing_geometries: bool = False) -> gpd.GeoDataFrame: | |
"""Spatial join of two GeoDataFrames. GeoPandas sjoin with concurrency (split naively using df slicing). | |
Parameters | |
---------- | |
:param left_df: GeoDataFrame | |
:param right_df: GeoDataFrame | |
:param op: string, default 'intersection' | |
Binary predicate, one of {'intersects', 'contains', 'within'}. | |
See http://toblerity.org/shapely/manual.html#binary-predicates. | |
:param how: string, default 'inner' | |
The type of join: | |
* 'left': use keys from left_df; retain only left_df geometry column | |
* 'right': use keys from right_df; retain only right_df geometry column | |
* 'inner': use intersection of keys from both dfs; retain geometry column according to retain_geometry. | |
:param lsuffix: string, default 'left' | |
Suffix to apply to overlapping column names (left GeoDataFrame). | |
:param rsuffix: string, default 'right' | |
Suffix to apply to overlapping column names (right GeoDataFrame). | |
:param inner_index: string, default 'left' | |
Which index to retain for inner joins, one of {'left', 'right', 'both'}. | |
:param hide_progress: bool suppresses progress bars | |
:param fail_crs_mismatch: Whether to throw an exception when crs does not match exactly. | |
:param fail_missing_geometries: Whether to throw an exception when there are missing geometries. | |
""" | |
_validate('how', how) | |
_validate('op', op) | |
if left_df.empty: | |
raise ValueError("`left_df` is empty. Cannot perform spatial join.") | |
if right_df.empty: | |
raise ValueError("`right_df` is empty. Cannot perform spatial join.") | |
if fail_missing_geometries: | |
if pd.isnull(left_df.geometry).any(): | |
raise ValueError('`left_df` has missing geometries.') | |
if pd.isnull(right_df.geometry).any(): | |
raise ValueError('`right_df` has missing geometries.') | |
if left_df.crs != right_df.crs: | |
if fail_crs_mismatch: | |
raise AttributeError('CRS does not match! left_df: {}, right_df: {}'.format(left_df.crs, right_df.crs)) | |
else: | |
print('Warning: CRS does not match!') | |
index_left = 'index_%s' % lsuffix | |
index_right = 'index_%s' % rsuffix | |
# due to GH 352 | |
if (any(left_df.columns.isin([index_left, index_right])) | |
or any(right_df.columns.isin([index_left, index_right]))): | |
raise ValueError("'{0}' and '{1}' cannot be names in the frames being" | |
" joined".format(index_left, index_right)) | |
# Attempt to re-use spatial indexes, otherwise generate the spatial index for the longer dataframe | |
if right_df._sindex_generated or (not left_df._sindex_generated and right_df.shape[0] > left_df.shape[0]): | |
tree_idx = right_df.sindex | |
tree_idx_right = True | |
else: | |
tree_idx = left_df.sindex | |
tree_idx_right = False | |
# the rtree spatial index only allows limited (numeric) index types, but an | |
# index in geopandas may be any arbitrary dtype. so reset both indices now | |
# and store references to the original indices, to be reaffixed later. | |
# GH 352 | |
left_df = left_df.copy(deep=True) | |
left_df.index = left_df.index.rename(index_left) | |
left_df = left_df.reset_index() | |
right_df = right_df.copy(deep=True) | |
right_df.index = right_df.index.rename(index_right) | |
right_df = right_df.reset_index() | |
r_idx = np.empty([0, 0]) | |
l_idx = np.empty([0, 0]) | |
# get rtree spatial index | |
if tree_idx_right: | |
idxmatch = (left_df.geometry.apply(lambda a: a.bounds) | |
.apply(lambda a: list(tree_idx.intersection(a)))) | |
idxmatch = idxmatch[idxmatch.apply(len) > 0] | |
# indexes of overlapping boundaries | |
if idxmatch.shape[0] > 0: | |
r_idx = np.concatenate(idxmatch.values) | |
l_idx = np.concatenate([[i] * len(v) for i, v in idxmatch.iteritems()]) | |
else: # tree_idx_df == 'left': | |
idxmatch = (right_df.geometry.apply(lambda a: a.bounds) | |
.apply(lambda x: list(tree_idx.intersection(x)))) | |
idxmatch = idxmatch[idxmatch.apply(len) > 0] | |
if idxmatch.shape[0] > 0: | |
# indexes of overlapping boundaries | |
l_idx = np.concatenate(idxmatch.values) | |
r_idx = np.concatenate([[i] * len(v) for i, v in idxmatch.iteritems()]) | |
if len(r_idx) > 0 and len(l_idx) > 0: | |
matches = ( | |
pd.DataFrame( | |
np.column_stack( | |
[ | |
left_df[left_df.geometry.name][l_idx], | |
right_df[right_df.geometry.name][r_idx] | |
])) | |
) | |
sjoined = concurrent_map(fn=Op(op), iterable=matches) | |
result = ( | |
pd.DataFrame( | |
np.column_stack( | |
[l_idx, | |
r_idx, | |
sjoined | |
]) | |
) | |
) | |
result.columns = ['_key_left', '_key_right', 'match_bool'] | |
result = pd.DataFrame(result[result['match_bool'] == 1]).drop('match_bool', axis=1) | |
else: | |
# when output from the join has no overlapping geometries | |
result = pd.DataFrame(columns=['_key_left', '_key_right'], dtype=float) | |
if how == 'inner': | |
result = result.set_index('_key_left') | |
joined = ( | |
gpd.GeoDataFrame( | |
left_df | |
.merge(result, left_index=True, right_index=True) | |
.merge(right_df, | |
left_on='_key_right', right_index=True, | |
suffixes=('_%s' % lsuffix, '_%s' % rsuffix)), | |
crs=left_df.crs) | |
) | |
joined = joined.rename(columns={'geometry_%s' % lsuffix: 'geometry'}) | |
del joined['geometry_%s' % rsuffix] | |
joined = joined.set_index(index_left).drop(['_key_right'], axis=1) # type: gpd.GeoDataFrame | |
joined.index.name = None | |
elif how == 'left': | |
result = result.set_index('_key_left') | |
joined = ( | |
left_df | |
.merge(result, left_index=True, right_index=True, how='left') | |
.merge(right_df.drop(right_df.geometry.name, axis=1), | |
how='left', left_on='_key_right', right_index=True, | |
suffixes=('_%s' % lsuffix, '_%s' % rsuffix)) | |
) | |
joined = joined.set_index(index_left).drop(['_key_right'], axis=1) | |
joined.index.name = None | |
else: # how == 'right': | |
joined = ( | |
left_df | |
.drop(left_df.geometry.name, axis=1) | |
.merge(result.merge(right_df, | |
left_on='_key_right', right_index=True, | |
how='right'), left_index=True, | |
right_on='_key_left', how='right') | |
.set_index(index_right) | |
) | |
joined = joined.drop(['_key_left', '_key_right'], axis=1) | |
return joined | |
def concurrent_map(fn, iterable, chunk_size=256, **kwargs): | |
""" | |
Concurrently maps a function on a slice-able iterable with keyword arguments passed along. | |
:param fn: the function to be called on the iterables. | |
:param iterable: the list, NumPy array, DataFrame to call the function on. | |
:param chunk_size: splits the iterable into smaller chunks of this size or less. | |
:param kwargs: keyword arguments passed to the functions. | |
:return: the concatenated results of calling the function on the iterable. | |
""" | |
number_splits = (len(iterable) + chunk_size - 1) // chunk_size | |
splits = [x * chunk_size for x in range(number_splits)] | |
chunks = [iterable[skip:skip + chunk_size] for skip in splits] | |
chunks = [chunk.copy() for chunk in chunks] | |
with ProcessPoolExecutor() as executor: | |
try: | |
futures = [executor.submit(fn, args, **kwargs) for args in chunks] | |
for f in as_completed(futures): | |
# bubble up exceptions from workers | |
if f.exception() is not None: | |
raise f.exception() | |
continue | |
except Exception as ex: | |
executor.shutdown(wait=True) | |
raise ex | |
results = [future.result() for future in futures] | |
if len(results) > 0: | |
if isinstance(results[0], pd.core.generic.NDFrame): | |
return pd.concat(results) | |
elif isinstance(results[0], np.ndarray): | |
return np.concatenate(results) | |
merged = [] | |
for chunk in results: | |
if isinstance(chunk, Sequence): | |
merged += chunk | |
return merged | |
sjoin_parameters = { | |
'how': ['left', 'right', 'inner'], | |
'op': ['intersects', 'within', 'contains'] | |
} | |
def _validate(name, value): | |
if value not in sjoin_parameters[name]: | |
raise ValueError('{} was \'{}\' but is expected to be one of {}.'.format(name, value, sjoin_parameters[name])) | |
class Op: | |
def __init__(self, func): | |
self.func = func | |
def __call__(self, df): | |
return np.vectorize(globals()['_'+self.func])(df[0].apply(prepared.prep), df[1]) | |
def _intersects(a1, a2): | |
return a1.intersects(a2) | |
def _contains(a1, a2): | |
return a1.contains(a2) | |
def _within(a1, a2): | |
return a1.within(a2) | |
if __name__ == '__main__': | |
a = gpd.read_file('a.shp') | |
b = gpd.read_file('b.shp') | |
print(a.shape) | |
print(b.shape) | |
c = sjoin(a, b, how='left') | |
print(c.shape) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
So let's check a few things:
op='intersects'
do you get a result?