Created
September 27, 2014 13:28
-
-
Save snorfalorpagus/8220ed1a981909ee51f9 to your computer and use it in GitHub Desktop.
Shapely and multiprocessing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
A simple geoprocessing task: for each feature in layer1, find the distance to | |
all features in layer2 that are within 5 map units | |
''' | |
import fiona | |
from shapely.geometry import shape | |
from shapely.ops import nearest_points | |
import time | |
from geofunc import create_spatial_index | |
if __name__ == '__main__': | |
layer1 = fiona.open('layer1.shp', 'r') | |
layer2 = fiona.open('layer2.shp', 'r') | |
index = create_spatial_index(layer2) | |
t0 = time.time() | |
for fid1, feature1 in layer1.items(): | |
geometry1 = shape(feature1['geometry']) | |
distances = [] | |
for fid2 in index.intersection([a+b for a,b in zip(geometry1.bounds, [-5, -5, 5, 5])]): | |
feature2 = layer2[fid2] | |
geometry2 = shape(feature2['geometry']) | |
point1, point2 = nearest_points(geometry1, geometry2) | |
distance = point1.distance(point2) | |
distances.append(distance) | |
print time.time()-t0 | |
layer1.close() | |
layer2.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import rtree | |
def create_spatial_index(collection): | |
'''Create an Rtree spatial index of a Fiona layer | |
The index is serialized to a file. If the file already exists, it is loaded | |
instead of regenerated. | |
''' | |
index = None | |
if os.path.exists(collection.path+'.idx'): | |
try: | |
index = rtree.index.Rtree(collection.path) | |
except: | |
pass | |
if not index: | |
assert(len(collection) > 0) | |
def gen(collection): | |
for fid, feature in collection.items(): | |
geometry = shape(feature['geometry']) | |
yield((fid, geometry.bounds, None)) | |
index = rtree.index.Rtree(collection.path, gen(collection)) | |
index.close() # to prevent corruption | |
index = rtree.index.Rtree(collection.path) | |
return index |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
As basic.py, but refactored to use the multiprocessing module. | |
''' | |
import fiona | |
from shapely.geometry import shape | |
from shapely.ops import nearest_points | |
import time | |
from geofunc import create_spatial_index | |
import multiprocessing | |
def worker(queue1, queue2, path1, path2, *args, **kwargs): | |
# open the layers | |
layer1 = fiona.open(path1, 'r') | |
layer2 = fiona.open(path2, 'r') | |
# open the already created spatial index | |
index = create_spatial_index(layer2) | |
while True: | |
# get the next FID to process | |
item = queue1.get() | |
if item is None: | |
break | |
fid1 = item | |
# ---- | |
feature1 = layer1[fid1] | |
geometry1 = shape(feature1['geometry']) | |
distances = [] | |
for fid2 in index.intersection([a+b for a,b in zip(geometry1.bounds, [-5, -5, 5, 5])]): | |
feature2 = layer2[fid2] | |
geometry2 = shape(feature2['geometry']) | |
point1, point2 = nearest_points(geometry1, geometry2) | |
distance = point1.distance(point2) | |
distances.append(distance) | |
# ---- | |
queue2.put((fid1, distances)) | |
layer1.close() | |
layer2.close() | |
if __name__ == '__main__': | |
layer1 = fiona.open('layer1.shp', 'r') | |
layer2 = fiona.open('layer2.shp', 'r') | |
# create the spatial index | |
index = create_spatial_index(layer2) | |
# create queues for 2-way communication between master and workers | |
queue1 = multiprocessing.Queue() | |
queue2 = multiprocessing.Queue() | |
# create some worker processes | |
processes = [] | |
for n in range(0, multiprocessing.cpu_count()): | |
process = multiprocessing.Process(target=worker, args=(queue1, queue2, layer1.path, layer2.path)) | |
processes.append(process) | |
process.start() | |
t0 = time.time() | |
feature_count = len(layer1) | |
waiting = 0 | |
position = 0 | |
step = 100 | |
done = 0 | |
while True: | |
if waiting == 0: | |
# add some more FIDs to the TODO queue | |
fids = range(position, min(position+step, feature_count)) | |
for fid in fids: | |
queue1.put(fid) | |
position += step | |
waiting = len(fids) | |
# process a result | |
item = queue2.get() | |
waiting -= 1 | |
done += 1 | |
fid1, distances = item | |
if done == feature_count: | |
# all done, send message to workers to quit | |
for process in processes: | |
queue1.put(None) | |
break | |
print time.time()-t0 | |
# wait for the worker processes to quit | |
for process in processes: | |
process.join() | |
layer1.close() | |
layer2.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment