Skip to content

Instantly share code, notes, and snippets.

@snorfalorpagus
Created September 27, 2014 13:28
Show Gist options
  • Save snorfalorpagus/8220ed1a981909ee51f9 to your computer and use it in GitHub Desktop.
Save snorfalorpagus/8220ed1a981909ee51f9 to your computer and use it in GitHub Desktop.
Shapely and multiprocessing
#!/usr/bin/env python
'''
A simple geoprocessing task: for each feature in layer1, find the distance to
all features in layer2 that are within 5 map units
'''
import fiona
from shapely.geometry import shape
from shapely.ops import nearest_points
import time
from geofunc import create_spatial_index
if __name__ == '__main__':
layer1 = fiona.open('layer1.shp', 'r')
layer2 = fiona.open('layer2.shp', 'r')
index = create_spatial_index(layer2)
t0 = time.time()
for fid1, feature1 in layer1.items():
geometry1 = shape(feature1['geometry'])
distances = []
for fid2 in index.intersection([a+b for a,b in zip(geometry1.bounds, [-5, -5, 5, 5])]):
feature2 = layer2[fid2]
geometry2 = shape(feature2['geometry'])
point1, point2 = nearest_points(geometry1, geometry2)
distance = point1.distance(point2)
distances.append(distance)
print time.time()-t0
layer1.close()
layer2.close()
import os
import rtree
def create_spatial_index(collection):
'''Create an Rtree spatial index of a Fiona layer
The index is serialized to a file. If the file already exists, it is loaded
instead of regenerated.
'''
index = None
if os.path.exists(collection.path+'.idx'):
try:
index = rtree.index.Rtree(collection.path)
except:
pass
if not index:
assert(len(collection) > 0)
def gen(collection):
for fid, feature in collection.items():
geometry = shape(feature['geometry'])
yield((fid, geometry.bounds, None))
index = rtree.index.Rtree(collection.path, gen(collection))
index.close() # to prevent corruption
index = rtree.index.Rtree(collection.path)
return index
#!/usr/bin/env python
'''
As basic.py, but refactored to use the multiprocessing module.
'''
import fiona
from shapely.geometry import shape
from shapely.ops import nearest_points
import time
from geofunc import create_spatial_index
import multiprocessing
def worker(queue1, queue2, path1, path2, *args, **kwargs):
# open the layers
layer1 = fiona.open(path1, 'r')
layer2 = fiona.open(path2, 'r')
# open the already created spatial index
index = create_spatial_index(layer2)
while True:
# get the next FID to process
item = queue1.get()
if item is None:
break
fid1 = item
# ----
feature1 = layer1[fid1]
geometry1 = shape(feature1['geometry'])
distances = []
for fid2 in index.intersection([a+b for a,b in zip(geometry1.bounds, [-5, -5, 5, 5])]):
feature2 = layer2[fid2]
geometry2 = shape(feature2['geometry'])
point1, point2 = nearest_points(geometry1, geometry2)
distance = point1.distance(point2)
distances.append(distance)
# ----
queue2.put((fid1, distances))
layer1.close()
layer2.close()
if __name__ == '__main__':
layer1 = fiona.open('layer1.shp', 'r')
layer2 = fiona.open('layer2.shp', 'r')
# create the spatial index
index = create_spatial_index(layer2)
# create queues for 2-way communication between master and workers
queue1 = multiprocessing.Queue()
queue2 = multiprocessing.Queue()
# create some worker processes
processes = []
for n in range(0, multiprocessing.cpu_count()):
process = multiprocessing.Process(target=worker, args=(queue1, queue2, layer1.path, layer2.path))
processes.append(process)
process.start()
t0 = time.time()
feature_count = len(layer1)
waiting = 0
position = 0
step = 100
done = 0
while True:
if waiting == 0:
# add some more FIDs to the TODO queue
fids = range(position, min(position+step, feature_count))
for fid in fids:
queue1.put(fid)
position += step
waiting = len(fids)
# process a result
item = queue2.get()
waiting -= 1
done += 1
fid1, distances = item
if done == feature_count:
# all done, send message to workers to quit
for process in processes:
queue1.put(None)
break
print time.time()-t0
# wait for the worker processes to quit
for process in processes:
process.join()
layer1.close()
layer2.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment