Skip to content

Instantly share code, notes, and snippets.

@davidvhill
Last active March 16, 2018 20:23
Show Gist options
  • Save davidvhill/e87435ed9c32cf143f4925275bb5191a to your computer and use it in GitHub Desktop.
Save davidvhill/e87435ed9c32cf143f4925275bb5191a to your computer and use it in GitHub Desktop.
A Function I Am Proud Of
@entrypoint.command()
@click.option('--x', '-x', required=True)
@click.option('--y', '-y', required=True)
@click.option('--acquired', '-a', required=True)
@click.option('--number', '-n', required=False, default=2500)
def changedetection(x, y, acquired, number=2500):
    """Run change detection for a tile over a time range and save results to Cassandra.

    Args:
        x: tile x coordinate
        y: tile y coordinate
        acquired (str): ISO8601 date range
        number (int): Number of chips to run change detection on.  Testing only.

    Returns:
        int: count of saved segments.  None is returned when an exception
        was raised; the error is printed with its stack trace and is not
        re-raised.
    """
    # NOTE(review): the x/y options declare no click type, so they presumably
    # arrive as strings; confirm grids.tile accepts that.
    ctx = None
    name = 'change-detection'

    try:
        # start and/or connect Spark
        ctx = firebird.context(name)

        # get logger
        log = logger(ctx, name)

        # wire everything up
        # NOTE(review): cfg is the bare name ARD here but firebird.ARD below;
        # confirm both refer to the same configuration.
        tile = grids.tile(x=x, y=y, cfg=ARD)
        ids = timeseries.ids(ctx=ctx, chips=take(number, tile.get('chips')))
        ard = timeseries.rdd(ctx=ctx, ids=ids, acquired=acquired,
                             cfg=firebird.ARD, name='ard')
        # cached because the result is consumed twice: once by the write and
        # once by the segment count below
        ccd = pyccd.dataframe(ctx=ctx,
                              rdd=pyccd.rdd(ctx=ctx, timeseries=ard)).cache()

        # emit parameters
        log.info(str(merge(tile, {'acquired': acquired,
                                  'input-partitions': firebird.INPUT_PARTITIONS,
                                  'product-partitions': firebird.PRODUCT_PARTITIONS,
                                  'chips': ids.count()})))

        # realize data transformations
        cassandra.write(ctx, ccd, cqlstr(pyccd.algorithm()))

        # log and return segment counts
        # NOTE(review): assumes ccd exposes a Spark-style count(); the
        # original read an undefined `counts` mapping here.
        saved = ccd.count()
        log.info("saved {} ccd segments".format(saved))
        return saved

    except Exception as e:
        # spark errors & stack trace; print is used rather than log because
        # log is unbound if the failure happened before the logger was created
        print('error:{}'.format(e))
        traceback.print_exc()

    finally:
        # stop and/or disconnect Spark
        if ctx is not None:
            ctx.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment