-
-
Save mweisman/1237605 to your computer and use it in GitHub Desktop.
Data processing pipeline demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pipelinedemo.py | |
# Data processing pipeline demo | |
# Uses GeoJSON-like geometries for demonstration only
from fiona import workspace | |
from shapely.geometry import box | |
from shapely import wkb | |
from json import dumps | |
import urllib2 | |
import logging | |
# Rectangle built with shapely's box(); task2 tests each feature's geometry
# against it to set the 'UK' property. The numbers look like WGS84
# longitude/latitude degrees around Great Britain -- TODO confirm.
UK_BOUNDS = box(-7.572168, 49.96, 1.681531, 58.635)
def coroutine(func):
    """Decorator that primes a generator-based coroutine.

    Calling the decorated function creates the generator and advances it to
    its first ``yield``, so the caller can ``send()`` to it immediately.
    The wrapper keeps the wrapped function's name and docstring.
    """
    from functools import wraps

    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # The builtin next() works on Python 2.6+ and 3; gen.next() is Python 2 only.
        next(cr)
        return cr
    return start
def pipeline(source, following):
    """Push every feature from ``source`` into the ``following`` coroutine.

    :param source: any iterable of features.
    :param following: a primed coroutine (generator) that accepts ``send()``.

    ``following`` is closed once ``source`` is exhausted. It is also closed if
    iterating ``source`` or sending a feature raises, so downstream stages
    always get a chance to flush and release resources. The exception still
    propagates to the caller.
    """
    try:
        for f in source:
            logging.info("Pipeline: sending feature to %s", following)
            following.send(f)
        logging.info("Pipeline: source empty, closing %s", following)
    finally:
        following.close()
@coroutine
def task1(following, distance=50):
    """Receive features, buffer each geometry and send the feature along.

    :param following: primed downstream coroutine that receives each feature.
    :param distance: buffer distance, in the units of the feature's
        coordinates (default 50).

    Each feature's ``wkb`` and ``geometry`` attributes are replaced with the
    buffered geometry. ``geometry`` is produced by
    ``shapely.geometry.mapping``, so it is a proper GeoJSON geometry dict:
    polygon rings are nested correctly, holes are kept, and multi-part
    results are supported. Closing this coroutine also closes ``following``.
    """
    from shapely.geometry import mapping

    logging.info("Task1: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task1: buffering feature")
            buffered = wkb.loads(f.wkb).buffer(distance)
            f.wkb = buffered.wkb
            f.geometry = mapping(buffered)
            logging.info("Task1: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task1: stopped.")
    finally:
        # Propagate shutdown explicitly so downstream stages flush without
        # relying on CPython's refcount-driven generator finalization.
        following.close()
@coroutine
def task2(following):
    """Receive features, flag whether each one touches the UK and send it along.

    :param following: primed downstream coroutine that receives each feature.

    Sets ``f.properties['UK']`` to True when the feature's geometry intersects
    ``UK_BOUNDS``, and False otherwise. ``intersects`` is used rather than
    ``overlaps`` because DE-9IM "overlaps" is False when one geometry
    contains the other. With ``overlaps``, a feature lying entirely inside
    the UK box, or covering it completely, would be flagged False. Closing
    this coroutine also closes ``following``.
    """
    logging.info("Task2: started...")
    try:
        while True:
            f = (yield)
            logging.info("Task2: searching for UK")
            # bool() guarantees a plain JSON-serializable boolean.
            f.properties['UK'] = bool(wkb.loads(f.wkb).intersects(UK_BOUNDS))
            logging.info("Task2: sending feature to %s", following)
            following.send(f)
    except GeneratorExit:
        logging.info("Task2: stopped.")
    finally:
        # Propagate shutdown so the writer dumps its output deterministically.
        following.close()
@coroutine
def writer(fileobj):
    """Sink coroutine: collect features and write one GeoJSON FeatureCollection.

    :param fileobj: writable text file object. This coroutine takes ownership
        of it and always closes it when the coroutine finishes.

    Each received feature is stored in memory as a
    ``{'type': 'Feature', 'geometry': ..., 'properties': ...}`` dict. When
    the coroutine is closed, the whole collection is serialized with
    ``json.dumps(..., indent=2)`` plus a trailing newline and written to
    ``fileobj``. The file is closed even if serialization fails.
    """
    # Named 'collected' rather than 'buffer' to avoid shadowing the Python 2 builtin.
    collected = []
    try:
        logging.info("Writer: started...")
        while True:
            f = (yield)
            collected.append({
                'type': 'Feature',
                'geometry': f.geometry,
                'properties': f.properties,
            })
    except GeneratorExit:
        logging.info("Writer: dumping buffer")
        fileobj.write(dumps(
            {
                'type': 'FeatureCollection',
                'features': collected,
            },
            indent=2) + "\n")
    finally:
        fileobj.close()
    logging.info("Writer: stopped.")
if __name__ == "__main__":
    from contextlib import closing

    logging.basicConfig(level=logging.INFO)
    output_path = "pipeline-demo.json"

    # Fetch random demo polygons; closing() releases the HTTP response
    # (urllib2 responses are not context managers on Python 2).
    with closing(urllib2.urlopen(
        "http://geoipsum.org/polygons.json?bb=46.5606356,-24.5,56.8503839,20.5&polygon_number=30"
    )) as response:
        w = workspace(response.read())
    features = w.collections['OGRGeoJSON'].all

    sink = writer(open(output_path, "w"))
    pipeline(features, task1(task2(sink)))
    # Close the sink explicitly so the output is written before it is read
    # back. This does not rely on garbage-collection timing; it is a no-op
    # if the chain already closed it.
    sink.close()

    with open(output_path) as fh:
        print(fh.read())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment