Water body characterization across time
#!/usr/bin/env python

import os
from os.path import join as pjoin, dirname, basename
import cPickle as pickle
import argparse

import numpy
import luigi
import pandas
from datetime import datetime as dt

CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))


def combine_all_cells():
    # Config params
    id_name = CONFIG.get('work', 'feature_name')
    base_out_dir = CONFIG.get('work', 'output_directory')
    base_out_fname = CONFIG.get('outputs', 'final_output_filename')
    cells_list_fname = pjoin(base_out_dir,
                             CONFIG.get('outputs', 'cells_list'))
    combined_cell_wc_fname = CONFIG.get('outputs',
                                        'combined_cell_wc_filename')
    vector_fname = CONFIG.get('work', 'vector_filename')
    ngroups = int(CONFIG.get('internals', 'pandas_groups'))
    chunksize = int(CONFIG.get('internals', 'pandas_chunksize'))

    with open(cells_list_fname, 'r') as infile:
        cells = pickle.load(infile)

    headings = [id_name, "Timestamp", "Total_Pixel_Count", "WATER_NOT_PRESENT",
                "NO_DATA", "MASKED_NO_CONTIGUITY",
                "MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
                "MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
                "MASKED_CLOUD", "WATER_PRESENT"]

    items = ['Timestamp', id_name]

    tmp1_fname = pjoin(base_out_dir, 'tmp_combined_results.h5')
    tmp1_store = pandas.HDFStore(tmp1_fname)

    # TODO Need to remove the index column before we write the first
    # combined file
    for cell in cells:
        cell_dir = pjoin(base_out_dir, cell)
        cell_wc_fname = pjoin(cell_dir, combined_cell_wc_fname)

        # Open the current cell's WC result file
        store = pandas.HDFStore(cell_wc_fname, 'r')

        if '/data' in store.keys():
            # We have data to retrieve
            df = store['data']
            df.drop('index', 1, inplace=True)
            tmp1_store.append('data', df, index=False)
            tmp1_store.flush()

        store.close()

    tmp1_store.close()

    # Another method:
    # http://stackoverflow.com/questions/25459982/trouble-with-grouby-on-millions-of-keys-on-a-chunked-file-in-python-pandas/25471765#25471765
    # http://stackoverflow.com/questions/15798209/pandas-group-by-query-on-large-data-in-hdfstore/15800314#15800314

    # We need to group as many records as we can into more memory-manageable
    # chunks that also balance well for I/O
    tmp1_store = pandas.HDFStore(tmp1_fname)
    tmp2_fname = pjoin(base_out_dir, 'tmp_grouped_results.h5')
    tmp2_store = pandas.HDFStore(tmp2_fname)

    for chunk in tmp1_store.select('data', chunksize=chunksize):
        g = chunk.groupby(chunk[id_name] % ngroups)
        for grp, grouped in g:
            tmp2_store.append('group_{}'.format(int(grp)), grouped,
                              data_columns=headings, index=False)
            tmp2_store.flush()

    tmp1_store.close()
    tmp2_store.close()
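    # A minimal sketch of the modulo-grouping idea (hypothetical ids,
    # assuming ngroups=10 as set in config.cfg): feature ids 7, 17 and 27
    # all satisfy id % 10 == 7, so every record for a given feature id
    # lands in the same 'group_7' node of the temporary store, and the
    # final per-feature aggregation can then run one small group at a time:
    #
    #     >>> ids = pandas.Series([7, 17, 27, 12])
    #     >>> (ids % 10).tolist()
    #     [7, 7, 7, 2]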
    # Define the output file
    out_fname = pjoin(base_out_dir, base_out_fname)
    combined_store = pandas.HDFStore(out_fname, 'w')

    # Now read the grouped data and write to disk
    tmp2_store = pandas.HDFStore(tmp2_fname)
    for key in tmp2_store.keys():
        df = tmp2_store[key].groupby(items, as_index=False).sum()
        combined_store.append('data', df, data_columns=headings)
        combined_store.flush()

    # Add metadata
    metadata_group = '/Metadata'
    metadata = {'Vector Filename': basename(vector_fname)}
    metadata = pandas.DataFrame(metadata, index=[0])
    combined_store[metadata_group] = metadata

    # Save and close the files
    combined_store.close()
    tmp2_store.close()

    # Clean up temporary files
    os.remove(tmp1_fname)
    os.remove(tmp2_fname)


if __name__ == '__main__':
    combine_all_cells()
[work]
output_directory = /g/data/v10/testing_ground/jps547/water_characterisation/FullRun2
logs_directory = %(output_directory)s/logs
vector_filename = /g/data/v10/testing_ground/jps547/water_characterisation/Waterbodies.shp
feature_name = AUSHYDRO_ID

[internals]
envelope = 0
pattern = *.tif
cells_per_node = 10
pandas_groups = 10
pandas_chunksize = 1000000
water_directory = /g/data/u46/wofs/water_f7s/extents
cell_grid = /short/v10/jps547/DatacubeCellGrid/Cell_Grid_WGS84.shp

[outputs]
cells_list = cells_to_process.pkl
tiles_list = tiles.pkl
pbs_filename = run_wc_pbsdsh.bash
water_files_list = water_files.pkl
water_characterisation_out_format = wc_result_{}.h5
combined_cell_wc_filename = combined_cell_wc_results.h5
rasterise_filename = rasterised_result.tif
final_output_filename = water_characterisation_results.h5
groups_filename_format = tmp_group_{}.h5

#[core]
#logging_conf_file: /home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/logging.cfg

[pbs]
project = v10
queue = normal
walltime = 05:00:00
email = [email protected]
modules = python/2.7.6,gdal/1.10.1,agdc-api,pandas,IDL_functions/0.2,gaip/test
[loggers]
keys=root

[handlers]
keys=consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=ERROR
handlers=consoleHandler

[handler_consoleHandler]
class=StreamHandler
level=ERROR
formatter=simpleFormatter
args=(sys.stderr,)

[formatter_simpleFormatter]
format=%(levelname)s: %(message)s
#!/usr/bin/env python

import os
import glob
import sys
import logging
import argparse
# Debugging
import datetime
import pdb

import numpy
from osgeo import gdal
from osgeo import ogr
import pandas

# ga-neo-nfrip repo
from WaterExtent import WaterExtent
from fileSystem import Directory

# IDL_functions repo
from IDL_functions import histogram

# image_processing repo
from image_processing.segmentation.rasterise import Rasterise
from image_processing.segmentation.segmentation import SegmentVisitor


def getFiles(path, pattern):
    """
    Just an internal function to find files given a file-extension
    pattern. This isn't really designed to go beyond development
    demonstration for this analytical workflow.
    """
    # Get the current directory so we can change back later
    CWD = os.getcwd()
    os.chdir(path)

    # Find the files matching the pattern
    files = glob.glob(pattern)
    files = [os.path.abspath(i) for i in files]

    # Change back to the original directory
    os.chdir(CWD)

    return files
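# A quick usage sketch (hypothetical file names; the directory comes from
# the config's water_directory plus a cell id):
#
#     >>> getFiles('/g/data/u46/wofs/water_f7s/extents/146_-037', '*.tif')
#     ['/g/data/u46/wofs/water_f7s/extents/146_-037/<extent>_water.tif', ...]
#
# i.e. absolute paths for every file under `path` that matches `pattern`.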
def getWaterExtents(file_list, sort=True):
    """
    Given a list of water extent image files, create a list of
    WaterExtent class objects, optionally sorted by time (old -> new)
    if sort=True.

    :param file_list:
        A list containing filepath names to water extent image files.

    :param sort:
        A boolean keyword indicating if the WaterExtent objects
        should be sorted before they're returned. Default is True.

    :return:
        A list of WaterExtent class objects, one for every water
        image file in file_list, and optionally sorted by date,
        old -> new.
    """
    waterExtents = []
    cellId = None

    for f in file_list:
        waterExtent = WaterExtent(f)

        # check for lon, lat consistency
        if cellId:
            thisCellId = [waterExtent.lon, waterExtent.lat]
            if thisCellId != cellId:
                msg = ("Extents must be from same cell. At file {} got {}, "
                       "expecting {}")
                msg = msg.format(f, thisCellId, cellId)
                logging.error(msg)
                sys.exit(1)
        else:
            cellId = [waterExtent.lon, waterExtent.lat]

        waterExtents.append(waterExtent)

    if sort:
        # all good, so now sort the extents by datetime
        logging.info("Collected %d files. Now sort them." % len(file_list))
        sortedWaterExtents = sorted(waterExtents,
                                    key=lambda extent: extent.getDatetime())
        return (sortedWaterExtents, cellId)
    else:
        msg = "Collected {} files. Sorting not applied.".format(len(file_list))
        logging.info(msg)
        return (waterExtents, cellId)


# Define the main process that will work in a tiling fashion, i.e. process a
# chunk at a time rather than all at once
# Alternatively we do this through luigi
def tiled_main(vector_file, cell_list, indir, outdir, pattern, logpath):
    """
    """
    # setup logging file ... log to
    # <logpath>/waterExtentVectorSummary_<hostname>_<pid>.log
    log_file = "waterExtentVectorSummary_{}_{}.log".format(os.uname()[1],
                                                           os.getpid())
    logPath = os.path.join(logpath, log_file)
    logging.basicConfig(filename=logPath,
                        format='%(asctime)s %(levelname)s: %(message)s',
                        datefmt='%d/%m/%Y %H:%M:%S', level=logging.INFO)

    baseOutputDir = Directory(outdir)
    if not baseOutputDir.exists():
        logging.error("%s does not exist" % baseOutputDir.getPath())
        sys.exit(1)

    logging.info("Opening vector file %s" % vector_file)
    vec_ds = ogr.Open(vector_file)
    layer = vec_ds.GetLayer()

    # Initialise dicts to hold feature names, and hydro_id
    feature_names = {}
    hydro_id = {}

    # Dicts to hold forward and backward mapping of fid's and seg id's
    seg2fid = {}
    fid2seg = {}
    fid_list = []
    fid_df = {}

    logging.info("Gathering attribute information for each feature.")
    # These Field Id's are unique to NGIG's vector datasets
    for feature in layer:
        fid = feature.GetFID()
        feature_names[fid] = feature.GetField("NAME")
        hydro_id[fid] = feature.GetField("AUSHYDRO_I")
        seg2fid[fid + 1] = fid
        fid2seg[fid] = fid + 1
        fid_list.append(fid)
        fid_df[fid] = pandas.DataFrame()

    # Initialise the dataframe to store the results
    df = pandas.DataFrame()
    df['FID'] = fid_list

    nfeatures = len(fid_list)
    min_fid = df['FID'].min()
    max_fid = df['FID'].max()

    # We offset the min and max fid's by 1 as the rasterisation will be
    # created that way. The array of shape (10) is arbitrary
    h = histogram(numpy.zeros((10), dtype='int32'), Max=max_fid + 1,
                  Min=min_fid + 1)

    # This will be used as the input keyword and changes will be made in place
    t_area = h['histogram']

    # Create an output file that we can continually append data to
    store = pandas.HDFStore(os.path.join(outdir, 'Test_Results.h5'))

    for cell in cell_list:
        logging.info("Processing Cell ID: {}".format(cell))
        celldir = os.path.join(indir, cell)

        # Process the current tile/chunk which in this case is a datacube cell
        result_df = tiled_processing(vector_file, t_area, min_fid, max_fid,
                                     celldir, pattern)

        # We don't need to define columns up front
        # We can define an empty dataframe and append the data
        # That way columns can be defined within the script
        for key in result_df:
            # Group names shouldn't start with a number
            group_name = "FID_{}".format(key)
            store.append(group_name, result_df[key])

    # Combine FIDs with identical timestamps and sum the pixel counts
    # Including the hydro_id and fid as groupby's should exclude them from
    # the summation.
    # The filename and Feature Name fields will be removed as a result of the
    # summation. Feature Name could potentially be kept
    group_items = ['Time Stamp', 'AUSHYDRO_ID', 'FID']
    for key in store.keys():
        store[key] = store[key].groupby(group_items).sum()

    # Save and close the file
    store.close()


def tiled_processing(vector_file, input_hist, Min_id, Max_id, indir, pattern):
    """
    The main processing routine.

    :param vector_file:
        A string containing the file system pathname to an OGR
        compatible vector file.

    :param input_hist:
        The running total-area histogram; it is updated in-place via
        the histogram function's input keyword.

    :param Min_id:
        The minimum segment id to bin (min fid + 1).

    :param Max_id:
        The maximum segment id to bin (max fid + 1).

    :param indir:
        A string containing the file system pathname to a directory
        containing the water extent image files.

    :param pattern:
        A string containing the image extents file extension pattern,
        eg '*.tif'.

    :return:
        A dict of pandas DataFrames, one per feature id.
    """
    # Get a list of water_extent files
    files = getFiles(indir, pattern)

    # Get the water_extent objects and sort them by date
    sortedWaterExtents, cellId = getWaterExtents(files)

    # lat and lon will be helpful
    lon = cellId[0]
    lat = cellId[1]

    # Rasterise the features
    # We can use the first image file as the base
    # Rasterising the feature id's will be fid + 1
    # eg an fid of 10 is a segment id of 11, this allows for 0 to be the fill
    # value
    segments_ds = Rasterise(RasterFilename=files[0], VectorFilename=vector_file)
    logging.info("Rasterising features.")
    segments_ds.rasterise()

    # Extract the array
    veg2rast = segments_ds.segemented_array

    # Initialise the segment visitor
    seg_vis = SegmentVisitor(veg2rast)

    # Update the total area (recursive histogram technique)
    # input keyword modifies in-place
    recursive_h = histogram(veg2rast.ravel(), input=input_hist, Min=Min_id,
                            Max=Max_id)

    # Get specific attribute records
    logging.info("Opening vector file %s" % vector_file)
    vec_ds = ogr.Open(vector_file)
    layer = vec_ds.GetLayer()

    # Define the headings for the data frame
    headings = ["Filename", "Time Stamp", "Feature Name", "AUSHYDRO_ID",
                "FID", "Total Pixel Count", "WATER_NOT_PRESENT",
                "NO_DATA", "MASKED_NO_CONTIGUITY",
                "MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
                "MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
                "MASKED_CLOUD", "WATER_PRESENT"]

    # Initialise dicts to hold feature names, hydro_id and data frame
    feature_names = {}
    hydro_id = {}
    fid_df = {}

    # Dicts to hold forward and backward mapping of fid's and segment id's
    # seg = segment
    # fid = feature id
    seg2fid = {}
    fid2seg = {}

    logging.info("Gathering attribute information for each feature.")
    # These Field Id's are unique to NGIG's vector datasets
    for feature in layer:
        fid = feature.GetFID()
        feature_names[fid] = feature.GetField("NAME")
        hydro_id[fid] = feature.GetField("AUSHYDRO_I")
        seg2fid[fid + 1] = fid
        fid2seg[fid] = fid + 1
        fid_df[fid] = pandas.DataFrame(columns=headings)

    # Go back to the start of the vector file
    layer.ResetReading()

    # Replace any occurrences of None with UNKNOWN
    for key in feature_names:
        if feature_names[key] is None:
            feature_names[key] = 'UNKNOWN'

    # Loop over each WaterExtent file
    for waterExtent in sortedWaterExtents:
        logging.info("Processing %s" % waterExtent.filename)

        # Read the waterLayer from the extent file
        waterLayer = waterExtent.getArray()

        # timestamp
        timestamp = waterExtent.getDatetime()

        # Loop over each feature Id
        # Skip any FID's that don't exist in the current spatial extent
        for key in fid2seg:
            if fid2seg[key] > seg_vis.max_segID:
                continue
            data = seg_vis.getSegmentData(waterLayer, segmentID=fid2seg[key])

            # dimensions of the data which will be 1D
            dim = data.shape

            # Returns are 1D arrays, so check if we have an empty array
            if dim[0] == 0:
                continue  # Empty bin, (no data), skipping

            FID = key

            # Characterise the polygon's makeup based on the WoFS data.
            h = histogram(data, Min=0, Max=128)
            hist = h['histogram']

            # Area is the number of pixels
            total_area = dim[0]

            """
            A WaterTile stores 1 data layer encoded as unsigned BYTE values
            as described in the WaterConstants.py file.

            Note - legal (decimal) values are:

                   0: no water in pixel
                   1: no data (one or more bands) in source NBAR image
               2-127: pixel masked for some reason (refer to MASKED bits)
                 128: water in pixel

            Values 129-255 are illegal (i.e. if bit 7 set, all others must
            be unset).

            WATER_PRESENT         (dec 128) bit 7: 1=water present,
                                  0=no water if all other bits zero
            MASKED_CLOUD          (dec 64)  bit 6: 1=pixel masked out due to
                                  cloud, 0=unmasked
            MASKED_CLOUD_SHADOW   (dec 32)  bit 5: 1=pixel masked out due to
                                  cloud shadow, 0=unmasked
            MASKED_HIGH_SLOPE     (dec 16)  bit 4: 1=pixel masked out due to
                                  high slope, 0=unmasked
            MASKED_TERRAIN_SHADOW (dec 8)   bit 3: 1=pixel masked out due to
                                  terrain shadow, 0=unmasked
            MASKED_SEA_WATER      (dec 4)   bit 2: 1=pixel masked out due to
                                  being over sea, 0=unmasked
            MASKED_NO_CONTIGUITY  (dec 2)   bit 1: 1=pixel masked out due to
                                  lack of data contiguity, 0=unmasked
            NO_DATA               (dec 1)   bit 0: 1=pixel masked out due to
                                  NO_DATA in NBAR source, 0=valid data
            WATER_NOT_PRESENT     (dec 0)   All bits zero indicates a valid
                                  observation, no water present
            """
            # [0..128] bins were generated, i.e. 129 bins
            WATER_NOT_PRESENT = hist[0]
            NO_DATA = hist[1]
            MASKED_NO_CONTIGUITY = hist[2]
            MASKED_SEA_WATER = hist[4]
            MASKED_TERRAIN_SHADOW = hist[8]
            MASKED_HIGH_SLOPE = hist[16]
            MASKED_CLOUD_SHADOW = hist[32]
            MASKED_CLOUD = hist[64]
            WATER_PRESENT = hist[128]
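            # NB (descriptive note): only the exact single-bit values are
            # tallied above; a pixel carrying a combination of mask bits
            # (e.g. 96 = MASKED_CLOUD | MASKED_CLOUD_SHADOW) falls into a
            # different bin and is reflected only in the total pixel count.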
            format_dict = {'Filename': waterExtent.filename,
                           'Time Stamp': timestamp,
                           'Feature Name': feature_names[key],
                           'AUSHYDRO_ID': hydro_id[key],
                           'FID': FID,
                           'Total Pixel Count': total_area,
                           'WATER_NOT_PRESENT': WATER_NOT_PRESENT,
                           'NO_DATA': NO_DATA,
                           'MASKED_NO_CONTIGUITY': MASKED_NO_CONTIGUITY,
                           'MASKED_SEA_WATER': MASKED_SEA_WATER,
                           'MASKED_TERRAIN_SHADOW': MASKED_TERRAIN_SHADOW,
                           'MASKED_HIGH_SLOPE': MASKED_HIGH_SLOPE,
                           'MASKED_CLOUD_SHADOW': MASKED_CLOUD_SHADOW,
                           'MASKED_CLOUD': MASKED_CLOUD,
                           'WATER_PRESENT': WATER_PRESENT}

            # Append the new data to the FID data frame
            fid_df[FID] = fid_df[FID].append(format_dict, ignore_index=True)

    return fid_df


if __name__ == '__main__':
    description = 'Polygon characterisation of WoFS data through time.'
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('--outdir', dest="baseOutputPath",
                        help="Output base directory.", required=True)
    parser.add_argument('--log', dest="logPath",
                        help="Directory where log files will be written.",
                        required=True)
    parser.add_argument('--indir', required=True,
                        help="Input directory containing water extent files.")
    parser.add_argument('--sfx', default='*.tif',
                        help="File suffix to search for. Default is '*.tif'")
    parser.add_argument('--vector', required=True,
                        help="An OGR compatible vector file.")
    parser.add_argument('--outname', default='WaterExtentVectorSummary.csv',
                        help=("The name of the output file to contain the "
                              "summary. Default is "
                              "'WaterExtentVectorSummary.csv'."))

    # Collect the arguments
    args = parser.parse_args()

    # Retrieve command arguments
    baseOutputPath = args.baseOutputPath
    log = args.logPath
    path = args.indir
    pattern = args.sfx
    vector_file = args.vector
    outfname = args.outname

    # For testing we're defining the cells to process.
    # In future we'll get these returned from the API
    #cell_list = ['144_-041', '145_-041', '147_-041', '148_-041', '145_-042',
    #             '146_-042', '147_-042', '148_-042', '145_-043', '146_-043',
    #             '147_-043', '146_-044']
    #cell_list = ['144_-041']
    cell_list = ['146_-037']

    # Run
    tiled_main(vector_file, cell_list, path, baseOutputPath, pattern, log)
#!/usr/bin/env python

import os
from os.path import join as pjoin, dirname, basename
import cPickle as pickle
import argparse

import numpy
import luigi
import pandas
from datetime import datetime as dt

from water_body_characterisation import get_files

CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))


class WriteGroupTask(luigi.Task):

    group_id = luigi.IntParameter()

    def requires(self):
        return []

    def output(self):
        out_fname = pjoin(CONFIG.get('work', 'output_directory'),
                          'WriteGroupTask_{}.completed'.format(self.group_id))
        return luigi.LocalTarget(out_fname)

    def run(self):
        # Config params
        id_name = CONFIG.get('work', 'feature_name')
        base_out_dir = CONFIG.get('work', 'output_directory')
        cells_list_fname = pjoin(base_out_dir,
                                 CONFIG.get('outputs', 'cells_list'))
        combined_cell_wc_fname = CONFIG.get('outputs',
                                            'combined_cell_wc_filename')
        ngroups = int(CONFIG.get('internals', 'pandas_groups'))
        chunksize = int(CONFIG.get('internals', 'pandas_chunksize'))

        with open(cells_list_fname, 'r') as infile:
            cells = pickle.load(infile)

        # Define base headings
        headings = [id_name, "Timestamp", "Total_Pixel_Count",
                    "WATER_NOT_PRESENT",
                    "NO_DATA", "MASKED_NO_CONTIGUITY",
                    "MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
                    "MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
                    "MASKED_CLOUD", "WATER_PRESENT"]

        # Define the output store
        out_format = CONFIG.get('outputs', 'groups_filename_format')
        store_fname = pjoin(base_out_dir, out_format.format(self.group_id))
        group_store = pandas.HDFStore(store_fname)

        for cell in cells:
            cell_dir = pjoin(base_out_dir, cell)
            cell_wc_fname = pjoin(cell_dir, combined_cell_wc_fname)

            # Open the current cell's WC result file
            store = pandas.HDFStore(cell_wc_fname, 'r')

            if '/data' in store.keys():
                groups = store['data'].groupby(store['data'][id_name]
                                               % ngroups)
            else:
                store.close()
                continue

            for grp, grouped in groups:
                if grp != self.group_id:
                    continue
                group_store.append('data', grouped, data_columns=headings,
                                   index=False)

            store.close()

        group_store.close()

        with self.output().open('w') as outf:
            outf.write('completed')


class ManageAllGroupWriteTasks(luigi.Task):

    ngroups = luigi.IntParameter()

    def requires(self):
        tasks = []
        for i in range(self.ngroups):
            tasks.append(WriteGroupTask(i))
        return tasks

    def output(self):
        out_fname = pjoin(CONFIG.get('work', 'output_directory'),
                          'ManageAllGroupWriteTasks.completed')
        return luigi.LocalTarget(out_fname)

    def run(self):
        with self.output().open('w') as outf:
            outf.write('completed')


class CombineAllGroups(luigi.Task):

    ngroups = luigi.IntParameter()

    def requires(self):
        return [ManageAllGroupWriteTasks(self.ngroups)]

    def output(self):
        out_fname = pjoin(CONFIG.get('work', 'output_directory'),
                          'CombineAllGroups.completed')
        return luigi.LocalTarget(out_fname)

    def run(self):
        id_name = CONFIG.get('work', 'feature_name')
        out_dir = CONFIG.get('work', 'output_directory')
        out_format = CONFIG.get('outputs', 'groups_filename_format')
        files = get_files(out_dir, out_format.format('*'))
        base_out_fname = CONFIG.get('outputs', 'final_output_filename')
        out_fname = pjoin(out_dir, base_out_fname)
        combined_store = pandas.HDFStore(out_fname, 'w')

        # Define base headings
        headings = [id_name, "Timestamp", "Total_Pixel_Count",
                    "WATER_NOT_PRESENT",
                    "NO_DATA", "MASKED_NO_CONTIGUITY",
                    "MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
                    "MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
                    "MASKED_CLOUD", "WATER_PRESENT"]

        items = ['Timestamp', id_name]

        for f in files:
            store = pandas.HDFStore(f, 'r')
            if '/data' not in store.keys():
                store.close()
                continue
            df = store['data'].groupby(items, as_index=False).sum()
            combined_store.append('data', df, data_columns=headings)
            store.close()

        combined_store.close()

        with self.output().open('w') as outf:
            outf.write('completed')


if __name__ == '__main__':
    ngroups = int(CONFIG.get('internals', 'pandas_groups'))
    luigi.build([CombineAllGroups(ngroups)], local_scheduler=True, workers=16)
#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=05:00:00,mem=32GB,ncpus=16
#PBS -l wd
#PBS -me
#PBS -M [email protected]

module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load IDL_functions/0.2
module load gaip/test
module load agdc-api
module load pandas

export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water

PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/parallel_combine.py

python $PY_FILE
#!/bin/bash
#PBS -P v10
#PBS -q normal
#PBS -l walltime=10:00:00,mem=1600GB,ncpus=800
#PBS -l wd
#PBS -me
#PBS -M [email protected]

module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load gaip/test
module load agdc-api
module load pandas
module unload IDL_functions
module load IDL_functions/0.2

export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water

PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/workflow.py

mpirun -n 16 python $PY_FILE
#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=05:00:00,mem=16GB,ncpus=1
#PBS -l wd
#PBS -me
#PBS -M [email protected]

module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load IDL_functions/0.2
module load gaip/test
module load agdc-api
module load pandas

export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water

PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/combine.py

python $PY_FILE
#!/bin/bash
#PBS -P v10
#PBS -q express
#PBS -l walltime=00:10:00,mem=3GB,ncpus=1
#PBS -l wd
#PBS -me
#PBS -M [email protected]

module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load gdal/1.10.1
module load IDL_functions/0.2
module load gaip/test
module load agdc-api
module load pandas

export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/common
export PYTHONPATH=$PYTHONPATH:/home/547/jps547/git_repositories/ga-neo-nfrip/system/water

PY_FILE=/home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/submit_query.py

python $PY_FILE
#!/usr/bin/env python

import luigi
import logging
import os
from os.path import join as pjoin, exists, dirname
import cPickle as pickle
import subprocess
import ogr

from water_body_characterisation import spatial_intersection
from water_body_characterisation import get_files

CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))
PBS_DSH = (
"""#!/bin/bash
#PBS -P {project}
#PBS -q {queue}
#PBS -l walltime={walltime},ncpus={ncpus},mem={mem}GB,jobfs=350GB
#PBS -l wd
#PBS -me
#PBS -M {email}

NNODES={nnodes}

for i in $(seq 1 $NNODES); do
   pbsdsh -n $((16 * ($i - 1))) -- bash -l -c "{modules} PBS_NNODES=$NNODES PBS_VNODENUM=$i python {pyfile} \
--tile $(($i - 1))" &
done;
wait
""")
# DEPRECATED
class CellQueryTask(luigi.Task):

    """
    A task to generate a list of the cells that will be analysed
    throughout the workflow.
    DEPRECATED in favour of cell_query().
    """

    def requires(self):
        return []

    def output(self):
        out_dir = CONFIG.get('work', 'output_directory')
        out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))
        return luigi.LocalTarget(out_fname)

    def run(self):
        out_dir = CONFIG.get('work', 'output_directory')
        envelope = bool(int(CONFIG.get('internals', 'envelope')))
        vector_fname = CONFIG.get('work', 'vector_filename')
        dcube_vector_fname = CONFIG.get('internals', 'cell_grid')

        fids = spatial_intersection(dcube_vector_fname, vector_fname,
                                    envelope=envelope)

        vec_ds = ogr.Open(dcube_vector_fname)
        layer = vec_ds.GetLayer(0)

        cell_dirs = []
        for fid in fids:
            feat = layer.GetFeature(fid)
            cell_dir = "{:03.0f}_{:04.0f}".format(feat.GetField("XMIN"),
                                                  feat.GetField("YMIN"))
            cell_dirs.append(cell_dir)
            out_cell_dir = pjoin(out_dir, cell_dir)
            if not exists(out_cell_dir):
                os.makedirs(out_cell_dir)

        with self.output().open('w') as outf:
            pickle.dump(cell_dirs, outf)


# DEPRECATED
class FindWaterFilesTask(luigi.Task):

    """
    Find the WaterExtents files and generate a task list for a
    cell.
    DEPRECATED in favour of find_water_files().
    """

    data_dir = luigi.Parameter()
    out_dir = luigi.Parameter()

    def requires(self):
        return []

    def output(self):
        out_fname = pjoin(self.out_dir,
                          CONFIG.get('outputs', 'water_files_list'))
        return luigi.LocalTarget(out_fname)

    def run(self):
        pattern = CONFIG.get('internals', 'pattern')
        files = get_files(self.data_dir, pattern)
        with self.output().open('w') as out_file:
            pickle.dump(files, out_file)


# DEPRECATED
class AllCellsFindFilesTask(luigi.Task):

    """
    A helper task that kicks off file listings for each cell.
    DEPRECATED in favour of find_water_files().
    """

    water_dir = CONFIG.get('internals', 'water_directory')
    base_out_dir = CONFIG.get('work', 'output_directory')

    def requires(self):
        cells_list_fname = pjoin(self.base_out_dir,
                                 CONFIG.get('outputs', 'cells_list'))
        with open(cells_list_fname, 'r') as infile:
            cells = pickle.load(infile)

        tasks = []
        for cell in cells:
            data_dir = pjoin(self.water_dir, cell)
            out_dir = pjoin(self.base_out_dir, cell)
            tasks.append(FindWaterFilesTask(data_dir, out_dir))

        return tasks

    def output(self):
        out_fname = pjoin(self.base_out_dir, 'AllCellsFindFilesTask.completed')
        return luigi.LocalTarget(out_fname)

    def run(self):
        with self.output().open('w') as outf:
            outf.write('Completed')
def cell_query():
    """
    A simple procedure to generate a list of the cells that will
    be analysed throughout the workflow.
    """
    out_dir = CONFIG.get('work', 'output_directory')
    envelope = bool(int(CONFIG.get('internals', 'envelope')))
    vector_fname = CONFIG.get('work', 'vector_filename')
    dcube_vector_fname = CONFIG.get('internals', 'cell_grid')
    out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))

    fids = spatial_intersection(dcube_vector_fname, vector_fname,
                                envelope=envelope)

    vec_ds = ogr.Open(dcube_vector_fname)
    layer = vec_ds.GetLayer(0)

    cell_dirs = []
    for fid in fids:
        feat = layer.GetFeature(fid)
        cell_dir = "{:03.0f}_{:04.0f}".format(feat.GetField("XMIN"),
                                              feat.GetField("YMIN"))
        cell_dirs.append(cell_dir)
        out_cell_dir = pjoin(out_dir, cell_dir)
        if not exists(out_cell_dir):
            os.makedirs(out_cell_dir)

    with open(out_fname, 'w') as outf:
        pickle.dump(cell_dirs, outf)


def find_water_files():
    """
    A simple procedure that finds water files for each cell that
    needs to be analysed.
    """
    base_out_dir = CONFIG.get('work', 'output_directory')
    water_dir = CONFIG.get('internals', 'water_directory')
    pattern = CONFIG.get('internals', 'pattern')
    base_out_fname = CONFIG.get('outputs', 'water_files_list')
    cells_list_fname = pjoin(base_out_dir, CONFIG.get('outputs', 'cells_list'))

    with open(cells_list_fname, 'r') as infile:
        cells = pickle.load(infile)

    for cell in cells:
        data_dir = pjoin(water_dir, cell)
        out_dir = pjoin(base_out_dir, cell)
        out_fname = pjoin(out_dir, base_out_fname)
        files = get_files(data_dir, pattern)
        with open(out_fname, 'w') as out_file:
            pickle.dump(files, out_file)


def create_tiles(array_size, tile_size=25):
    """
    A minor function to tile a 1D array or list into smaller, more
    manageable portions.
    """
    idx_start = range(0, array_size, tile_size)
    idx_tiles = []
    for idx_st in idx_start:
        if idx_st + tile_size < array_size:
            idx_end = idx_st + tile_size
        else:
            idx_end = array_size
        idx_tiles.append((idx_st, idx_end))

    return idx_tiles
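# A quick worked example of the tiling:
#
#     >>> create_tiles(27, tile_size=10)
#     [(0, 10), (10, 20), (20, 27)]
#
# i.e. half-open (start, end) index pairs that together cover the whole
# range, with the last tile absorbing the remainder.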
if __name__ == '__main__':
    # Create the output directory
    out_dir = CONFIG.get('work', 'output_directory')
    if not exists(out_dir):
        os.makedirs(out_dir)

    # setup logging
    log_dir = CONFIG.get('work', 'logs_directory')
    if not exists(log_dir):
        os.makedirs(log_dir)

    logfile = "{log_path}/run_wc_{uname}_{pid}.log"
    logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
                             pid=os.getpid())
    logging_level = logging.INFO
    logging.basicConfig(filename=logfile, level=logging_level,
                        format=("%(asctime)s: [%(name)s] (%(levelname)s) "
                                "%(message)s "))

    # Just turn these into simple function calls. In this framework we don't
    # need a luigi workload for a simple single task
    cell_query()
    find_water_files()

    cells_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list'))
    with open(cells_list_fname, 'r') as infile:
        cells = pickle.load(infile)

    # Create the tiles list that contains groups of cells to operate on
    # Each node will have a certain number of cells to work on
    cpnode = int(CONFIG.get('internals', 'cells_per_node'))
    tiles = create_tiles(len(cells), cpnode)
    tiles_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'tiles_list'))
    with open(tiles_list_fname, 'w') as out_file:
        pickle.dump(tiles, out_file)

    # Setup the modules to use for the job
    modules = CONFIG.get('pbs', 'modules').split(',')
    modules = ['module load {}; '.format(module) for module in modules]
    modules.insert(0, 'module use /projects/u46/opt/modules/modulefiles;')
    modules = ''.join(modules)

    # Calculate the job node and cpu requirements
    nnodes = len(tiles)
    ncpus = nnodes * 16
    mem = nnodes * 32

    # Populate the PBS shell script
    project = CONFIG.get('pbs', 'project')
    queue = CONFIG.get('pbs', 'queue')
    walltime = CONFIG.get('pbs', 'walltime')
    email = CONFIG.get('pbs', 'email')
    py_file = pjoin(dirname(__file__), 'workflow.py')
    pbs_job = PBS_DSH.format(project=project, queue=queue,
                             walltime=walltime, ncpus=ncpus, mem=mem,
                             email=email, nnodes=nnodes,
                             modules=modules, pyfile=py_file)

    # Output the shell script to disk
    pbs_fname = pjoin(out_dir, CONFIG.get('outputs', 'pbs_filename'))
    with open(pbs_fname, 'w') as out_file:
        out_file.write(pbs_job)

    subprocess.check_call(['qsub', pbs_fname])
#!/usr/bin/env python

import os
import glob
import sys
import logging
# Debugging
import datetime
#import pdb

import numpy
from osgeo import gdal
from osgeo import ogr
import pandas

# ga-neo-nfrip repo
# Until a module is setup we'll define a temp PYTHONPATH
sys.path.append('/home/547/jps547/git_repositories/ga-neo-nfrip/system/common')
sys.path.append('/home/547/jps547/git_repositories/ga-neo-nfrip/system/water')
from WaterExtent import WaterExtent
from fileSystem import Directory

# IDL_functions repo
from IDL_functions import histogram

# image_processing repo
from image_processing.segmentation.rasterise import Rasterise
from image_processing.segmentation.segmentation import SegmentVisitor

from gaip import GriddedGeoBox
from gaip import read_img
from gaip import write_img


def spatial_intersection(input_vector_fname, base_vector_fname, envelope=True):
    """
    Performs a spatial intersection of feature geometries and returns
    a list of FIDs from the input vector file whose features intersect
    those of the base vector file.

    :param input_vector_fname:
        A string containing the full file path name to an
        OGR compliant vector file. Features will be selected from
        this file.

    :param base_vector_fname:
        A string containing the full file path name to an
        OGR compliant vector file. The features of this file are
        used as the spatial filter.

    :param envelope:
        If set to True (Default), then the envelope of each feature
        will be used rather than the geometry of the feature to perform
        intersection.

    :return:
        A `list` containing the FIDs of the input vector.
    """
    vec_ds1 = ogr.Open(input_vector_fname)
    vec_ds2 = ogr.Open(base_vector_fname)

    lyr1 = vec_ds1.GetLayer()
    lyr2 = vec_ds2.GetLayer()

    fids = []

    if envelope:
        for feat2 in lyr2:
            geom = feat2.GetGeometryRef()
            xmin, xmax, ymin, ymax = geom.GetEnvelope()
            lyr1.SetSpatialFilterRect(xmin, ymin, xmax, ymax)
            for feat1 in lyr1:
                fids.append(feat1.GetFID())
            lyr1.SetSpatialFilter(None)
    else:
        for feat2 in lyr2:
            ref = feat2.GetGeometryRef()
            lyr1.SetSpatialFilter(ref)
            for feat1 in lyr1:
                fids.append(feat1.GetFID())
            lyr1.SetSpatialFilter(None)

    fids = numpy.unique(numpy.array(fids)).tolist()

    return fids
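# Usage in this workflow (see cell_query() in submit_query.py): the
# datacube cell grid is the input vector and the waterbodies layer is the
# spatial filter, so the returned FIDs identify the grid cells that need
# processing:
#
#     fids = spatial_intersection(dcube_vector_fname, vector_fname,
#                                 envelope=envelope)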
def rasterise_vector(vector_filename, image_filename, out_filename):
    """
    Given a vector and an image, rasterise the vector using the
    image's dimensions and spatial extents as a base.

    :param vector_filename:
        A `string` containing the full file path name to the
        vector that is to be rasterised.

    :param image_filename:
        A `string` containing the full file path name to the
        image that is to be used as a base when rasterising the
        vector file.

    :param out_filename:
        A `string` containing the full file path name to be
        used for creating the output image.

    :return:
        None. The output image is written directly to disk.
    """
    segments_ds = Rasterise(image_filename, vector_filename)
    segments_ds.rasterise()

    geobox = GriddedGeoBox.from_gdal_dataset(gdal.Open(image_filename))

    write_img(segments_ds.segmented_array, out_filename, fmt="GTiff",
              geobox=geobox, nodata=0)
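# In this workflow the first water extent image for a cell serves as the
# base image (see RasteriseTask in workflow.py), e.g.:
#
#     rasterise_vector(vector_fname, water_files[0], out_fname)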
def get_files(path, pattern):
    """
    Just an internal function to find files given a file-extension
    pattern. This isn't really designed to go beyond development
    demonstration for this analytical workflow.
    """
    # Get the current directory so we can change back later
    cwd = os.getcwd()
    os.chdir(path)

    # Find the files matching the pattern
    files = glob.glob(pattern)
    files = [os.path.abspath(i) for i in files]

    # Change back to the original directory
    os.chdir(cwd)

    return files


def get_water_extents(file_list, sort=True):
    """
    Given a list of water extent image files, create a list of
    WaterExtent class objects, optionally sorted by time (old -> new)
    if sort=True.

    :param file_list:
        A list containing filepath names to water extent image files.

    :param sort:
        A boolean keyword indicating if the WaterExtent objects
        should be sorted before they're returned. Default is True.

    :return:
        A list of WaterExtent class objects, one for every water
        image file in file_list, and optionally sorted by date,
        old -> new.
    """
    water_extents = []
    cell_id = None

    for f in file_list:
        water_extent = WaterExtent(f)

        # check for lon, lat consistency
        if cell_id:
            this_cell_id = [water_extent.lon, water_extent.lat]
            if this_cell_id != cell_id:
                msg = ("Extents must be from same cell. At file {} got {}, "
                       "expecting {}")
                msg = msg.format(f, this_cell_id, cell_id)
                logging.error(msg)
                sys.exit(1)
        else:
            cell_id = [water_extent.lon, water_extent.lat]

        water_extents.append(water_extent)

    if sort:
        # all good, so now sort the extents by datetime
        logging.info("Collected %d files. Now sort them." % len(file_list))
        sorted_water_extents = sorted(water_extents,
                                      key=lambda extent: extent.getDatetime())
        return (sorted_water_extents, cell_id)
    else:
        msg = "Collected {} files. Sorting not applied.".format(len(file_list))
        logging.info(msg)
        return (water_extents, cell_id)


def water_characterisation(water_filename, rasterised_filename, id_name='SID',
                           id_map=None):
    """
    Calculates the water characterisation.
    """
    # Initialise a blank dataframe
    headings = [id_name, "Timestamp", "Total_Pixel_Count", "WATER_NOT_PRESENT",
                "NO_DATA", "MASKED_NO_CONTIGUITY",
                "MASKED_SEA_WATER", "MASKED_TERRAIN_SHADOW",
                "MASKED_HIGH_SLOPE", "MASKED_CLOUD_SHADOW",
                "MASKED_CLOUD", "WATER_PRESENT"]
    df = pandas.DataFrame(columns=headings)

    water_extent = WaterExtent(water_filename)

    # timestamp
    timestamp = water_extent.getDatetime()

    # Read the rasterised image
    img = read_img(rasterised_filename)

    # Initialise the segment visitor
    seg_vis = SegmentVisitor(img)

    # Get the potential segment IDs
    if seg_vis.n_segments == 0:
        return df
    seg_ids = range(seg_vis.min_seg_id, seg_vis.max_seg_id + 1)

    # Read the water layer data
    water_data = water_extent.getArray()

    # Create an identity mapping of SIDs if no map is supplied
    if id_map is None:
        id_map = {}
        for i in seg_ids:
            id_map[i] = i
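    # In the luigi workflow (WaterCharacterisationTask), id_map is built so
    # that rasterised segment ids (fid + 1) report the feature's AUSHYDRO_I
    # attribute, e.g. {1: <AUSHYDRO_I of fid 0>, 2: <AUSHYDRO_I of fid 1>};
    # with id_map=None each segment simply reports its own id.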
    for seg_id in seg_ids:
        data = seg_vis.get_segment_data(water_data, segment_id=seg_id)

        # dimensions of the data which will be 1D
        dim = data.shape

        # Returns are 1D arrays, so check if we have an empty array
        if dim[0] == 0:
            continue  # Empty bin, (no data), skipping

        # Characterise the polygon's makeup based on the WoFS data.
        h = histogram(data, Min=0, Max=128)
        hist = h['histogram']

        # Area is the number of pixels
        total_area = dim[0]

        """
        A WaterTile stores 1 data layer encoded as unsigned BYTE values as
        described in the WaterConstants.py file.

        Note - legal (decimal) values are:

               0: no water in pixel
               1: no data (one or more bands) in source NBAR image
           2-127: pixel masked for some reason (refer to MASKED bits)
             128: water in pixel

        Values 129-255 are illegal (i.e. if bit 7 set, all others must be
        unset).

        WATER_PRESENT         (dec 128) bit 7: 1 = water present,
                              0 = no water if all other bits zero
        MASKED_CLOUD          (dec 64)  bit 6: 1 = pixel masked out due to
                              cloud, 0 = unmasked
        MASKED_CLOUD_SHADOW   (dec 32)  bit 5: 1 = pixel masked out due to
                              cloud shadow, 0 = unmasked
        MASKED_HIGH_SLOPE     (dec 16)  bit 4: 1 = pixel masked out due to
                              high slope, 0 = unmasked
        MASKED_TERRAIN_SHADOW (dec 8)   bit 3: 1 = pixel masked out due to
                              terrain shadow, 0 = unmasked
        MASKED_SEA_WATER      (dec 4)   bit 2: 1 = pixel masked out due to
                              being over sea, 0 = unmasked
        MASKED_NO_CONTIGUITY  (dec 2)   bit 1: 1 = pixel masked out due to
                              lack of data contiguity, 0 = unmasked
        NO_DATA               (dec 1)   bit 0: 1 = pixel masked out due to
                              NO_DATA in NBAR source, 0 = valid data in NBAR
        WATER_NOT_PRESENT     (dec 0)   All bits zero indicates a valid
                              observation, no water present
        """
        # [0..128] bins were generated, i.e. 129 bins
        WATER_NOT_PRESENT = hist[0]
        NO_DATA = hist[1]
        MASKED_NO_CONTIGUITY = hist[2]
        MASKED_SEA_WATER = hist[4]
        MASKED_TERRAIN_SHADOW = hist[8]
        MASKED_HIGH_SLOPE = hist[16]
        MASKED_CLOUD_SHADOW = hist[32]
        MASKED_CLOUD = hist[64]
        WATER_PRESENT = hist[128]
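        # As in the prototype script: only the exact single-bit values are
        # tallied; pixels carrying combined mask bits land in other bins
        # and contribute only to Total_Pixel_Count.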
        format_dict = {id_name: id_map[seg_id],
                       'Timestamp': timestamp,
                       'Total_Pixel_Count': total_area,
                       'WATER_NOT_PRESENT': WATER_NOT_PRESENT,
                       'NO_DATA': NO_DATA,
                       'MASKED_NO_CONTIGUITY': MASKED_NO_CONTIGUITY,
                       'MASKED_SEA_WATER': MASKED_SEA_WATER,
                       'MASKED_TERRAIN_SHADOW': MASKED_TERRAIN_SHADOW,
                       'MASKED_HIGH_SLOPE': MASKED_HIGH_SLOPE,
                       'MASKED_CLOUD_SHADOW': MASKED_CLOUD_SHADOW,
                       'MASKED_CLOUD': MASKED_CLOUD,
                       'WATER_PRESENT': WATER_PRESENT}

        # Append the new data to the results data frame
        df = df.append(format_dict, ignore_index=True)

    return df
#!/usr/bin/env python

import luigi
from datetime import datetime as dt
import logging
import os
from os.path import join as pjoin, exists, dirname, basename
import cPickle as pickle
import argparse
import ogr
import pandas

from water_body_characterisation import spatial_intersection
from water_body_characterisation import get_files
from water_body_characterisation import rasterise_vector
from water_body_characterisation import get_water_extents
from water_body_characterisation import water_characterisation
from submit_query import create_tiles

CONFIG = luigi.configuration.get_config()
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg'))


class CellWCTasks(luigi.Task):

    """
    For a given cell define a water characterisation task for each
    water file.
    """

    out_dir = luigi.Parameter()

    def requires(self):
        base_name = CONFIG.get('outputs', 'water_characterisation_out_format')
        base_name = pjoin(self.out_dir, base_name)
        water_list = pjoin(self.out_dir,
                           CONFIG.get('outputs', 'water_files_list'))
        with open(water_list, 'r') as infile:
            files = pickle.load(infile)
        sorted_water_extents, _ = get_water_extents(files)
        targets = []
        for water_extent in sorted_water_extents:
            out_fname = base_name.format(water_extent.getDatetime())
            out_fname = out_fname.replace(' ', '-')
            water_fname = water_extent.path
            targets.append(WaterCharacterisationTask(water_fname, out_fname))
        return targets
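    # Filename sketch (hypothetical timestamp): with
    # water_characterisation_out_format = wc_result_{}.h5 and a datetime of
    # 2006-06-10 23:01:39, the per-extent output becomes
    # wc_result_2006-06-10-23:01:39.h5 once the space is replaced.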
    def output(self):
        out_fname = pjoin(self.out_dir,
                          'CellWCTasks.completed')
        return luigi.LocalTarget(out_fname)

    def run(self):
        with self.output().open('w') as outf:
            outf.write('Completed')


class AllCellsWCTask(luigi.Task):

    """
    A helper task to issue water characterisation tasks to each
    cell.
    """

    base_out_dir = CONFIG.get('work', 'output_directory')

    def requires(self):
        cells_list_fname = pjoin(self.base_out_dir,
                                 CONFIG.get('outputs', 'cells_list'))
        with open(cells_list_fname, 'r') as infile:
            cells = pickle.load(infile)
        tasks = []
        for cell in cells:
            out_dir = pjoin(self.base_out_dir, cell)
            tasks.append(CellWCTasks(out_dir))
        return tasks

    def output(self):
        out_fname = pjoin(self.base_out_dir,
                          'AllCellsWCTask.completed')
        return luigi.LocalTarget(out_fname)

    def run(self):
        with self.output().open('w') as outf:
            outf.write('Completed')


class AllCellsRasteriseTaskTiled(luigi.Task):

    """
    A helper task that kicks off RasteriseTasks for each cell.
    """

    idx1 = luigi.IntParameter()
    idx2 = luigi.IntParameter()
    base_out_dir = CONFIG.get('work', 'output_directory')

    def requires(self):
        cells_list_fname = pjoin(self.base_out_dir,
                                 CONFIG.get('outputs', 'cells_list'))
        with open(cells_list_fname, 'r') as infile:
            cells = pickle.load(infile)
        tasks = []
        for cell in cells[self.idx1:self.idx2]:
            out_dir = pjoin(self.base_out_dir, cell)
            tasks.append(RasteriseTask(out_dir))
        return tasks

    def output(self):
        out_fname = 'AllCellsRasteriseTaskTiled_{}:{}.completed'
        out_fname = out_fname.format(self.idx1, self.idx2)
        out_fname = pjoin(self.base_out_dir, out_fname)
        return luigi.LocalTarget(out_fname)

    def run(self):
        with self.output().open('w') as outf:
            outf.write('Completed')


class RasteriseTask(luigi.Task):

    """
    Computes the rasterisation for a cell.
    """

    out_dir = luigi.Parameter()

    def requires(self):
        return []

    def output(self):
        out_fname = pjoin(self.out_dir, 'RasteriseTask.completed')
        return luigi.LocalTarget(out_fname)

    def run(self):
        out_fname = pjoin(self.out_dir,
                          CONFIG.get('outputs', 'rasterise_filename'))
        water_files_list = pjoin(self.out_dir,
                                 CONFIG.get('outputs', 'water_files_list'))
        with open(water_files_list, 'r') as infile:
            water_files = pickle.load(infile)
        vector_fname = CONFIG.get('work', 'vector_filename')

        rasterise_vector(vector_fname, water_files[0], out_fname)

        with self.output().open('w') as outf:
            outf.write('Complete')

        msg = "Rasterise completed for {} time {}".format(out_fname, dt.now())
        logging.debug(msg)


class WaterCharacterisationTask(luigi.Task):

    """
    Computes the water characterisation task.
    """

    water_fname = luigi.Parameter()
    out_fname = luigi.Parameter()

    def requires(self):
        return [RasteriseTask(dirname(self.out_fname))]

    def output(self):
        return luigi.LocalTarget(self.out_fname)

    def run(self):
        rasterised_fname = pjoin(dirname(self.out_fname),
                                 CONFIG.get('outputs', 'rasterise_filename'))
        id_name = CONFIG.get('work', 'feature_name')

        # Open the vector file
        vector_fname = CONFIG.get('work', 'vector_filename')
        vec_ds = ogr.Open(vector_fname)
        layer = vec_ds.GetLayer()

        # Dict to map rasterised segment ids to the attribute of interest
        hydro_id = {}

        # Get the attribute of interest to save as the group name
        # ogr will only display a certain number of characters
        # customise the attribute name for the time being, we can put in the
        # config file later
        attribute = "AUSHYDRO_I"
        for feature in layer:
            fid = feature.GetFID()
            hydro_id[fid + 1] = feature.GetField(attribute)

        layer.ResetReading()
        layer = None
        vec_ds = None

        result = water_characterisation(self.water_fname, rasterised_fname,
                                        id_name=id_name, id_map=hydro_id)

        # Open the output hdf5 file
        store = pandas.HDFStore(self.output().path)

        # Write the dataframe
        store.append('data', result)

        # Save and close the file
        store.close()


class CombineCellWCTasksTiled(luigi.Task):

    """
    A helper task that creates CombineWCTasks for each cell.
    """

    idx1 = luigi.IntParameter()
    idx2 = luigi.IntParameter()

    def requires(self):
        base_out_dir = CONFIG.get('work', 'output_directory')
        cells_list_fname = pjoin(base_out_dir,
                                 CONFIG.get('outputs', 'cells_list'))
        with open(cells_list_fname, 'r') as infile:
            cells = pickle.load(infile)
        tasks = []
        for cell in cells[self.idx1:self.idx2]:
            out_dir = pjoin(base_out_dir, cell)
            tasks.append(CombineWCTasks(out_dir))
        return tasks

    def output(self):
        out_dir = CONFIG.get('work', 'output_directory')
        out_fname = pjoin(out_dir, 'CombineCellWCTasksTiled_{}:{}.completed')
        out_fname = out_fname.format(self.idx1, self.idx2)
        return luigi.LocalTarget(out_fname)

    def run(self):
        with self.output().open('w') as outf:
            outf.write('Completed')


class CombineWCTasks(luigi.Task):

    """
    Combines all water characterisation results from a single cell
    into a single file.
    """

    out_dir = luigi.Parameter()

    def requires(self):
        return [CellWCTasks(self.out_dir)]

    def output(self):
        out_fname = pjoin(self.out_dir, 'CombineWCTasks.completed')
        return luigi.LocalTarget(out_fname)

    def run(self):
        # Get an os listing of the WC files for the current out_dir
        # or define the names
        base_name = CONFIG.get('outputs', 'water_characterisation_out_format')
        base_name = pjoin(self.out_dir, base_name)

        # Create an output file that we can continually append data to
        out_fname = pjoin(self.out_dir,
                          CONFIG.get('outputs', 'combined_cell_wc_filename'))
        combined_store = pandas.HDFStore(out_fname)

        # We'll use the water extent files to derive the filenames of the
        # hdf5 files we need to open
        water_list = pjoin(self.out_dir,
                           CONFIG.get('outputs', 'water_files_list'))
        with open(water_list, 'r') as infile:
            files = pickle.load(infile)

        # Sort the files
        sorted_water_extents, _ = get_water_extents(files)

        # Open the first file; if we have '/data' then we have polygons to
        # extract data for
        file_name = base_name.format(sorted_water_extents[0].getDatetime())
        file_name = file_name.replace(' ', '-')
        store = pandas.HDFStore(file_name)

        # If there is nothing in the first file there will be nothing for
        # every file
        if '/data' in store.keys():
            # We have data to retrieve
            headings = store['data'].columns.tolist()
            store.close()
            df = pandas.DataFrame(columns=headings)
            for water_extent in sorted_water_extents:
                # Derive the filename of the hdf5 file to open
                wc_fname = base_name.format(water_extent.getDatetime())
                wc_fname = wc_fname.replace(' ', '-')

                # Open, append and close
                store = pandas.HDFStore(wc_fname)
                df = df.append(store['data'])
                store.close()

            # Re-define the table index
            df.reset_index(inplace=True)

            # Write to disk
            combined_store.append('data', df)

        # Save and close the file
        combined_store.close()

        with self.output().open('w') as outf:
            outf.write('Completed')


if __name__ == '__main__':
    # Setup command-line arguments
    desc = "Processes water characterisation for a given set of cells."
    hlp = ("The tile/chunk index to retrieve from the tiles list. "
           "(Needs to have been previously computed to a file named "
           "tiles.pkl.)")
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument('--tile', type=int, help=hlp)
    parsed_args = parser.parse_args()
    tile_idx = parsed_args.tile

    # setup logging
    log_dir = CONFIG.get('work', 'logs_directory')
    if not exists(log_dir):
        os.makedirs(log_dir)

    logfile = "{log_path}/run_wc_{uname}_{pid}.log"
    logfile = logfile.format(log_path=log_dir, uname=os.uname()[1],
                             pid=os.getpid())
    logging_level = logging.INFO
    logging.basicConfig(filename=logfile, level=logging_level,
                        format=("%(asctime)s: [%(name)s] (%(levelname)s) "
                                "%(message)s "))

    # Get the list of tiles (groups of cells) that each node will operate on
    tiles_list_fname = pjoin(CONFIG.get('work', 'output_directory'),
                             CONFIG.get('outputs', 'tiles_list'))
    with open(tiles_list_fname, 'r') as in_file:
        tiles = pickle.load(in_file)

    # Initialise the job
    tile = tiles[tile_idx]
    tasks = [CombineCellWCTasksTiled(tile[0], tile[1])]

    # Just test with computing the rasterisation first
    #tasks = [AllCellsRasteriseTaskTiled(tile[0], tile[1])]

    luigi.build(tasks, local_scheduler=True, workers=16)