Created
October 22, 2024 02:04
-
-
Save rwat/e98dcabadcaa4c89839f91f300ae580c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import logging | |
import random | |
import lmdb | |
import msgpack | |
import numpy as np | |
# Positions of each cardinal direction within a bbox tuple, which is stored
# as (west, south, east, north) -- see generate_random_bbox().
WEST = 0
SOUTH = 1
EAST = 2
NORTH = 3

# LMDB environment settings.
DB_NAME = 'sample_db'
MAP_SIZE = 1024 * 1024 * 1024 * 10  # 10 GB

# Durability-relaxing LMDB options intended to speed up bulk writes.
# NOTE(review): defined but never passed to lmdb.open() anywhere in the
# visible code -- presumably meant as lmdb.open(DB_NAME, **kv_options);
# confirm before relying on these taking effect.
kv_options = {
    'buffers': True,
    'map_async': True,
    'meminit': False,
    'metasync': False,
    'sync': False,
    'writemap': True,
}

# Root-logger setup: emit DEBUG-and-above messages to the console.
logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
logging.getLogger('').addHandler(console)
def kv_put(obj, k, v):
    """Store value *v* under key *k*, msgpack-encoding both, via *obj* (LMDB cursor)."""
    packed_key = msgpack.packb(k)
    packed_value = msgpack.packb(v)
    obj.put(packed_key, packed_value)
def kv_get(obj, k):
    """Fetch and msgpack-decode the value stored under key *k* via *obj* (LMDB cursor)."""
    raw = obj.get(msgpack.packb(k))
    return msgpack.unpackb(raw)
def benchmark(indexes, points):
    """Time a bounding-box containment search for each (lon, lat) point.

    Parameters
    ----------
    indexes : tuple of four (num_items, 2) float64 arrays in
        (north, east, south, west) order; each row is
        [source_data_key, bound_value], sorted ascending by bound_value.
    points : iterable of (lon, lat) pairs to search for.

    Logs the wall-clock time of each search.  Average runtime about 4
    seconds single-threaded on a 2019 Intel Mac; a multithreaded approach
    could likely get this between 1 and 2 seconds.
    """
    INT_TYPE = np.uint32
    north_index, east_index, south_index, west_index = indexes
    logger.info('Benchmark: start')
    for i, point in enumerate(points):
        start_time = datetime.datetime.now()
        lon, lat = point
        # An item's bbox contains the point iff its north/east bounds are
        # at or above the point and its south/west bounds at or below it.
        north_indices = north_index[north_index[:, 1] >= lat, 0].astype(INT_TYPE)
        east_indices = east_index[east_index[:, 1] >= lon, 0].astype(INT_TYPE)
        south_indices = south_index[south_index[:, 1] <= lat, 0].astype(INT_TYPE)
        west_indices = west_index[west_index[:, 1] <= lon, 0].astype(INT_TYPE)
        # Intersect the four candidate key sets; keys are unique per set.
        partial_result = np.intersect1d(north_indices, east_indices, assume_unique=True)
        partial_result = np.intersect1d(partial_result, south_indices, assume_unique=True)
        result = np.intersect1d(partial_result, west_indices, assume_unique=True)
        # NOTE: 'result' contains the source-data indices for items whose
        # bounding box contains the point, so the matching source data could
        # be retrieved with kv_get(curs, result[i]) where 0 <= i < len(result).
        # Things like polygon lookup and polygon intersection could be done
        # from there.
        # BUG FIX: a timedelta formats as 'H:MM:SS.ffffff', so the previous
        # message mislabeled that string as seconds; report total_seconds().
        elapsed = (datetime.datetime.now() - start_time).total_seconds()
        logger.info(f'bbox search {i} took {elapsed} seconds')
    logger.info('Benchmark: done')
def create_user_input(num_points):
    """Generate *num_points* random (lon, lat) pairs as fake user queries.

    Returns a list of (longitude, latitude) tuples with longitude in
    [-180, 180) and latitude in [-90, 90).
    """
    points = []
    logger.info('Creating fake user data')
    for _ in range(num_points):
        # BUG FIX: the original had the ranges swapped -- longitude spans
        # 360 degrees ([-180, 180)) and latitude 180 ([-90, 90)), not the
        # other way around.
        lon = (random.random() * 360) - 180
        lat = (random.random() * 180) - 90
        points.append((lon, lat))
    return points
def create_directional_indexes(num_items):
    '''Iterate through source data kv pairs and extract bounding box info for later index search.

    Returns (north_index, east_index, south_index, west_index): four
    (num_items, 2) float64 arrays whose rows are
    [source_data_key, bound_value], each sorted ascending by bound_value.
    '''
    # Index short axis is [source_data_key, associated_direction_value].
    # NOTE: the largest int exactly representable in float32 is 16,777,216
    # (2**24), therefore we use float64.  The first element of each row is
    # later cast from float64 back to int; janky for now.
    north_index = np.empty((num_items, 2), dtype=np.float64)
    east_index = np.empty((num_items, 2), dtype=np.float64)
    south_index = np.empty((num_items, 2), dtype=np.float64)
    west_index = np.empty((num_items, 2), dtype=np.float64)
    # The key column is just 0..num_items-1 in every index; fill it once,
    # vectorized, instead of per-row inside the read loop.
    keys = np.arange(num_items, dtype=np.float64)
    north_index[:, 0] = keys
    east_index[:, 0] = keys
    south_index[:, 0] = keys
    west_index[:, 0] = keys
    # Read source data and record each of the four bounding-box bounds.
    kv = lmdb.open(DB_NAME, map_size=MAP_SIZE)
    try:
        with kv.begin(buffers=True) as txn:
            with txn.cursor() as curs:
                for k in range(num_items):
                    item = kv_get(curs, k)
                    bbox = item['bbox']
                    north_index[k, 1] = bbox[NORTH]  # upper latitude bound
                    east_index[k, 1] = bbox[EAST]    # upper longitude bound
                    south_index[k, 1] = bbox[SOUTH]  # lower latitude bound
                    west_index[k, 1] = bbox[WEST]    # lower longitude bound
                    # log every one million items
                    if k % int(1e6) == 0:
                        logger.info(f'{k} items read and processed from the database')
    finally:
        # FIX: close the LMDB environment instead of leaking the handle.
        kv.close()

    def sort_bounds_index(a):
        # Sort index rows ascending by their bound value (column 1).
        return a[a[:, 1].argsort()]

    # At sixteen million items, each array is ~244MB, ~1GB total.
    # Takes about 16 seconds on a 2019 Intel Mac.
    logger.info('Sorting indexes')
    north_index = sort_bounds_index(north_index)
    east_index = sort_bounds_index(east_index)
    south_index = sort_bounds_index(south_index)
    west_index = sort_bounds_index(west_index)
    return north_index, east_index, south_index, west_index
def generate_random_bbox():
    """Return a random bounding box as (west, south, east, north).

    The ordering matches CMR search convention: lower-left corner then
    upper-right corner, each in (lon, lat) order.  Guarantees
    south <= north and west <= east.
    """
    # Pick the upper-right corner first, anywhere on the globe.
    north = random.random() * 180 - 90
    east = random.random() * 360 - 180
    # Then pick the lower-left corner uniformly at or below/left of it by
    # shrinking each range to [-90, north) and [-180, east).
    lat_span = north + 90
    lon_span = east + 180
    south = random.random() * lat_span - 90
    west = random.random() * lon_span - 180
    return (west, south, east, north)
def create_source_data(num_items):
    """Populate the LMDB database with *num_items* fake bbox records.

    Each record is a dict {'bbox': (w, s, e, n), 'description': str} stored
    under its integer index k.  Note: 'k' is both the key for source-data
    lookups and the foreign key, of a sort, used by the directional indexes
    later.
    """
    logger.info('Placing generated items in database.')
    kv = lmdb.open(DB_NAME, map_size=MAP_SIZE)
    try:
        with kv.begin(write=True, buffers=True) as txn:
            with txn.cursor() as curs:
                # Loop-invariant payload text, hoisted out of the hot loop.
                fake_description = 'String data, maybe include a path pointing to original data.'
                for k in range(num_items):
                    item = {
                        'bbox': generate_random_bbox(),
                        'description': fake_description
                        # Could optionally hold the whole polygon here too
                        # and test intersection with shapely.
                    }
                    kv_put(curs, k, item)
                    # log every one million items
                    if k % int(1e6) == 0:
                        logger.info(f'{k} items entered into the database')
    finally:
        # FIX: close the LMDB environment instead of leaking the handle.
        kv.close()
    logger.info(f'{num_items} items entered into the database')
if __name__ == '__main__':
    num_items = int(1e6 * 16)  # sixteen million items
    num_points = 100  # one hundred points
    # One-time database seeding: run once, then leave disabled so reruns
    # reuse the already-populated LMDB database.
    #create_source_data(num_items)
    # index each of the four directions using the source data key; (north, east, south, west)
    # NOTE: if source data doesn't change then these can be written as npz files to storage
    indexes = create_directional_indexes(num_items)
    # generate points for fake user input
    points = create_user_input(num_points)
    # benchmark bounding box tests across fake input
    benchmark(indexes, points)
    # FUTURE IMPROVEMENTS
    # - of course: docstrings, comments, typing
    # - write msgpack codecs to minimize data size (e.g. latitude and longitude should be np.float32)
    # - use numpy structs instead of abusing floats into ints
    # - lmdb faster reads/writes by splitting up bbox data, link/'description' data, polygon data, etc.
    #   into separate keyspace
    # - definitely write direction index arrays to disk after generation
    # - point search could be multithreaded ftw
    # - it'd be easy to reduce the amount of memory used
    # A way to test for polygon intersection; current code only tests for bbox intersection for now.
    #from shapely.geometry import Polygon
    # p1 = Polygon([(0,0), (1,1), (1,0)])
    # p2 = Polygon([(0,1), (1,0), (1,1)])
    # print(p1.intersects(p2))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment