import zlib
from imposm.parser.pbf.parser import PBFFile, PrimitiveBlockParser
import imposm.parser.pbf.parser
import imposm.parser.pbf
import StringIO
import cachetools
import bisect
import sys
import resource
import random
import gc
import multiprocessing
import functools
from collections import defaultdict
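
# Streams (osm_id, tags, refs) ways out of an OSM PBF file while resolving
# each way's node refs to coordinates, without holding every node in memory:
# coord-carrying blobs are indexed by their id ranges and re-parsed on demand
# through an LRU cache (see CoordCollector below).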
# def read_blob_data_no_open(filename, blob_pos, blob_size):
#     if type(filename) is str:
#         with open(filename, 'rb') as f:
#             f.seek(blob_pos)
#             blob_data = f.read(blob_size)
#     else:
#         f = filename
#         f.seek(blob_pos)
#         blob_data = f.read(blob_size)
#     blob = OSMPBF.Blob()
#     blob.ParseFromString(blob_data)
#     raw_data = blob.raw
#     if raw_data:
#         return raw_data
#     return zlib.decompress(blob.zlib_data)
# imposm.parser.pbf.parser.read_blob_data = read_blob_data_no_open

# class IOStr(str):
#     @classmethod
#     def from_file(cls, filename):
#         io_str = IOStr(filename)
#         with open(filename, 'rb') as read_file:
#             io_str.io = StringIO.StringIO(read_file.read())
#         return io_str
#     io = None
#     def seek(self, *args):
#         return self.io.seek(*args)
#     def read(self, *args):
#         return self.io.read(*args)
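
# The commented-out helpers above appear to be earlier experiments:
# read_blob_data_no_open accepts an already-open file object instead of
# reopening the file per blob, and IOStr keeps the whole file in a StringIO
# so seeks never touch disk.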
class PrimitiveBlock(object):
    def __init__(self, primitive_block):
        self.primitivegroup = map(PrimitiveGroup, primitive_block.primitivegroup)
        self.granularity = primitive_block.granularity
        self.lat_offset = primitive_block.lat_offset
        self.lon_offset = primitive_block.lon_offset

class Dense(object):
    def __init__(self, dense):
        self.id = dense.id
        self.lat = dense.lat
        self.lon = dense.lon
        self.keys_vals = dense.keys_vals

class Way(object):
    def __init__(self, way):
        self.keys = way.keys
        self.vals = way.vals
        self.refs = way.refs

class Ways(object):
    def __init__(self, ways):
        self.ways = map(Way, ways)

class PrimitiveGroup(object):
    dense = None
    ways = None

    def __init__(self, group):
        if group.dense:
            self.dense = Dense(group.dense)
        if group.ways:
            self.ways = Ways(group.ways)
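
# These wrapper classes copy fields off the protobuf messages into plain
# Python objects, presumably so parsed blocks can be pickled and shipped
# between processes.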
class SuperPrimitiveBlockParser(PrimitiveBlockParser):
    def __init__(self, data):
        primitive_block = imposm.parser.pbf.OSMPBF.PrimitiveBlock()
        primitive_block.ParseFromString(data)
        self.stringtable = imposm.parser.pbf.parser.decoded_stringtable(primitive_block.stringtable.s)
        self.primitive_block = primitive_block
        self.primitivegroup = self.primitive_block.primitivegroup
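
# SuperPrimitiveBlockParser reuses imposm's PrimitiveBlockParser (which
# provides .nodes() and .ways()) but is constructed from a raw decompressed
# blob already in memory rather than from a file position.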
class BlobParserProcess(multiprocessing.Process):
    """Worker that parses raw blobs pulled from a queue; used by the
    (currently commented-out) parallel variant of parse_file below."""

    def __init__(self, queue, blob_queue, *args, **kw):
        multiprocessing.Process.__init__(self)
        self.daemon = True
        self.queue = queue
        self.blob_queue = blob_queue

    def run(self):
        while True:
            pos = self.queue.get()
            if pos is None:
                # Sentinel: propagate shutdown to the consumer and exit.
                self.queue.task_done()
                self.blob_queue.put(None)
                break
            data = imposm.parser.pbf.parser.read_blob_data(pos['filename'], pos['blob_pos'], pos['blob_size'])
            blob = SuperPrimitiveBlockParser(data)
            self.blob_queue.put(blob)
            self.queue.task_done()
class CoordCollector(object):
    verbose = False
    chunk_size = 8000
    coord_blobs = None
    concurrency = 2

    def __init__(self, filename, max_ways=50000, max_coord_blobs=1000):
        self.max_ways = max_ways
        self.max_coord_blobs = max_coord_blobs
        self.filename = filename

    def log(self, msg):
        if self.verbose:
            # Note: ru_maxrss is bytes on OS X but kilobytes on Linux, so the
            # MB figure below is only accurate on OS X.
            sys.stderr.write("{}\t{mem:.1f}MB\n".format(msg, mem=resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576))
            sys.stderr.flush()

    def nodes_by_id_from_blob(self, blob):
        # Re-parse a coord blob and build a dict mapping osm_id to its coordinate.
        start, end, data = blob
        nodes_by_id = {}
        for osm_id, tags, coord in SuperPrimitiveBlockParser(data).nodes():
            nodes_by_id[osm_id] = coord
        return nodes_by_id
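
    # nodes_by_id_from_blob is also the `missing` hook for the LRU cache in
    # blob_cache() below, so evicted blobs are transparently re-parsed on access.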
    def parse_file(self, coord_group_callback=lambda x: x, way_group_callback=lambda x: x):
        pbf_file = PBFFile(self.filename)
        for pos in pbf_file.blob_offsets():
            data = imposm.parser.pbf.parser.read_blob_data(pos['filename'], pos['blob_pos'], pos['blob_size'])
            parser = SuperPrimitiveBlockParser(data)
            groups = parser.primitivegroup
            first_dense_group = next((group for group in groups if group.dense), None)
            if first_dense_group:
                # Dense ids are delta-coded per group, so the first delta is the
                # first absolute id and the running sum gives the last id: record
                # this blob's (first_id, last_id, raw_data) range.
                last_dense_group = next(group for group in reversed(groups) if group.dense)
                coord_group_callback((first_dense_group.dense.id[0], sum(last_dense_group.dense.id), data))
            else:
                # Pass the raw blob along once if any group in it holds ways
                # (passing it per-group would parse the same ways repeatedly).
                if any(group.ways for group in groups):
                    way_group_callback(data)
        # for group in blob.primitivegroup:
        #     if group.dense:
        #         coord_group_callback((group.dense.id[0], sum(group.dense.id), group))
        #     elif group.ways:
        #         way_group_callback(group)
        # pool = []
        # queue = multiprocessing.JoinableQueue()
        # blob_queue = multiprocessing.JoinableQueue()
        # for _ in xrange(self.concurrency):
        #     proc = BlobParserProcess(queue, blob_queue)
        #     pool.append(proc)
        #     proc.start()
        # for pos in pbf_file.blob_offsets():
        #     queue.put(pos)
        # for proc in pool:
        #     queue.put(None)
        # queue.join()
        # running = self.concurrency
        # while True:
        #     blob = blob_queue.get()
        #     if blob is None:
        #         blob_queue.task_done()
        #         running -= 1
        #         if running == 0:
        #             break
        #         else:
        #             continue
        #     for group in blob.primitivegroup:
        #         if group.dense:
        #             coord_group_callback((group.dense.id[0], sum(group.dense.id), group))
        #         elif group.ways:
        #             way_group_callback(group)
        #     blob_queue.task_done()
        # for proc in pool:
        #     proc.join()
        # blob_queue.join()
    def ref_deps(self, coord_groups, ways):
        # For each way ref, find which coord blob holds that node id and record
        # the (refs, j) position to patch. coord_groups is sorted by id range,
        # so bisecting on the blobs' last ids locates the owning blob. The
        # result maps blob -> [(refs, j), ...], the shape drain_ways consumes;
        # a stray debug `raise` and an unfinished dependency-merging pass have
        # been removed here.
        ends = [blob[1] for blob in coord_groups]
        deps = defaultdict(list)
        for osm_id, tags, refs in ways:
            for j, ref in enumerate(refs):
                index = bisect.bisect_left(ends, ref)
                deps[coord_groups[index]].append((refs, j))
        return deps
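
    # Example: with blob id ranges (1, 100, ...) and (101, 250, ...), a ref
    # of 150 bisects to index 1, i.e. the second blob resolves it.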
    def drain_ways(self, ref_deps, blob_cache, ways):
        # First resolve refs against blobs that are already cached, so those
        # entries do not have to be re-parsed.
        for blob, nodes_by_id in blob_cache.iteritems():
            if blob in ref_deps:
                items = ref_deps[blob]
                for refs, j in items:
                    refs[j] = nodes_by_id[refs[j]]
                del ref_deps[blob]
        # Then page the remaining blobs in through the cache's missing hook.
        for blob, items in ref_deps.iteritems():
            self.log(blob[1])
            nodes_by_id = blob_cache[blob]
            for refs, j in items:
                refs[j] = nodes_by_id[refs[j]]
        return ways
    def blob_cache(self, coord_groups):
        # LRU cache keyed by coord blob; on a miss the blob is re-parsed via
        # nodes_by_id_from_blob. (The missing= hook was removed in later
        # cachetools releases; this gist relies on an older version.)
        cache = cachetools.LRUCache(self.max_coord_blobs, missing=self.nodes_by_id_from_blob)
        prewarm_blob_count = min(self.max_coord_blobs, len(coord_groups))
        for blob in random.sample(coord_groups, prewarm_blob_count):
            cache[blob]
        self.log('prewarmed cache, {} of {} blobs in memory'.format(prewarm_blob_count, len(coord_groups)))
        return cache

    def get_ways_from_data(self, data):
        return list(SuperPrimitiveBlockParser(data).ways())
    def ways(self):
        coord_groups = []
        way_groups = []
        self.log('{} blobs in file'.format(len(list(PBFFile(self.filename).blob_offsets()))))
        self.parse_file(coord_group_callback=coord_groups.append, way_group_callback=way_groups.append)
        self.log('{} way blobs, {} coord blobs'.format(len(way_groups), len(coord_groups)))
        self.log('indexed coords')
        blob_cache = self.blob_cache(coord_groups)
        ways = []
        for data in way_groups:
            ways += self.get_ways_from_data(data)
            # Drain in chunks so the pending ways list stays below max_ways.
            if len(ways) + self.chunk_size > self.max_ways:
                self.log('draining {} ways'.format(len(ways)))
                for way in self.drain_ways(self.ref_deps(coord_groups, ways), blob_cache, ways):
                    yield way
                ways = []
        self.log('final drain of {} ways'.format(len(ways)))
        for way in self.drain_ways(self.ref_deps(coord_groups, ways), blob_cache, ways):
            yield way
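
# A minimal usage sketch, assuming a local extract at 'extract.osm.pbf'
# (the path is illustrative, not part of the original gist). Each yielded
# way has had its node refs rewritten in place to coordinates.
if __name__ == '__main__':
    collector = CoordCollector('extract.osm.pbf')
    collector.verbose = True
    for osm_id, tags, coords in collector.ways():
        print osm_id, tags, len(coords)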