Skip to content

Instantly share code, notes, and snippets.

@rwat
Created October 22, 2024 02:04
Show Gist options
  • Save rwat/e98dcabadcaa4c89839f91f300ae580c to your computer and use it in GitHub Desktop.
Save rwat/e98dcabadcaa4c89839f91f300ae580c to your computer and use it in GitHub Desktop.
import datetime
import logging
import random
import lmdb
import msgpack
import numpy as np
WEST = 0
SOUTH = 1
EAST = 2
NORTH = 3
DB_NAME = 'sample_db'
MAP_SIZE = 1024 * 1024 * 1024 * 10 # 10 GB
kv_options = {
'buffers': True,
'map_async': True,
'meminit': False,
'metasync': False,
'sync': False,
'writemap': True,
}
logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
logging.getLogger('').addHandler(console)
def kv_put(obj, k, v):
obj.put(msgpack.packb(k), msgpack.packb(v))
def kv_get(obj, k):
return msgpack.unpackb(obj.get(msgpack.packb(k)))
def benchmark(indexes, points):
# Average runtime about 4 seconds single-threaded on a 2019 Intel Mac. I think a multithreaded
# approach could get this between 1 and 2 seconds instead.
INT_TYPE = np.uint32
north_index, east_index, south_index, west_index = indexes
logger.info('Benchmark: start')
for i, point in enumerate(points):
start_time = datetime.datetime.now()
lon, lat = point
north_indices = north_index[north_index[:, 1] >= lat, 0].astype(INT_TYPE)
east_indices = east_index[east_index[:, 1] >= lon, 0].astype(INT_TYPE)
south_indices = south_index[south_index[:, 1] <= lat, 0].astype(INT_TYPE)
west_indices = west_index[west_index[:, 1] <= lon, 0].astype(INT_TYPE)
partial_result = np.intersect1d(north_indices, east_indices, assume_unique=True)
partial_result = np.intersect1d(partial_result, south_indices, assume_unique=True)
result = np.intersect1d(partial_result, west_indices, assume_unique=True)
# NOTE: the 'result' contains the source data indices for points that intersect the
# bounding box of source items, so you could retrieve the matching source data with
# kv_get(curs, result[i]) where 0 < i < len(result)
# Things like polygon lookup and polygon intersection could be done from there.
logger.info(f'bbox search {i} took {datetime.datetime.now() - start_time} seconds')
logger.info('Benchmark: done')
def create_user_input(num_points):
points = []
logger.info('Creating fake user data')
for _ in range(num_points):
lon = (random.random() * 180) - 90
lat = (random.random() * 360) - 180
points.append((lon, lat))
return points
def create_directional_indexes(num_items):
'''Iterate through source data kv pairs and extract bounding box info for later index search
'''
# index short axis is [source_data_key, associated_direction_value]
# NOTE: the largest int that can be represented with float32 is 16,777,217 (224 + 1). Therefore,
# we're going to use float64. The first element of each item is going to be cast to int from
# float64; sorry, janky for now.
# Casting from float32 to float64 will sometimes cause repeating decimal numbers that don't
# need to be there for typically float32 longitude and latitude, but we're gonna write this
# in one night : )
north_index = np.empty((num_items, 2), dtype=np.float64)
east_index = np.empty((num_items, 2), dtype=np.float64)
south_index = np.empty((num_items, 2), dtype=np.float64)
west_index = np.empty((num_items, 2), dtype=np.float64)
# Read source data and create indexes for each of the bounding box bounds
kv = lmdb.open(DB_NAME, map_size=MAP_SIZE)
with kv.begin(buffers=True) as txn:
with txn.cursor() as curs:
for k in range(num_items):
item = kv_get(curs, k)
# upper latitude bboxes bounds
north_index[k][0] = k
north_index[k][1] = item['bbox'][NORTH]
# upper longitude bboxes bounds
east_index[k][0] = k
east_index[k][1] = item['bbox'][EAST]
# lower latitude bboxes bounds
south_index[k][0] = k
south_index[k][1] = item['bbox'][SOUTH]
# lower longitude bboxes bounds
west_index[k][0] = k
west_index[k][1] = item['bbox'][WEST]
# log every one million items
if k % int(1e6) == 0:
logger.info(f'{k} items read and processed from the database')
# Sort indexes by their respective bounds values
def sort_bounds_index(a):
return a[a[:, 1].argsort()]
# At sixteen million items each, each array is ~ 244MB or total of 1GB
# Takes about 16 seconds on a 2019 Intel Mac
logger.info('Sorting indexes')
north_index = sort_bounds_index(north_index)
east_index = sort_bounds_index(east_index)
south_index = sort_bounds_index(south_index)
west_index = sort_bounds_index(west_index)
return north_index, east_index, south_index, west_index
def generate_random_bbox():
north = (random.random() * 180) - 90
east = (random.random() * 360) - 180
south = (random.random() * (north + 90) - 90) # ensures south < north
west = (random.random() * (east + 180) - 180) # ensures west < east
# return the bounding box in CMR search order of ll, ur in (lon, lat) order
return (west, south, east, north)
def create_source_data(num_items):
logger.info('Placing generated items in database.')
kv = lmdb.open(DB_NAME, map_size=MAP_SIZE)
with kv.begin(write=True, buffers=True) as txn:
with txn.cursor() as curs:
# Note: the 'k' is both the key for source data lookups and also the foreign key, of a
# sort, used by the directional indexes later.
for k in range(num_items):
bbox = generate_random_bbox()
fake_description = 'String data, maybe include a path pointing to original data.'
item = {
'bbox': bbox,
'description': fake_description
# Could optionally have the whole polygon in here too and test intersection
# with shapely.
}
kv_put(curs, k, item)
# log every one million items
if k % int(1e6) == 0:
logger.info(f'{k} items entered into the database')
logger.info(f'{num_items} items entered into the database')
if __name__ == '__main__':
num_items = int(1e6 * 16) # sixteen million items
num_points = 100 # one hundred points
# Run once and then disable
#create_source_data(num_items)
# index each of the four directions using the source data key; (north, east, south, west)
# NOTE: if source data doesn't change then these can be written as npz files to storage
indexes = create_directional_indexes(num_items)
# generate points for fake user input
points = create_user_input(num_points)
# benchmark bounding box tests across fake input
benchmark(indexes, points)
# FUTURE IMPROVEMENTS
# - of course: docstrings, comments, typing
# - write msgpack codecs to minimize data size (e.g. latitude and longitude should be np.float32)
# - use numpy structs instead of abusing floats into ints
# - lmdb faster reads/writes by splitting up bbox data, link/'description' data, polygon data, etc.
# into separate keyspace
# - definitely write direction index arrays to disk after generation
# - point search could be multithreaded ftw
# - it'd be easy to reduce the amount of memory used
# A way to test for polygon intersection; current code only tests for bbox intersection for now.
#from shapely.geometry import Polygon
# p1 = Polygon([(0,0), (1,1), (1,0)])
# p2 = Polygon([(0,1), (1,0), (1,1)])
# print(p1.intersects(p2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment