Last active
November 27, 2023 22:51
-
-
Save derrickturk/c3b2a1a79d814f33a78c48334c429a82 to your computer and use it in GitHub Desktop.
pyodbc converter for SQL Server POINT, MULTIPOINT, and LINESTRING (incl XYZM) geometries and geographies
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2023 dwt | terminus, LLC
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import struct | |
import numpy as np | |
import numpy.typing as npt | |
import numpy.lib.stride_tricks as nplst | |
from typing import NamedTuple, TypeAlias | |
# Little-endian 8-byte float dtype used for every ordinate buffer that this
# module reads from or writes to raw bytes.
FLOAT_LE = np.dtype('<f8')

# Bit masks tested against the serialization-properties byte in convert_151.
FLAG_Z = 1 << 0  # Z ordinates are present
FLAG_M = 1 << 1  # M ordinates are present
FLAG_V = 1 << 2  # never tested in this file; presumably the "valid" bit -- confirm
FLAG_P = 1 << 3  # payload is a single point stored right after the header
FLAG_L = 1 << 4  # payload is exactly two points stored right after the header
class PointXY(NamedTuple):
    """Point carrying X and Y ordinates and no SRID."""

    x: float
    y: float

    @property
    def wkt(self) -> str:
        """Render the point as ``POINT(x y)`` text."""
        return f'POINT({self.x} {self.y})'

    def isowkb(self) -> bytes:
        """Pack the point as little-endian WKB.

        Layout: byte-order byte ``1``, uint32 type code ``1``, then x and y
        as doubles.
        """
        # The tuple fields are already in pack order.
        return struct.pack('<BIdd', 1, 1, *self)
class PointXYZ(NamedTuple):
    """Point carrying X, Y and Z ordinates and no SRID."""

    x: float
    y: float
    z: float

    @property
    def wkt(self) -> str:
        """Render the point as ``POINT(x y z)`` text."""
        return f'POINT({self.x} {self.y} {self.z})'

    def isowkb(self) -> bytes:
        """Pack the point as little-endian WKB.

        Layout: byte-order byte ``1``, uint32 type code ``1001``, then x, y
        and z as doubles.
        """
        # The tuple fields are already in pack order.
        return struct.pack('<BIddd', 1, 1001, *self)
class PointXYZM(NamedTuple):
    """Point carrying X, Y, Z and M ordinates and no SRID."""

    x: float
    y: float
    z: float
    m: float

    @property
    def wkt(self) -> str:
        """Render the point as ``POINT(x y z m)`` text."""
        return f'POINT({self.x} {self.y} {self.z} {self.m})'

    def isowkb(self) -> bytes:
        """Pack the point as little-endian WKB.

        Layout: byte-order byte ``1``, uint32 type code ``3001``, then x, y,
        z and m as doubles.
        """
        # The tuple fields are already in pack order.
        return struct.pack('<BIdddd', 1, 3001, *self)
class PointXYM(NamedTuple):
    """Point carrying X, Y and M ordinates (no Z) and no SRID."""

    x: float
    y: float
    m: float

    @property
    def wkt(self) -> str:
        """Render the point as ``POINT(x y NULL m)``; the Z slot is a literal NULL."""
        return f'POINT({self.x} {self.y} NULL {self.m})'

    def isowkb(self) -> bytes:
        """Pack the point as little-endian WKB.

        Layout: byte-order byte ``1``, uint32 type code ``2001``, then x, y
        and m as doubles.
        """
        # The tuple fields are already in pack order.
        return struct.pack('<BIddd', 1, 2001, *self)
class PointSRIDXY(NamedTuple):
    """Two-ordinate point tagged with an SRID.

    ``xlat`` and ``ylon`` are the first and second stored ordinates.
    """

    srid: int
    xlat: float
    ylon: float

    @property
    def wkt(self) -> str:
        """Render the point as ``POINT(xlat ylon)``; the SRID is not included."""
        return f'POINT({self.xlat} {self.ylon})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack the point as little-endian WKB with type code ``1``.

        When ``latlon`` is true, ``ylon`` is written before ``xlat``;
        otherwise the stored order is kept. The SRID is not written.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return struct.pack('<BIdd', 1, 1, first, second)
class PointSRIDXYZ(NamedTuple):
    """Three-ordinate (with Z) point tagged with an SRID.

    ``xlat`` and ``ylon`` are the first and second stored ordinates.
    """

    srid: int
    xlat: float
    ylon: float
    z: float

    @property
    def wkt(self) -> str:
        """Render the point as ``POINT(xlat ylon z)``; the SRID is not included."""
        return f'POINT({self.xlat} {self.ylon} {self.z})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack the point as little-endian WKB with type code ``1001``.

        When ``latlon`` is true, ``ylon`` is written before ``xlat``;
        otherwise the stored order is kept. The SRID is not written.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return struct.pack('<BIddd', 1, 1001, first, second, self.z)
class PointSRIDXYZM(NamedTuple):
    """Four-ordinate (with Z and M) point tagged with an SRID.

    ``xlat`` and ``ylon`` are the first and second stored ordinates.
    """

    srid: int
    xlat: float
    ylon: float
    z: float
    m: float

    @property
    def wkt(self) -> str:
        """Render the point as ``POINT(xlat ylon z m)``; the SRID is not included."""
        return f'POINT({self.xlat} {self.ylon} {self.z} {self.m})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack the point as little-endian WKB with type code ``3001``.

        When ``latlon`` is true, ``ylon`` is written before ``xlat``;
        otherwise the stored order is kept. The SRID is not written.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return struct.pack('<BIdddd', 1, 3001, first, second, self.z, self.m)
class PointSRIDXYM(NamedTuple):
    """Point with two ordinates plus M (no Z), tagged with an SRID.

    ``xlat`` and ``ylon`` are the first and second stored ordinates.
    """

    srid: int
    xlat: float
    ylon: float
    m: float

    @property
    def wkt(self) -> str:
        """Render as ``POINT(xlat ylon NULL m)``; the Z slot is a literal NULL."""
        return f'POINT({self.xlat} {self.ylon} NULL {self.m})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack the point as little-endian WKB with type code ``2001``.

        When ``latlon`` is true, ``ylon`` is written before ``xlat``;
        otherwise the stored order is kept. The SRID is not written.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return struct.pack('<BIddd', 1, 2001, first, second, self.m)
class LineStringXY(NamedTuple):
    """Linestring stored as parallel X and Y ordinate arrays, no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``LINESTRING (x y, ...)`` or ``LINESTRING EMPTY``."""
        if not self.x.size:
            return 'LINESTRING EMPTY'
        vertices = [f'{x} {y}' for x, y in zip(self.x, self.y)]
        contents = ', '.join(vertices)
        return f'LINESTRING ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB: type code ``2``, a point count, then
        the ordinates interleaved per vertex as doubles."""
        header = struct.pack('<BII', 1, 2, self.x.size)
        coords = np.empty(self.x.size + self.y.size, dtype=FLOAT_LE)
        # Each ordinate array fills its own lane of the interleaved buffer.
        for lane, ordinate in enumerate((self.x, self.y)):
            coords[lane::2] = ordinate
        return header + coords.tobytes()
class LineStringXYZ(NamedTuple):
    """Linestring stored as parallel X, Y and Z ordinate arrays, no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``LINESTRING (x y z, ...)`` or ``LINESTRING EMPTY``.

        A NaN Z value is written as ``NULL``.
        """
        if not self.x.size:
            return 'LINESTRING EMPTY'
        vertices = [
            f'{x} {y} {z if not np.isnan(z) else "NULL"}'
            for x, y, z in zip(self.x, self.y, self.z)
        ]
        contents = ', '.join(vertices)
        return f'LINESTRING ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB: type code ``1002``, a point count, then
        the ordinates interleaved per vertex as doubles."""
        header = struct.pack('<BII', 1, 1002, self.x.size)
        coords = np.empty(self.x.size + self.y.size + self.z.size,
                          dtype=FLOAT_LE)
        # Each ordinate array fills its own lane of the interleaved buffer.
        for lane, ordinate in enumerate((self.x, self.y, self.z)):
            coords[lane::3] = ordinate
        return header + coords.tobytes()
class LineStringXYZM(NamedTuple):
    """Linestring stored as parallel X, Y, Z and M ordinate arrays, no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``LINESTRING (x y z m, ...)`` or ``LINESTRING EMPTY``.

        NaN Z or M values are written as ``NULL``.
        """
        if not self.x.size:
            return 'LINESTRING EMPTY'
        vertices = [
            f'{x} {y} {z if not np.isnan(z) else "NULL"} {m if not np.isnan(m) else "NULL"}'
            for x, y, z, m in zip(self.x, self.y, self.z, self.m)
        ]
        contents = ', '.join(vertices)
        return f'LINESTRING ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB: type code ``3002``, a point count, then
        the ordinates interleaved per vertex as doubles."""
        header = struct.pack('<BII', 1, 3002, self.x.size)
        total = self.x.size + self.y.size + self.z.size + self.m.size
        coords = np.empty(total, dtype=FLOAT_LE)
        # Each ordinate array fills its own lane of the interleaved buffer.
        for lane, ordinate in enumerate((self.x, self.y, self.z, self.m)):
            coords[lane::4] = ordinate
        return header + coords.tobytes()
class LineStringXYM(NamedTuple):
    """Linestring stored as parallel X, Y and M ordinate arrays (no Z), no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``LINESTRING (x y NULL m, ...)`` or ``LINESTRING EMPTY``.

        The Z slot is always a literal ``NULL``; a NaN M is written as
        ``NULL`` too.
        """
        if not self.x.size:
            return 'LINESTRING EMPTY'
        vertices = [
            f'{x} {y} NULL {m if not np.isnan(m) else "NULL"}'
            for x, y, m in zip(self.x, self.y, self.m)
        ]
        contents = ', '.join(vertices)
        return f'LINESTRING ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB: type code ``2002``, a point count, then
        the ordinates interleaved per vertex as doubles."""
        header = struct.pack('<BII', 1, 2002, self.x.size)
        coords = np.empty(self.x.size + self.y.size + self.m.size,
                          dtype=FLOAT_LE)
        # Each ordinate array fills its own lane of the interleaved buffer.
        for lane, ordinate in enumerate((self.x, self.y, self.m)):
            coords[lane::3] = ordinate
        return header + coords.tobytes()
class LineStringSRIDXY(NamedTuple):
    """Two-ordinate linestring tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each vertex.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via LineStringXY in stored ordinate order; the SRID is dropped."""
        return LineStringXY(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via LineStringXY; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXY(first, second).isowkb()
class LineStringSRIDXYZ(NamedTuple):
    """Linestring with Z ordinates, tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each vertex.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via LineStringXYZ in stored ordinate order; the SRID is dropped."""
        return LineStringXYZ(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via LineStringXYZ; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXYZ(first, second, self.z).isowkb()
class LineStringSRIDXYZM(NamedTuple):
    """Linestring with Z and M ordinates, tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each vertex.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via LineStringXYZM in stored ordinate order; the SRID is dropped."""
        return LineStringXYZM(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via LineStringXYZM; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXYZM(first, second, self.z, self.m).isowkb()
class LineStringSRIDXYM(NamedTuple):
    """Linestring with M ordinates (no Z), tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each vertex.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via LineStringXYM in stored ordinate order; the SRID is dropped."""
        return LineStringXYM(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via LineStringXYM; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXYM(first, second, self.m).isowkb()
class MultiPointXY(NamedTuple):
    """Multipoint stored as parallel X and Y ordinate arrays, no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``MULTIPOINT ((x y), ...)`` or ``MULTIPOINT EMPTY``."""
        if not self.x.size:
            return 'MULTIPOINT EMPTY'
        members = [f'({x} {y})' for x, y in zip(self.x, self.y)]
        contents = ', '.join(members)
        return f'MULTIPOINT ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB: type code ``4``, a member count, then
        one complete PointXY record per member."""
        header = struct.pack('<BII', 1, 4, self.x.size)
        records = (PointXY(x, y).isowkb() for x, y in zip(self.x, self.y))
        return header + b''.join(records)
class MultiPointXYZ(NamedTuple):
    """Multipoint stored as parallel X, Y and Z ordinate arrays, no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``MULTIPOINT ((x y z), ...)`` or ``MULTIPOINT EMPTY``.

        A NaN Z value is written as ``NULL``.
        """
        if not self.x.size:
            return 'MULTIPOINT EMPTY'
        members = [
            f'({x} {y} {z if not np.isnan(z) else "NULL"})'
            for x, y, z in zip(self.x, self.y, self.z)
        ]
        contents = ', '.join(members)
        return f'MULTIPOINT ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB: type code ``1004``, a member count,
        then one complete PointXYZ record per member."""
        header = struct.pack('<BII', 1, 1004, self.x.size)
        records = (
            PointXYZ(x, y, z).isowkb()
            for x, y, z in zip(self.x, self.y, self.z)
        )
        return header + b''.join(records)
class MultiPointXYZM(NamedTuple):
    """Multipoint stored as parallel X, Y, Z and M ordinate arrays, no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``MULTIPOINT ((x y z m), ...)`` or ``MULTIPOINT EMPTY``.

        NaN Z or M values are written as ``NULL``.
        """
        if not self.x.size:
            return 'MULTIPOINT EMPTY'
        members = [
            f'({x} {y} {z if not np.isnan(z) else "NULL"} {m if not np.isnan(m) else "NULL"})'
            for x, y, z, m in zip(self.x, self.y, self.z, self.m)
        ]
        contents = ', '.join(members)
        return f'MULTIPOINT ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB: type code ``3004``, a member count,
        then one complete PointXYZM record per member."""
        header = struct.pack('<BII', 1, 3004, self.x.size)
        records = (
            PointXYZM(x, y, z, m).isowkb()
            for x, y, z, m in zip(self.x, self.y, self.z, self.m)
        )
        return header + b''.join(records)
class MultiPointXYM(NamedTuple):
    """Multipoint stored as parallel X, Y and M ordinate arrays (no Z), no SRID."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render as ``MULTIPOINT ((x y NULL m), ...)`` or ``MULTIPOINT EMPTY``.

        The Z slot is always a literal ``NULL``; a NaN M is written as
        ``NULL`` too.
        """
        if self.x.size == 0:
            return 'MULTIPOINT EMPTY'
        contents = ', '.join(
            f'({x} {y} NULL {m if not np.isnan(m) else "NULL"})'
            for x, y, m in zip(self.x, self.y, self.m))
        return f'MULTIPOINT ({contents})'

    def isowkb(self) -> bytes:
        """Pack as little-endian WKB.

        Layout: byte-order byte ``1``, uint32 type code ``2004``, a uint32
        member count, then one point record per member.

        Each member is written as a type ``2001`` record holding x, y and m,
        matching the ``2004`` header. Members were previously written as
        four-ordinate ``3001`` records with a NaN Z, which disagreed with
        the header's dimensionality.
        """
        header = struct.pack('<BII', 1, 2004, self.x.size)
        records = (
            struct.pack('<BIddd', 1, 2001, x, y, m)
            for x, y, m in zip(self.x, self.y, self.m)
        )
        return header + b''.join(records)
class MultiPointSRIDXY(NamedTuple):
    """Two-ordinate multipoint tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each member.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via MultiPointXY in stored ordinate order; the SRID is dropped."""
        return MultiPointXY(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via MultiPointXY; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXY(first, second).isowkb()
class MultiPointSRIDXYZ(NamedTuple):
    """Multipoint with Z ordinates, tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each member.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via MultiPointXYZ in stored ordinate order; the SRID is dropped."""
        return MultiPointXYZ(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via MultiPointXYZ; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXYZ(first, second, self.z).isowkb()
class MultiPointSRIDXYZM(NamedTuple):
    """Multipoint with Z and M ordinates, tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each member.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via MultiPointXYZM in stored ordinate order; the SRID is dropped."""
        return MultiPointXYZM(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via MultiPointXYZM; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXYZM(first, second, self.z, self.m).isowkb()
class MultiPointSRIDXYM(NamedTuple):
    """Multipoint with M ordinates (no Z), tagged with an SRID.

    ``xlat`` and ``ylon`` hold the first and second stored ordinate of
    each member.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Render via MultiPointXYM in stored ordinate order; the SRID is dropped."""
        return MultiPointXYM(*self[1:]).wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Pack via MultiPointXYM; when ``latlon`` is true, ``ylon`` is
        emitted before ``xlat``."""
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXYM(first, second, self.m).isowkb()
# Union of every result class that has no ``srid`` field.
Geometry: TypeAlias = (
    PointXY
    | PointXYZ
    | PointXYM
    | PointXYZM
    | LineStringXY
    | LineStringXYZ
    | LineStringXYZM
    | LineStringXYM
    | MultiPointXY
    | MultiPointXYZ
    | MultiPointXYZM
    | MultiPointXYM
)

# Union of every result class that carries an ``srid`` field.
Geography: TypeAlias = (
    PointSRIDXY
    | PointSRIDXYZ
    | PointSRIDXYM
    | PointSRIDXYZM
    | LineStringSRIDXY
    | LineStringSRIDXYZ
    | LineStringSRIDXYZM
    | LineStringSRIDXYM
    | MultiPointSRIDXY
    | MultiPointSRIDXYZ
    | MultiPointSRIDXYZM
    | MultiPointSRIDXYM
)
def convert_151(binary: bytes) -> Geography | Geometry: | |
srid, ver, props = struct.unpack_from('<lBB', binary) | |
if ver not in (1, 2): | |
raise ValueError('probably not a geography or geometry') | |
if props & FLAG_P: | |
if props & FLAG_Z: | |
if props & FLAG_M: | |
x, y, z, m = struct.unpack_from( | |
'<4d', binary, offset=0x06) | |
if srid: | |
return PointSRIDXYZM(srid, x, y, z, m) | |
else: | |
return PointXYZM(x, y, z, m) | |
else: | |
x, y, z = struct.unpack_from('<3d', binary, offset=0x06) | |
if srid: | |
return PointSRIDXYZ(srid, x, y, z) | |
else: | |
return PointXYZ(x, y, z) | |
elif props & FLAG_M: | |
x, y, m = struct.unpack_from('<3d', binary, offset=0x06) | |
if srid: | |
return PointSRIDXYM(srid, x, y, m) | |
else: | |
return PointXYM(x, y, m) | |
else: | |
x, y = struct.unpack_from('<2d', binary, offset=0x06) | |
if srid: | |
return PointSRIDXY(srid, x, y) | |
else: | |
return PointXY(x, y) | |
elif props & FLAG_L: | |
n_pts = 2 | |
offset = 0x06 | |
else: | |
n_pts, = struct.unpack_from('<H', binary, offset=0x06) | |
offset = 0x0a | |
xy = np.frombuffer(binary, dtype=FLOAT_LE, offset=offset, count=n_pts * 2) | |
xs, ys = nplst.as_strided(xy, (2, n_pts), (8, 16)) | |
offset += n_pts * 16 | |
zs: npt.NDArray[np.float64] | None = None | |
if props & FLAG_Z: | |
zs = np.frombuffer(binary, dtype=FLOAT_LE, offset=offset, count=n_pts) | |
offset += n_pts * 8 | |
ms: npt.NDArray[np.float64] | None = None | |
if props & FLAG_M: | |
ms = np.frombuffer(binary, dtype=FLOAT_LE, offset=offset, count=n_pts) | |
offset += n_pts * 8 | |
is_multipoint = False | |
if not (props & FLAG_L): | |
n_fig, = struct.unpack_from('<L', binary, offset=offset) | |
offset += 4 | |
for i in range(n_fig): | |
fig_attr, fig_off = struct.unpack_from('<Bl', binary, offset=offset) | |
offset += 5 | |
if (ver == 1 and fig_attr != 1) or (ver == 2 and fig_attr not in (0, 1)): | |
raise ValueError('we only handle linestrings and multipoints') | |
if i != fig_off: | |
raise ValueError('we only handle "in-order" things') | |
n_shapes, = struct.unpack_from('<L', binary, offset=offset) | |
offset += 4 | |
for i in range(n_shapes): | |
parent_off, fig_off, shape_type = struct.unpack_from( | |
'<llB', binary, offset=offset) | |
offset += 9 | |
if i == 0: | |
if parent_off != -1 or fig_off not in (0, -1): | |
raise ValueError('we only handle "in-order" things') | |
if shape_type == 2: | |
if n_shapes != 1: | |
raise ValueError('we only handle single linestrings') | |
elif shape_type == 4: | |
is_multipoint = True | |
else: | |
raise ValueError( | |
'we only handle linestrings and multipoints') | |
elif is_multipoint and (fig_off not in (i - 1, -1) or shape_type != 1): | |
raise ValueError('we only handle "in-order" multipoints') | |
if offset != len(binary): | |
raise ValueError('probably not a geography or geometry') | |
# types ignored because mypy's too stupid to realize the match means certain | |
# values must be non-None | |
if not is_multipoint: | |
match srid, zs, ms: | |
case (0, None, None): return LineStringXY(xs, ys) | |
case (_, None, None): return LineStringSRIDXY(srid, xs, ys) | |
case (0, _, None): return LineStringXYZ(xs, ys, zs) # type: ignore | |
case (_, _, None): return LineStringSRIDXYZ(srid, xs, ys, zs) # type: ignore | |
case (0, None, _): return LineStringXYM(xs, ys, ms) # type: ignore | |
case (_, None, _): return LineStringSRIDXYM(srid, xs, ys, ms) # type: ignore | |
case (0, _, _): return LineStringXYZM(xs, ys, zs, ms) # type: ignore | |
case (_, _, _): return LineStringSRIDXYZM(srid, xs, ys, zs, ms) # type: ignore | |
case _: raise RuntimeError('unreachable') | |
else: | |
match srid, zs, ms: | |
case (0, None, None): return MultiPointXY(xs, ys) | |
case (_, None, None): return MultiPointSRIDXY(srid, xs, ys) | |
case (0, _, None): return MultiPointXYZ(xs, ys, zs) # type: ignore | |
case (_, _, None): return MultiPointSRIDXYZ(srid, xs, ys, zs) # type: ignore | |
case (0, None, _): return MultiPointXYM(xs, ys, ms) # type: ignore | |
case (_, None, _): return MultiPointSRIDXYM(srid, xs, ys, ms) # type: ignore | |
case (0, _, _): return MultiPointXYZM(xs, ys, zs, ms) # type: ignore | |
case (_, _, _): return MultiPointSRIDXYZM(srid, xs, ys, zs, ms) # type: ignore | |
case _: raise RuntimeError('unreachable') | |
# Public API, grouped by shape family; the order matches the original list.
__all__ = [
    # points
    'PointXY', 'PointXYZ', 'PointXYZM', 'PointXYM',
    'PointSRIDXY', 'PointSRIDXYZ', 'PointSRIDXYZM', 'PointSRIDXYM',
    # linestrings
    'LineStringXY', 'LineStringXYZ', 'LineStringXYZM', 'LineStringXYM',
    'LineStringSRIDXY', 'LineStringSRIDXYZ', 'LineStringSRIDXYZM',
    'LineStringSRIDXYM',
    # multipoints
    'MultiPointXY', 'MultiPointXYZ', 'MultiPointXYZM', 'MultiPointXYM',
    'MultiPointSRIDXY', 'MultiPointSRIDXYZ', 'MultiPointSRIDXYZM',
    'MultiPointSRIDXYM',
    # type aliases and the decoder entry point
    'Geometry', 'Geography',
    'convert_151',
]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment