Skip to content

Instantly share code, notes, and snippets.

@hoffrocket
Last active January 15, 2020 17:29
Show Gist options
  • Save hoffrocket/7b0a3d80a0d677c620ebb3e4a2fe571d to your computer and use it in GitHub Desktop.
Save hoffrocket/7b0a3d80a0d677c620ebb3e4a2fe571d to your computer and use it in GitHub Desktop.
import requests
from zipfile import ZipFile
import gzip
from io import BytesIO
import io
import struct
zip_req = requests.get("http://download.geonames.org/export/zip/US.zip")
zip_bytes = BytesIO(zip_req.content)
us_zip = ZipFile(zip_bytes)
zip_tsv = us_zip.open("US.txt")
with io.open("zip_geo.db.gz", "wb") as out:
# set mtime to 0 so that output bytes only change if input changes
compressed = gzip.GzipFile(fileobj=out, mode="wb", mtime=0)
for line_bytes in zip_tsv.readlines():
line = line_bytes.decode("utf-8")
# for the lines like:
# US\t80640\tHenderson\tColorado\tCO\tAdams\t001\t\t\t39.8983\t-104.8718\t4
split_line = line.split("\t")
zip_code = split_line[1]
lat = split_line[9]
lng = split_line[10]
compressed.write(struct.pack("i", int(zip_code)))
compressed.write(struct.pack("f", float(lat)))
compressed.write(struct.pack("f", float(lng)))
compressed.flush()
compressed.close()
import os
import io
import gzip
import struct
from dataclasses import dataclass
from typing import Optional, Any, List, Tuple
import collections
from math import sin, cos, sqrt, atan2, radians
# adjust this to the relative pathto the db file
_ZIP_GEO_DB_PATH = os.path.join(os.path.dirname(__file__), "../zip_geo.db.gz")
def square_distance(a, b):
s = 0
for x, y in zip(a, b):
d = x - y
s += d * d
return s
Node = collections.namedtuple("Node", 'point axis label left right')
# code from http://code.activestate.com/recipes/577497-kd-tree-for-nearest-neighbor-search-in-a-k-dimensi/
class KDTree(object):
"""A tree for nearest neighbor search in a k-dimensional space.
For information about the implementation, see
http://en.wikipedia.org/wiki/Kd-tree
Usage:
objects is an iterable of (point, label) tuples
k is the number of dimensions
t = KDTree(k, objects)
point, label = t.nearest_neighbor(destination)
"""
def __init__(self, k, objects):
def build_tree(objects: List[Tuple[List[float], Any]], axis=0):
if not objects:
return None
objects.sort(key=lambda o: o[0][axis])
median_idx = len(objects) // 2
median_point, median_label = objects[median_idx]
next_axis = (axis + 1) % k
return Node(median_point, axis, median_label,
build_tree(objects[:median_idx], next_axis),
build_tree(objects[median_idx + 1:], next_axis))
self.root = build_tree(list(objects))
def nearest_neighbor(self, destination):
best = [None, None, float('inf')]
# state of search: best point found, its label, lowest squared distance
def recursive_search(here):
if here is None:
return
point, axis, label, left, right = here
here_sd = square_distance(point, destination)
if here_sd < best[2]:
best[:] = point, label, here_sd
diff = destination[axis] - point[axis]
close, away = (left, right) if diff <= 0 else (right, left)
recursive_search(close)
if diff ** 2 < best[2]:
recursive_search(away)
recursive_search(self.root)
return best[0], best[1]
@dataclass(frozen=True)
class LatLng():
latitude: float
longitude: float
def distance_cartesian(self, other: "LatLng") -> float:
return sqrt(
(self.latitude - other.latitude)**2 + (self.longitude - other.longitude)**2
)
def distance_km(self, other: "LatLng") -> float:
"""haversine distance"""
R = 6373.0
lat1 = radians(abs(self.latitude))
lon1 = radians(abs(self.longitude))
lat2 = radians(abs(other.latitude))
lon2 = radians(abs(other.longitude))
dlon = lon2 - lon1
dlat = lat2 - lat1
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return R * c
def distance_mile(self, other: "LatLng") -> float:
return self.distance_km(other) / 1.609
_zip_dict = {}
_kd_tree = None
with io.open(_ZIP_GEO_DB_PATH, "rb") as fileobj:
compressed = gzip.GzipFile(fileobj=fileobj, mode="rb")
while True:
zip_code_bytes = compressed.read(4)
if not zip_code_bytes:
break
zip_code = struct.unpack("i", zip_code_bytes)[0]
lat = struct.unpack("f", compressed.read(4))[0]
lng = struct.unpack("f", compressed.read(4))[0]
lat_lng = LatLng(lat, lng)
_zip_dict[zip_code] = lat_lng
points = [([ll.latitude, ll.longitude], zip_code) for zip_code, ll in _zip_dict.items()]
_kd_tree = KDTree(2, points)
def get_coordinates(zip_code: str) -> Optional[LatLng]:
try:
return _zip_dict.get(int(zip_code), None)
except TypeError: # wasn't passed a string
return None
except ValueError: # can't convert string to an int
return None
def nearest_zip_code(latitude, longitude) -> Tuple[LatLng, str]:
nearest_point, nearest_zip = _kd_tree.nearest_neighbor([latitude, longitude]) # type: ignore
return LatLng(nearest_point[0], nearest_point[1]), str(nearest_zip).zfill(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment