Skip to content

Instantly share code, notes, and snippets.

@derrickturk
Last active November 27, 2023 22:51
Show Gist options
  • Save derrickturk/c3b2a1a79d814f33a78c48334c429a82 to your computer and use it in GitHub Desktop.
Save derrickturk/c3b2a1a79d814f33a78c48334c429a82 to your computer and use it in GitHub Desktop.
pyodbc converter for SQL Server POINT, MULTIPOINT, and LINESTRING (incl XYZM) geometries and geographies
# Copyright (c) 2023 dwt | terminus, LLC
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import struct
import numpy as np
import numpy.typing as npt
import numpy.lib.stride_tricks as nplst
from typing import NamedTuple, TypeAlias
# Little-endian 8-byte float: the element type used for every coordinate
# array that is read from, or written to, a binary payload in this module.
FLOAT_LE = np.dtype('<f8')

# Bit masks tested against the properties byte that convert_151 unpacks
# right after the SRID and version fields.
FLAG_Z = 1 << 0  # Z values are present
FLAG_M = 1 << 1  # M values are present
FLAG_V = 1 << 2  # not consulted by this module; presumably the "valid" bit
FLAG_P = 1 << 3  # payload is one point stored without a point count
FLAG_L = 1 << 4  # payload is one two-point line segment without a count
class PointXY(NamedTuple):
    """A two-dimensional point that carries no spatial reference ID."""

    x: float
    y: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text, e.g. ``POINT(1.0 2.0)``."""
        x, y = self
        return f'POINT({x} {y})'

    def isowkb(self) -> bytes:
        """Return the point as little-endian ISO well-known binary."""
        # Byte-order marker 1 (little-endian), geometry type 1 (Point),
        # followed by the two ordinates as doubles.
        return struct.pack('<BIdd', 1, 1, *self)
class PointXYZ(NamedTuple):
    """A three-dimensional (X, Y, Z) point with no spatial reference ID."""

    x: float
    y: float
    z: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text.

        A NaN Z ordinate is rendered as ``NULL``, matching what the
        line-string and multi-point classes in this module do, instead of
        leaking the literal ``nan`` into the text.
        """
        z = 'NULL' if np.isnan(self.z) else self.z
        return f'POINT({self.x} {self.y} {z})'

    def isowkb(self) -> bytes:
        """Return the point as little-endian ISO well-known binary."""
        # Geometry type 1001 is "Point Z" in ISO WKB.
        return struct.pack('<BIddd', 1, 1001, self.x, self.y, self.z)
class PointXYZM(NamedTuple):
    """An (X, Y, Z, M) point with no spatial reference ID."""

    x: float
    y: float
    z: float
    m: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text.

        NaN Z or M ordinates are rendered as ``NULL``, matching what the
        line-string and multi-point classes in this module do, instead of
        leaking the literal ``nan`` into the text.
        """
        z = 'NULL' if np.isnan(self.z) else self.z
        m = 'NULL' if np.isnan(self.m) else self.m
        return f'POINT({self.x} {self.y} {z} {m})'

    def isowkb(self) -> bytes:
        """Return the point as little-endian ISO well-known binary."""
        # Geometry type 3001 is "Point ZM" in ISO WKB.
        return struct.pack('<BIdddd', 1, 3001, self.x, self.y, self.z, self.m)
class PointXYM(NamedTuple):
    """An (X, Y, M) point (no Z ordinate) with no spatial reference ID."""

    x: float
    y: float
    m: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text.

        The Z position is always written as ``NULL`` because the point has
        no Z ordinate. A NaN M ordinate is also written as ``NULL``,
        matching the line-string and multi-point classes in this module.
        """
        m = 'NULL' if np.isnan(self.m) else self.m
        return f'POINT({self.x} {self.y} NULL {m})'

    def isowkb(self) -> bytes:
        """Return the point as little-endian ISO well-known binary."""
        # Geometry type 2001 is "Point M" in ISO WKB.
        return struct.pack('<BIddd', 1, 2001, self.x, self.y, self.m)
class PointSRIDXY(NamedTuple):
    """A two-dimensional point tagged with a spatial reference ID.

    The ordinates are kept in stored order as ``xlat`` then ``ylon``.
    """

    srid: int
    xlat: float
    ylon: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text, ordinates in stored order."""
        _, first, second = self
        return f'POINT({first} {second})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return the point as little-endian ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinates are swapped,
        so ``ylon`` is written before ``xlat``. The SRID is not encoded.
        """
        if latlon:
            first, second = self.ylon, self.xlat
        else:
            first, second = self.xlat, self.ylon
        # Geometry type 1 is "Point" in ISO WKB.
        return struct.pack('<BIdd', 1, 1, first, second)
class PointSRIDXYZ(NamedTuple):
    """An (X, Y, Z) point tagged with a spatial reference ID.

    The horizontal ordinates are kept in stored order as ``xlat`` then
    ``ylon``.
    """

    srid: int
    xlat: float
    ylon: float
    z: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text, ordinates in stored order.

        A NaN Z ordinate is rendered as ``NULL``, matching what the
        line-string and multi-point classes in this module do, instead of
        leaking the literal ``nan`` into the text.
        """
        z = 'NULL' if np.isnan(self.z) else self.z
        return f'POINT({self.xlat} {self.ylon} {z})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return the point as little-endian ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinates are swapped,
        so ``ylon`` is written before ``xlat``. The SRID is not encoded.
        """
        if latlon:
            first, second = self.ylon, self.xlat
        else:
            first, second = self.xlat, self.ylon
        # Geometry type 1001 is "Point Z" in ISO WKB.
        return struct.pack('<BIddd', 1, 1001, first, second, self.z)
class PointSRIDXYZM(NamedTuple):
    """An (X, Y, Z, M) point tagged with a spatial reference ID.

    The horizontal ordinates are kept in stored order as ``xlat`` then
    ``ylon``.
    """

    srid: int
    xlat: float
    ylon: float
    z: float
    m: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text, ordinates in stored order.

        NaN Z or M ordinates are rendered as ``NULL``, matching what the
        line-string and multi-point classes in this module do, instead of
        leaking the literal ``nan`` into the text.
        """
        z = 'NULL' if np.isnan(self.z) else self.z
        m = 'NULL' if np.isnan(self.m) else self.m
        return f'POINT({self.xlat} {self.ylon} {z} {m})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return the point as little-endian ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinates are swapped,
        so ``ylon`` is written before ``xlat``. The SRID is not encoded.
        """
        if latlon:
            first, second = self.ylon, self.xlat
        else:
            first, second = self.xlat, self.ylon
        # Geometry type 3001 is "Point ZM" in ISO WKB.
        return struct.pack('<BIdddd', 1, 3001, first, second, self.z, self.m)
class PointSRIDXYM(NamedTuple):
    """An (X, Y, M) point (no Z ordinate) tagged with a spatial reference ID.

    The horizontal ordinates are kept in stored order as ``xlat`` then
    ``ylon``.
    """

    srid: int
    xlat: float
    ylon: float
    m: float

    @property
    def wkt(self) -> str:
        """Return the point as well-known text, ordinates in stored order.

        The Z position is always written as ``NULL`` because the point has
        no Z ordinate. A NaN M ordinate is also written as ``NULL``,
        matching the line-string and multi-point classes in this module.
        """
        m = 'NULL' if np.isnan(self.m) else self.m
        return f'POINT({self.xlat} {self.ylon} NULL {m})'

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return the point as little-endian ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinates are swapped,
        so ``ylon`` is written before ``xlat``. The SRID is not encoded.
        """
        if latlon:
            first, second = self.ylon, self.xlat
        else:
            first, second = self.xlat, self.ylon
        # Geometry type 2001 is "Point M" in ISO WKB.
        return struct.pack('<BIddd', 1, 2001, first, second, self.m)
class LineStringXY(NamedTuple):
    """A two-dimensional line string stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the line string as well-known text."""
        if self.x.size == 0:
            return 'LINESTRING EMPTY'
        vertices = [f'{px} {py}' for px, py in zip(self.x, self.y)]
        return 'LINESTRING (' + ', '.join(vertices) + ')'

    def isowkb(self) -> bytes:
        """Return the line string as little-endian ISO well-known binary."""
        columns = (self.x, self.y)
        stride = len(columns)
        # One flat buffer holding x0 y0 x1 y1 ...; each ordinate array is
        # written into its own strided lane.
        flat = np.empty((sum(col.size for col in columns),), dtype=FLOAT_LE)
        for lane, col in enumerate(columns):
            flat[lane::stride] = col
        # Geometry type 2 is "LineString"; the count is the number of points.
        header = struct.pack('<BII', 1, 2, self.x.size)
        return header + flat.tobytes()
class LineStringXYZ(NamedTuple):
    """An (X, Y, Z) line string stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the line string as well-known text.

        A NaN Z ordinate is written as ``NULL``.
        """
        if self.x.size == 0:
            return 'LINESTRING EMPTY'
        vertices = [
            f'{px} {py} {"NULL" if np.isnan(pz) else pz}'
            for px, py, pz in zip(self.x, self.y, self.z)
        ]
        return 'LINESTRING (' + ', '.join(vertices) + ')'

    def isowkb(self) -> bytes:
        """Return the line string as little-endian ISO well-known binary."""
        columns = (self.x, self.y, self.z)
        stride = len(columns)
        # One flat buffer holding x0 y0 z0 x1 y1 z1 ...; each ordinate
        # array is written into its own strided lane.
        flat = np.empty((sum(col.size for col in columns),), dtype=FLOAT_LE)
        for lane, col in enumerate(columns):
            flat[lane::stride] = col
        # Geometry type 1002 is "LineString Z".
        header = struct.pack('<BII', 1, 1002, self.x.size)
        return header + flat.tobytes()
class LineStringXYZM(NamedTuple):
    """An (X, Y, Z, M) line string stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the line string as well-known text.

        NaN Z or M ordinates are written as ``NULL``.
        """
        if self.x.size == 0:
            return 'LINESTRING EMPTY'
        vertices = [
            f'{px} {py} {"NULL" if np.isnan(pz) else pz} {"NULL" if np.isnan(pm) else pm}'
            for px, py, pz, pm in zip(self.x, self.y, self.z, self.m)
        ]
        return 'LINESTRING (' + ', '.join(vertices) + ')'

    def isowkb(self) -> bytes:
        """Return the line string as little-endian ISO well-known binary."""
        columns = (self.x, self.y, self.z, self.m)
        stride = len(columns)
        # One flat buffer holding x0 y0 z0 m0 x1 y1 z1 m1 ...; each ordinate
        # array is written into its own strided lane.
        flat = np.empty((sum(col.size for col in columns),), dtype=FLOAT_LE)
        for lane, col in enumerate(columns):
            flat[lane::stride] = col
        # Geometry type 3002 is "LineString ZM".
        header = struct.pack('<BII', 1, 3002, self.x.size)
        return header + flat.tobytes()
class LineStringXYM(NamedTuple):
    """An (X, Y, M) line string (no Z) stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the line string as well-known text.

        The Z position is always ``NULL``; a NaN M ordinate is written as
        ``NULL`` too.
        """
        if self.x.size == 0:
            return 'LINESTRING EMPTY'
        vertices = [
            f'{px} {py} NULL {"NULL" if np.isnan(pm) else pm}'
            for px, py, pm in zip(self.x, self.y, self.m)
        ]
        return 'LINESTRING (' + ', '.join(vertices) + ')'

    def isowkb(self) -> bytes:
        """Return the line string as little-endian ISO well-known binary."""
        columns = (self.x, self.y, self.m)
        stride = len(columns)
        # One flat buffer holding x0 y0 m0 x1 y1 m1 ...; each ordinate
        # array is written into its own strided lane.
        flat = np.empty((sum(col.size for col in columns),), dtype=FLOAT_LE)
        for lane, col in enumerate(columns):
            flat[lane::stride] = col
        # Geometry type 2002 is "LineString M".
        header = struct.pack('<BII', 1, 2002, self.x.size)
        return header + flat.tobytes()
class LineStringSRIDXY(NamedTuple):
    """A two-dimensional line string tagged with a spatial reference ID.

    Formatting is delegated to LineStringXY; the SRID is carried alongside
    but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = LineStringXY(self.xlat, self.ylon)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXY(first, second).isowkb()
class LineStringSRIDXYZ(NamedTuple):
    """An (X, Y, Z) line string tagged with a spatial reference ID.

    Formatting is delegated to LineStringXYZ; the SRID is carried alongside
    but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = LineStringXYZ(self.xlat, self.ylon, self.z)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXYZ(first, second, self.z).isowkb()
class LineStringSRIDXYZM(NamedTuple):
    """An (X, Y, Z, M) line string tagged with a spatial reference ID.

    Formatting is delegated to LineStringXYZM; the SRID is carried
    alongside but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = LineStringXYZM(self.xlat, self.ylon, self.z, self.m)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXYZM(first, second, self.z, self.m).isowkb()
class LineStringSRIDXYM(NamedTuple):
    """An (X, Y, M) line string tagged with a spatial reference ID.

    Formatting is delegated to LineStringXYM; the SRID is carried alongside
    but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = LineStringXYM(self.xlat, self.ylon, self.m)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return LineStringXYM(first, second, self.m).isowkb()
class MultiPointXY(NamedTuple):
    """A two-dimensional multi-point stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the multi-point as well-known text."""
        if self.x.size == 0:
            return 'MULTIPOINT EMPTY'
        members = [f'({px} {py})' for px, py in zip(self.x, self.y)]
        return 'MULTIPOINT (' + ', '.join(members) + ')'

    def isowkb(self) -> bytes:
        """Return the multi-point as little-endian ISO well-known binary."""
        # Geometry type 4 is "MultiPoint"; the count is the number of members.
        header = struct.pack('<BII', 1, 4, self.x.size)
        # Every member is a complete WKB Point (type 1) with its own
        # byte-order marker and type code.
        members = b''.join(
            struct.pack('<BIdd', 1, 1, px, py)
            for px, py in zip(self.x, self.y)
        )
        return header + members
class MultiPointXYZ(NamedTuple):
    """An (X, Y, Z) multi-point stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the multi-point as well-known text.

        A NaN Z ordinate is written as ``NULL``.
        """
        if self.x.size == 0:
            return 'MULTIPOINT EMPTY'
        members = [
            f'({px} {py} {"NULL" if np.isnan(pz) else pz})'
            for px, py, pz in zip(self.x, self.y, self.z)
        ]
        return 'MULTIPOINT (' + ', '.join(members) + ')'

    def isowkb(self) -> bytes:
        """Return the multi-point as little-endian ISO well-known binary."""
        # Geometry type 1004 is "MultiPoint Z".
        header = struct.pack('<BII', 1, 1004, self.x.size)
        # Every member is a complete WKB Point Z (type 1001).
        members = b''.join(
            struct.pack('<BIddd', 1, 1001, px, py, pz)
            for px, py, pz in zip(self.x, self.y, self.z)
        )
        return header + members
class MultiPointXYZM(NamedTuple):
    """An (X, Y, Z, M) multi-point stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the multi-point as well-known text.

        NaN Z or M ordinates are written as ``NULL``.
        """
        if self.x.size == 0:
            return 'MULTIPOINT EMPTY'
        members = [
            f'({px} {py} {"NULL" if np.isnan(pz) else pz} {"NULL" if np.isnan(pm) else pm})'
            for px, py, pz, pm in zip(self.x, self.y, self.z, self.m)
        ]
        return 'MULTIPOINT (' + ', '.join(members) + ')'

    def isowkb(self) -> bytes:
        """Return the multi-point as little-endian ISO well-known binary."""
        # Geometry type 3004 is "MultiPoint ZM".
        header = struct.pack('<BII', 1, 3004, self.x.size)
        # Every member is a complete WKB Point ZM (type 3001).
        members = b''.join(
            struct.pack('<BIdddd', 1, 3001, px, py, pz, pm)
            for px, py, pz, pm in zip(self.x, self.y, self.z, self.m)
        )
        return header + members
class MultiPointXYM(NamedTuple):
    """An (X, Y, M) multi-point (no Z) stored as parallel ordinate arrays."""

    x: npt.NDArray[np.float64]
    y: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return the multi-point as well-known text.

        The Z position is always ``NULL``; a NaN M ordinate is written as
        ``NULL`` too.
        """
        if self.x.size == 0:
            return 'MULTIPOINT EMPTY'
        contents = ', '.join(
            f'({x} {y} NULL {m if not np.isnan(m) else "NULL"})'
            for x, y, m in zip(self.x, self.y, self.m))
        return f'MULTIPOINT ({contents})'

    def isowkb(self) -> bytes:
        """Return the multi-point as little-endian ISO well-known binary.

        Each member is written as a WKB Point M (type 2001, three doubles)
        so that it agrees with the MultiPoint M header (type 2004).
        Previously members were written as Point ZM (type 3001) with a NaN
        Z, which contradicted the collection's declared dimensionality.
        """
        header = struct.pack('<BII', 1, 2004, self.x.size)
        members = b''.join(
            struct.pack('<BIddd', 1, 2001, x, y, m)
            for x, y, m in zip(self.x, self.y, self.m)
        )
        return header + members
class MultiPointSRIDXY(NamedTuple):
    """A two-dimensional multi-point tagged with a spatial reference ID.

    Formatting is delegated to MultiPointXY; the SRID is carried alongside
    but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = MultiPointXY(self.xlat, self.ylon)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXY(first, second).isowkb()
class MultiPointSRIDXYZ(NamedTuple):
    """An (X, Y, Z) multi-point tagged with a spatial reference ID.

    Formatting is delegated to MultiPointXYZ; the SRID is carried alongside
    but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = MultiPointXYZ(self.xlat, self.ylon, self.z)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXYZ(first, second, self.z).isowkb()
class MultiPointSRIDXYZM(NamedTuple):
    """An (X, Y, Z, M) multi-point tagged with a spatial reference ID.

    Formatting is delegated to MultiPointXYZM; the SRID is carried
    alongside but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    z: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = MultiPointXYZM(self.xlat, self.ylon, self.z, self.m)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXYZM(first, second, self.z, self.m).isowkb()
class MultiPointSRIDXYM(NamedTuple):
    """An (X, Y, M) multi-point tagged with a spatial reference ID.

    Formatting is delegated to MultiPointXYM; the SRID is carried alongside
    but is not part of either output.
    """

    srid: int
    xlat: npt.NDArray[np.float64]
    ylon: npt.NDArray[np.float64]
    m: npt.NDArray[np.float64]

    @property
    def wkt(self) -> str:
        """Return well-known text with ordinates in stored order."""
        plain = MultiPointXYM(self.xlat, self.ylon, self.m)
        return plain.wkt

    def isowkb(self, latlon: bool = False) -> bytes:
        """Return ISO well-known binary.

        When ``latlon`` is true the two horizontal ordinate arrays are
        swapped, so ``ylon`` values are written before ``xlat`` values.
        """
        first, second = (
            (self.ylon, self.xlat) if latlon else (self.xlat, self.ylon)
        )
        return MultiPointXYM(first, second, self.m).isowkb()
# Every shape class that does not carry an SRID field.
Geometry: TypeAlias = (
    PointXY
    | PointXYZ
    | PointXYM
    | PointXYZM
    | LineStringXY
    | LineStringXYZ
    | LineStringXYZM
    | LineStringXYM
    | MultiPointXY
    | MultiPointXYZ
    | MultiPointXYZM
    | MultiPointXYM
)
# Every shape class that carries an SRID field alongside its ordinates.
Geography: TypeAlias = (
    PointSRIDXY
    | PointSRIDXYZ
    | PointSRIDXYM
    | PointSRIDXYZM
    | LineStringSRIDXY
    | LineStringSRIDXYZ
    | LineStringSRIDXYZM
    | LineStringSRIDXYM
    | MultiPointSRIDXY
    | MultiPointSRIDXYZ
    | MultiPointSRIDXYZM
    | MultiPointSRIDXYM
)
def convert_151(binary: bytes) -> Geography | Geometry:
srid, ver, props = struct.unpack_from('<lBB', binary)
if ver not in (1, 2):
raise ValueError('probably not a geography or geometry')
if props & FLAG_P:
if props & FLAG_Z:
if props & FLAG_M:
x, y, z, m = struct.unpack_from(
'<4d', binary, offset=0x06)
if srid:
return PointSRIDXYZM(srid, x, y, z, m)
else:
return PointXYZM(x, y, z, m)
else:
x, y, z = struct.unpack_from('<3d', binary, offset=0x06)
if srid:
return PointSRIDXYZ(srid, x, y, z)
else:
return PointXYZ(x, y, z)
elif props & FLAG_M:
x, y, m = struct.unpack_from('<3d', binary, offset=0x06)
if srid:
return PointSRIDXYM(srid, x, y, m)
else:
return PointXYM(x, y, m)
else:
x, y = struct.unpack_from('<2d', binary, offset=0x06)
if srid:
return PointSRIDXY(srid, x, y)
else:
return PointXY(x, y)
elif props & FLAG_L:
n_pts = 2
offset = 0x06
else:
n_pts, = struct.unpack_from('<H', binary, offset=0x06)
offset = 0x0a
xy = np.frombuffer(binary, dtype=FLOAT_LE, offset=offset, count=n_pts * 2)
xs, ys = nplst.as_strided(xy, (2, n_pts), (8, 16))
offset += n_pts * 16
zs: npt.NDArray[np.float64] | None = None
if props & FLAG_Z:
zs = np.frombuffer(binary, dtype=FLOAT_LE, offset=offset, count=n_pts)
offset += n_pts * 8
ms: npt.NDArray[np.float64] | None = None
if props & FLAG_M:
ms = np.frombuffer(binary, dtype=FLOAT_LE, offset=offset, count=n_pts)
offset += n_pts * 8
is_multipoint = False
if not (props & FLAG_L):
n_fig, = struct.unpack_from('<L', binary, offset=offset)
offset += 4
for i in range(n_fig):
fig_attr, fig_off = struct.unpack_from('<Bl', binary, offset=offset)
offset += 5
if (ver == 1 and fig_attr != 1) or (ver == 2 and fig_attr not in (0, 1)):
raise ValueError('we only handle linestrings and multipoints')
if i != fig_off:
raise ValueError('we only handle "in-order" things')
n_shapes, = struct.unpack_from('<L', binary, offset=offset)
offset += 4
for i in range(n_shapes):
parent_off, fig_off, shape_type = struct.unpack_from(
'<llB', binary, offset=offset)
offset += 9
if i == 0:
if parent_off != -1 or fig_off not in (0, -1):
raise ValueError('we only handle "in-order" things')
if shape_type == 2:
if n_shapes != 1:
raise ValueError('we only handle single linestrings')
elif shape_type == 4:
is_multipoint = True
else:
raise ValueError(
'we only handle linestrings and multipoints')
elif is_multipoint and (fig_off not in (i - 1, -1) or shape_type != 1):
raise ValueError('we only handle "in-order" multipoints')
if offset != len(binary):
raise ValueError('probably not a geography or geometry')
# types ignored because mypy's too stupid to realize the match means certain
# values must be non-None
if not is_multipoint:
match srid, zs, ms:
case (0, None, None): return LineStringXY(xs, ys)
case (_, None, None): return LineStringSRIDXY(srid, xs, ys)
case (0, _, None): return LineStringXYZ(xs, ys, zs) # type: ignore
case (_, _, None): return LineStringSRIDXYZ(srid, xs, ys, zs) # type: ignore
case (0, None, _): return LineStringXYM(xs, ys, ms) # type: ignore
case (_, None, _): return LineStringSRIDXYM(srid, xs, ys, ms) # type: ignore
case (0, _, _): return LineStringXYZM(xs, ys, zs, ms) # type: ignore
case (_, _, _): return LineStringSRIDXYZM(srid, xs, ys, zs, ms) # type: ignore
case _: raise RuntimeError('unreachable')
else:
match srid, zs, ms:
case (0, None, None): return MultiPointXY(xs, ys)
case (_, None, None): return MultiPointSRIDXY(srid, xs, ys)
case (0, _, None): return MultiPointXYZ(xs, ys, zs) # type: ignore
case (_, _, None): return MultiPointSRIDXYZ(srid, xs, ys, zs) # type: ignore
case (0, None, _): return MultiPointXYM(xs, ys, ms) # type: ignore
case (_, None, _): return MultiPointSRIDXYM(srid, xs, ys, ms) # type: ignore
case (0, _, _): return MultiPointXYZM(xs, ys, zs, ms) # type: ignore
case (_, _, _): return MultiPointSRIDXYZM(srid, xs, ys, zs, ms) # type: ignore
case _: raise RuntimeError('unreachable')
# Names exported by `from <module> import *`, grouped by shape family.
__all__ = [
    # points
    'PointXY', 'PointXYZ', 'PointXYZM', 'PointXYM',
    'PointSRIDXY', 'PointSRIDXYZ', 'PointSRIDXYZM', 'PointSRIDXYM',
    # line strings
    'LineStringXY', 'LineStringXYZ', 'LineStringXYZM', 'LineStringXYM',
    'LineStringSRIDXY', 'LineStringSRIDXYZ', 'LineStringSRIDXYZM',
    'LineStringSRIDXYM',
    # multi-points
    'MultiPointXY', 'MultiPointXYZ', 'MultiPointXYZM', 'MultiPointXYM',
    'MultiPointSRIDXY', 'MultiPointSRIDXYZ', 'MultiPointSRIDXYZM',
    'MultiPointSRIDXYM',
    # union aliases and the decoder entry point
    'Geometry',
    'Geography',
    'convert_151',
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment