Last active
September 26, 2022 16:15
-
-
Save ljwolf/ed42cab09a88b40b0c509e53b013ab9c to your computer and use it in GitHub Desktop.
Towards a faster kernel weights class in PySAL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sklearn import metrics | |
from sklearn.neighbors import BallTree | |
import numpy, pandas | |
from scipy.optimize import minimize_scalar | |
from scipy import sparse, stats | |
from libpysal.weights import WSP, Kernel | |
from libpysal.weights.util import get_points_array | |
coordinates = numpy.random.normal(0, 1, size=(100, 2)) | |
def _triangular(distances, bandwidth): | |
u = numpy.clip(distances / bandwidth, 0, 1) | |
return 1 - u | |
def _parabolic(distances, bandwidth): | |
u = numpy.clip(distances / bandwidth, 0, 1) | |
return 0.75 * (1 - u**2) | |
def _gaussian(distances, bandwidth): | |
u = distances / bandwidth | |
return numpy.exp(-((u / 2) ** 2)) / (numpy.sqrt(2) * numpy.pi) | |
def _bisquare(distances, bandwidth): | |
u = numpy.clip(distances / bandwidth, 0, 1) | |
return (15 / 16) * (1 - u**2) ** 2 | |
def _cosine(distances, bandwidth): | |
u = numpy.clip(distances / bandwidth, 0, 1) | |
return (numpy.pi / 4) * numpy.cos(numpy.pi / 2 * u) | |
_kernel_functions = dict( | |
triangular=_triangular, | |
parabolic=_parabolic, | |
gaussian=_gaussian, | |
bisquare=_bisquare, | |
cosine=_cosine, | |
) | |
def FastKernel( | |
coordinates, | |
bandwidth=None, | |
metric="euclidean", | |
kernel="triangular", | |
k=None, | |
indices=None, | |
**kwargs | |
): | |
if hasattr(coordinates, "geometry"): | |
indices = coordinates.index | |
coordinates = get_points_array(coordinates) | |
else: | |
if indices is None: | |
indices = pandas.RangeIndex(coordinates.shape[0]) | |
if k is not None: | |
tree = BallTree(coordinates) | |
D_linear, ixs = tree.query(coordinates, k=k + 1) | |
self_ix, neighbor_ix = ixs[:, 0], ixs[:, 1:] | |
D_linear = D_linear[:, 1:] | |
self_ix_flat = numpy.repeat(self_ix, k) | |
neighbor_ix_flat = neighbor_ix.flatten() | |
D_linear_flat = D_linear.flatten() | |
D = sparse.csc_matrix((D_linear_flat, (self_ix_flat, neighbor_ix_flat))) | |
else: | |
D = metrics.pairwise_distances(coordinates, metric=metric) | |
D = sparse.csc_matrix(D) | |
if bandwidth is None: | |
bandwidth = numpy.percentile(D.data, 25) | |
elif bandwidth == "opt": | |
bandwidth = _optimize_bandwidth(D, kernel) | |
K = D.copy() | |
K.data = _kernel_functions[kernel](D.data, bandwidth) | |
W = WSP(K, index=indices).to_W() | |
W._cache["bandwidth"] = bandwidth | |
return W | |
def _from_dataframe(df, **kwargs): | |
coordinates = get_points_array(df.geometry) | |
indices = df.index | |
return FastKernel(coordinates, indices=indices, **kwargs) | |
FastKernel.from_dataframe = _from_dataframe | |
def _optimize_bandwidth(D, kernel): | |
kernel_function = _kernel_functions[kernel] | |
def _loss(bandwidth, D=D, kernel_function=kernel_function): | |
Ku = kernel_function(D.data, bandwidth) | |
bins, _ = numpy.histogram(Ku, bins=int(D.shape[0] ** 0.5), range=(0,1)) | |
return -stats.entropy(bins / bins.sum()) | |
xopt = minimize_scalar(_loss, bounds=(0, D.data.max() * 2), method="bounded") | |
return xopt.x |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment