Skip to content

Instantly share code, notes, and snippets.

@mweisman
Forked from sgillies/pipelinedemo.py
Created September 23, 2011 15:11
Show Gist options
  • Save mweisman/1237605 to your computer and use it in GitHub Desktop.
Data processing pipeline demo
# pipelinedemo.py
# Data processing pipeline demo
# Uses GeoJSON-like geometries for demonstration only.
from fiona import workspace
from shapely.geometry import box
from shapely import wkb
from json import dumps
import urllib2
import logging
# Axis-aligned rectangle built with shapely's box(minx, miny, maxx, maxy).
# NOTE(review): the values look like lon/lat degrees roughly enclosing the
# UK, so the features are presumably in the same CRS — TODO confirm.
UK_BOUNDS = box(-7.572168, 49.96, 1.681531, 58.635)
def coroutine(func):
    """Decorator that primes a generator-based coroutine.

    Calling the decorated function creates the generator and advances it to
    its first ``yield``, so the returned object can accept ``send()`` at once.
    """
    from functools import wraps

    @wraps(func)  # keep the wrapped function's name and docstring
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # The next() builtin works on Python 2.6+ and 3; cr.next() is Python 2 only.
        next(cr)
        return cr
    return start
def pipeline(source, following):
    """Feed every item of *source* into the *following* coroutine.

    Each item is delivered with ``following.send``.  Once *source* is
    exhausted, ``following.close()`` is called to signal end-of-stream.
    """
    for feature in source:
        logging.info("Pipeline: sending feature to %s", following)
        following.send(feature)
    # Source exhausted: closing raises GeneratorExit inside the downstream coroutine.
    logging.info("Pipeline: source empty, closing %s", following)
    following.close()
@coroutine
def task1(following, distance=50):
    """Buffer each received feature and pass it downstream.

    Receives features via ``send()``.  Each feature's geometry is replaced
    by its buffer of *distance* units (in the geometry's own coordinate
    units), both ``f.wkb`` and ``f.geometry`` are updated, and the feature
    is sent to *following*.  Closing this coroutine also closes *following*.

    :param following: downstream coroutine accepting ``send(feature)``.
    :param distance: buffer distance; defaults to 50, the value this task
        has always used.
    """
    # mapping() builds a GeoJSON-like dict for any geometry type.  Polygon
    # coordinates become a list of rings (exterior plus holes), and
    # multi-part results work too; ``.exterior`` supports neither.
    from shapely.geometry import mapping

    logging.info("Task1: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task1: buffering feature")
            temp_geom = wkb.loads(f.wkb).buffer(distance)
            f.wkb = temp_geom.wkb
            f.geometry = mapping(temp_geom)
            logging.info("Task1: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task1: stopped.")
        # Propagate end-of-stream explicitly rather than relying on the
        # downstream generator being garbage-collected.
        following.close()
@coroutine
def task2(following):
    """Flag whether each received feature touches the UK bounds.

    Sets ``f.properties['UK']`` to True when the feature's geometry
    intersects ``UK_BOUNDS`` and to False otherwise, then sends the feature
    to *following*.  Closing this coroutine also closes *following*.
    """
    logging.info("Task2: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task2: searching for UK")
            # Use intersects(), not overlaps().  overlaps() is False when one
            # geometry contains the other, so a feature lying entirely inside
            # the UK box (or covering it) would have been flagged as not UK.
            f.properties['UK'] = bool(wkb.loads(f.wkb).intersects(UK_BOUNDS))
            logging.info("Task2: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task2: stopped.")
        # Propagate end-of-stream explicitly so the sink flushes deterministically.
        following.close()
@coroutine
def writer(fileobj):
    """Sink coroutine: collect features and write one FeatureCollection.

    Each received feature is stored as a ``Feature`` dict built from
    ``f.geometry`` and ``f.properties``.  When the coroutine is closed, all
    collected features are serialized as a single ``FeatureCollection``
    (indented JSON plus a trailing newline) to *fileobj*, and *fileobj* is
    then closed.

    :param fileobj: writable text file object; this coroutine takes
        ownership of it and closes it.
    """
    # Named ``features`` rather than ``buffer`` to avoid shadowing the
    # Python 2 ``buffer`` builtin.
    features = []
    try:
        logging.info("Writer: started...")
        while True:
            f = (yield)
            features.append({
                'type': 'Feature',
                'geometry': f.geometry,
                'properties': f.properties
            })
    except GeneratorExit:
        logging.info("Writer: dumping buffer")
        try:
            fileobj.write(dumps(
                {
                    'type': 'FeatureCollection',
                    'features': features
                },
                indent=2) + "\n")
        finally:
            # Release the file handle even if serialization or the write fails.
            fileobj.close()
        logging.info("Writer: stopped.")
if __name__ == "__main__":
    from contextlib import closing

    logging.basicConfig(level=logging.INFO)
    out_path = "pipeline-demo.json"
    # Close the HTTP response as soon as its body has been read.
    with closing(urllib2.urlopen(
        "http://geoipsum.org/polygons.json?bb=46.5606356,-24.5,56.8503839,20.5&polygon_number=30"
    )) as response:
        w = workspace(response.read())
    features = w.collections['OGRGeoJSON'].all
    # Chain: source -> buffer (task1) -> UK flag (task2) -> JSON writer.
    # The writer takes ownership of the output file and closes it when
    # the chain is closed.
    pipeline(
        features,
        task1(
            task2(
                writer(open(out_path, "w")))
        )
    )
    # Read back the result through a managed handle.  print() with a single
    # argument produces the same output on Python 2 and 3.
    with open(out_path) as result:
        print(result.read())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment