Last active
January 15, 2020 17:29
-
-
Save hoffrocket/7b0a3d80a0d677c620ebb3e4a2fe571d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from zipfile import ZipFile | |
import gzip | |
from io import BytesIO | |
import io | |
import struct | |
zip_req = requests.get("http://download.geonames.org/export/zip/US.zip") | |
zip_bytes = BytesIO(zip_req.content) | |
us_zip = ZipFile(zip_bytes) | |
zip_tsv = us_zip.open("US.txt") | |
with io.open("zip_geo.db.gz", "wb") as out: | |
# set mtime to 0 so that output bytes only change if input changes | |
compressed = gzip.GzipFile(fileobj=out, mode="wb", mtime=0) | |
for line_bytes in zip_tsv.readlines(): | |
line = line_bytes.decode("utf-8") | |
# for the lines like: | |
# US\t80640\tHenderson\tColorado\tCO\tAdams\t001\t\t\t39.8983\t-104.8718\t4 | |
split_line = line.split("\t") | |
zip_code = split_line[1] | |
lat = split_line[9] | |
lng = split_line[10] | |
compressed.write(struct.pack("i", int(zip_code))) | |
compressed.write(struct.pack("f", float(lat))) | |
compressed.write(struct.pack("f", float(lng))) | |
compressed.flush() | |
compressed.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import io | |
import gzip | |
import struct | |
from dataclasses import dataclass | |
from typing import Optional, Any, List, Tuple | |
import collections | |
from math import sin, cos, sqrt, atan2, radians | |
# adjust this to the relative pathto the db file | |
_ZIP_GEO_DB_PATH = os.path.join(os.path.dirname(__file__), "../zip_geo.db.gz") | |
def square_distance(a, b): | |
s = 0 | |
for x, y in zip(a, b): | |
d = x - y | |
s += d * d | |
return s | |
Node = collections.namedtuple("Node", 'point axis label left right') | |
# code from http://code.activestate.com/recipes/577497-kd-tree-for-nearest-neighbor-search-in-a-k-dimensi/ | |
class KDTree(object): | |
"""A tree for nearest neighbor search in a k-dimensional space. | |
For information about the implementation, see | |
http://en.wikipedia.org/wiki/Kd-tree | |
Usage: | |
objects is an iterable of (point, label) tuples | |
k is the number of dimensions | |
t = KDTree(k, objects) | |
point, label = t.nearest_neighbor(destination) | |
""" | |
def __init__(self, k, objects): | |
def build_tree(objects: List[Tuple[List[float], Any]], axis=0): | |
if not objects: | |
return None | |
objects.sort(key=lambda o: o[0][axis]) | |
median_idx = len(objects) // 2 | |
median_point, median_label = objects[median_idx] | |
next_axis = (axis + 1) % k | |
return Node(median_point, axis, median_label, | |
build_tree(objects[:median_idx], next_axis), | |
build_tree(objects[median_idx + 1:], next_axis)) | |
self.root = build_tree(list(objects)) | |
def nearest_neighbor(self, destination): | |
best = [None, None, float('inf')] | |
# state of search: best point found, its label, lowest squared distance | |
def recursive_search(here): | |
if here is None: | |
return | |
point, axis, label, left, right = here | |
here_sd = square_distance(point, destination) | |
if here_sd < best[2]: | |
best[:] = point, label, here_sd | |
diff = destination[axis] - point[axis] | |
close, away = (left, right) if diff <= 0 else (right, left) | |
recursive_search(close) | |
if diff ** 2 < best[2]: | |
recursive_search(away) | |
recursive_search(self.root) | |
return best[0], best[1] | |
@dataclass(frozen=True) | |
class LatLng(): | |
latitude: float | |
longitude: float | |
def distance_cartesian(self, other: "LatLng") -> float: | |
return sqrt( | |
(self.latitude - other.latitude)**2 + (self.longitude - other.longitude)**2 | |
) | |
def distance_km(self, other: "LatLng") -> float: | |
"""haversine distance""" | |
R = 6373.0 | |
lat1 = radians(abs(self.latitude)) | |
lon1 = radians(abs(self.longitude)) | |
lat2 = radians(abs(other.latitude)) | |
lon2 = radians(abs(other.longitude)) | |
dlon = lon2 - lon1 | |
dlat = lat2 - lat1 | |
a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2 | |
c = 2 * atan2(sqrt(a), sqrt(1 - a)) | |
return R * c | |
def distance_mile(self, other: "LatLng") -> float: | |
return self.distance_km(other) / 1.609 | |
_zip_dict = {} | |
_kd_tree = None | |
with io.open(_ZIP_GEO_DB_PATH, "rb") as fileobj: | |
compressed = gzip.GzipFile(fileobj=fileobj, mode="rb") | |
while True: | |
zip_code_bytes = compressed.read(4) | |
if not zip_code_bytes: | |
break | |
zip_code = struct.unpack("i", zip_code_bytes)[0] | |
lat = struct.unpack("f", compressed.read(4))[0] | |
lng = struct.unpack("f", compressed.read(4))[0] | |
lat_lng = LatLng(lat, lng) | |
_zip_dict[zip_code] = lat_lng | |
points = [([ll.latitude, ll.longitude], zip_code) for zip_code, ll in _zip_dict.items()] | |
_kd_tree = KDTree(2, points) | |
def get_coordinates(zip_code: str) -> Optional[LatLng]: | |
try: | |
return _zip_dict.get(int(zip_code), None) | |
except TypeError: # wasn't passed a string | |
return None | |
except ValueError: # can't convert string to an int | |
return None | |
def nearest_zip_code(latitude, longitude) -> Tuple[LatLng, str]: | |
nearest_point, nearest_zip = _kd_tree.nearest_neighbor([latitude, longitude]) # type: ignore | |
return LatLng(nearest_point[0], nearest_point[1]), str(nearest_zip).zfill(5) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment