pilbara
[work] | |
output_directory = /g/data/v10/testing_ground/jps547/sue-fyfe/results | |
logs_directory = %(output_directory)s/logs | |
satellites = LS5,LS7,LS8 | |
min_date = 1985_01_01 | |
max_date = 2015_12_31 | |
vector_filename = /g/data/v10/testing_ground/jps547/sue-fyfe/LH_Nominated_EC_Saltmarsh.shp | |
[internals] | |
envelope = 0 | |
cells_per_node = 1 | |
pandas_groups = 10 | |
pandas_chunksize = 1000000 | |
water_directory = /g/data/u46/wofs/water_f7s/extents | |
cell_grid = /short/v10/jps547/DatacubeCellGrid/Cell_Grid_WGS84.shp | |
[outputs] | |
cells_list = cells_to_process.pkl | |
tiles_list = tiles.pkl | |
pbs_filename = class_dist_pbsdsh.bash | |
query_filename = datasets_list.pkl | |
stats_filename_format = class_dist_result_{}.h5 | |
combined_cell_stats_filename = combined_cell_stats.h5 | |
rasterise_filename = rasterised_result.tif | |
final_output_filename = saltmarsh_class_distribution.h5 | |
groups_filename_format = tmp_group_{}.h5 | |
#[core] | |
#logging_conf_file: /home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/logging.cfg | |
[pbs] | |
project = v10 | |
queue = normal | |
walltime = 05:00:00 | |
email = [email protected] | |
modules = geo-processing,IDL_functions/0.3,pyephem/3.7.5.1,numexpr,eo-tools/0.4,gaip/test,agdc-api/0.1.0-b20150807 |
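A minimal sketch (not part of the gist) of how the settings above can be read outside of luigi, e.g. when debugging a single cell. It assumes the file is saved as config.cfg alongside the scripts, as the workflows expect; the %(output_directory)s reference in logs_directory is resolved by ConfigParser interpolation.

import ConfigParser

cfg = ConfigParser.SafeConfigParser()
cfg.read('config.cfg')
print cfg.get('work', 'logs_directory')            # interpolated path
print cfg.get('work', 'satellites').split(',')     # ['LS5', 'LS7', 'LS8']
print int(cfg.get('internals', 'pandas_chunksize'))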
#!/usr/bin/env python | |
import numpy | |
from datacube.api.utils import get_dataset_data_with_pq, TCI_COEFFICIENTS | |
from datacube.api.utils import calculate_tassel_cap_index, TasselCapIndex | |
from datacube.api.utils import calculate_ndvi | |
from datacube.api.utils import get_dataset_metadata | |
#def classifier(arg25_dataset, pq25_dataset): | |
# """ | |
# Runs the classifier designed by S. Fyfe. | |
# """ | |
# # Get the metadata | |
# md = get_dataset_metadata(arg25_dataset) | |
# cols, rows = md.shape | |
# | |
# # Read the data and mask pixels via the PQ dataset | |
# data = get_dataset_data_with_pq(arg25_dataset, pq25_dataset) | |
# | |
# # Get the wetness coefficients and calculate | |
# coef = TCI_COEFFICIENTS[arg25_dataset.satellite][TasselCapIndex.WETNESS] | |
# wetness = calculate_tassel_cap_index(data, coef) | |
# | |
# # NDVI | |
# ndvi = calculate_ndvi(data[arg25_dataset.bands.RED], | |
# data[arg25_dataset.bands.NEAR_INFRARED], | |
# output_ndv=numpy.nan) | |
# | |
# # Dump the reflectance data, the classifier only needs tc_wetness and ndvi | |
# del data | |
# | |
# # Allocate the result | |
# classified = numpy.zeros((rows,cols), dtype='uint8') | |
# | |
# # Water | |
# r1 = wetness > 0 | |
# classified[r1] = 1 | |
# _tmp = ~r1 | |
# | |
# r2 = _tmp & ((wetness >= -250) & (wetness < 0)) | |
# | |
# # non-veg | |
# r3 = r2 & (ndvi <= 0.3) | |
# classified[r3] = 2 | |
# #_tmp2 = ~r3 | |
# _tmp2 = _tmp & r2 & ~r3 | |
# | |
# #r4 = _tmp & _tmp2 & (ndvi <= 0.45) | |
# r4 = _tmp2 & (ndvi <= 0.45) | |
# #_tmp3 = ~r4 | |
# _tmp3 = _tmp2 & ~r4 | |
# | |
# # saltmarsh | |
# classified[r4] = 3 | |
# | |
# #r5 = _tmp & _tmp2 &_tmp3 & (ndvi <= 0.6) | |
# r5 = _tmp3 & (ndvi <= 0.6) | |
# | |
# # mangrove/saltmarsh | |
# classified[r5] = 4 | |
# | |
# # mangrove | |
# #_tmp5 = _tmp & _tmp2 &_tmp3 & ~r5 | |
# _tmp5 = _tmp3 & ~r5 | |
# classified[_tmp5] = 5 | |
# | |
# _tmp5 = _tmp & ~r2 | |
# | |
# r6 = _tmp5 & (wetness <= -750) | |
# | |
# r7 = r6 & (ndvi >= 0.3) | |
# | |
# # saltmarsh | |
# classified[r7] = 3 | |
# | |
# # non-veg | |
# _tmp4 = r6 & ~r7 | |
# classified[_tmp4] = 2 | |
# | |
# _tmp3 = _tmp5 & ~r6 | |
# r8 = _tmp3 & (ndvi <= 0.3) | |
# | |
# # non-veg | |
# classified[r8] = 2 | |
# | |
# r9 = _tmp3 & ~r8 & (ndvi <= 0.45) | |
# | |
# # saltmarsh | |
# classified[r9] = 3 | |
# _tmp4 = _tmp3 & ~r9 | |
# | |
# r10 = _tmp4 & (ndvi <= 0.6) | |
# | |
# # mangrove/saltmarsh | |
# classified[r10] = 4 | |
# | |
# # mangrove | |
# classified[_tmp4 & ~r10] = 5 | |
# | |
# # set any nulls | |
# valid = numpy.isfinite(ndvi) | |
# classified[~valid] = 0 | |
# | |
# return classified | |
def classifier(arg25_dataset, pq25_dataset): | |
""" | |
Runs the classifier designed by S. Fyfe. | |
""" | |
# Get the metadata | |
md = get_dataset_metadata(arg25_dataset) | |
cols, rows = md.shape | |
# Read the data and mask pixels via the PQ dataset | |
data = get_dataset_data_with_pq(arg25_dataset, pq25_dataset) | |
# Get the wetness coefficients and calculate | |
coef = TCI_COEFFICIENTS[arg25_dataset.satellite][TasselCapIndex.WETNESS] | |
wetness = calculate_tassel_cap_index(data, coef) | |
# NDVI | |
ndvi = calculate_ndvi(data[arg25_dataset.bands.RED], | |
data[arg25_dataset.bands.NEAR_INFRARED], | |
output_ndv=numpy.nan) | |
# Dump the reflectance data, the classifier only needs tc_wetness and ndvi | |
del data | |
# Allocate the result | |
classified = numpy.zeros((rows,cols), dtype='uint8') | |
# Water | |
r1 = wetness > 0 | |
classified[r1] = 1 | |
_tmp = ~r1 | |
#r2 = _tmp & ((wetness >= -250) & (wetness < 0)) | |
r2 = (wetness >= -250) & (wetness < 0) | |
r3 = ndvi <= 0.3 | |
#_tmp2 = _tmp & r2 & ~r3 | |
_tmp2 = _tmp & r2 | |
# non-veg | |
classified[_tmp2 & r3] = 2 | |
_tmp3 = _tmp2 & ~r3 | |
r4 = ndvi <= 0.45 | |
# saltmarsh | |
classified[_tmp3 & r4] = 3 | |
_tmp2 = _tmp3 & ~r4 | |
r5 = ndvi <= 0.6 | |
# mangrove/saltmarsh | |
classified[_tmp2 & r5] = 4 | |
# mangrove | |
classified[_tmp2 & ~r5] = 5 | |
# finished rhs of r2 | |
_tmp2 = _tmp & ~r2 | |
r6 = wetness < -750 | |
r7 = ndvi >= 0.3 | |
_tmp3 = _tmp2 & r6 | |
# saltmarsh | |
classified[_tmp3 & r7] = 3 | |
# non-veg | |
classified[_tmp3 & ~r7] = 2 | |
r8 = ndvi <= 0.3 | |
_tmp3 = _tmp2 & ~r6 | |
# non-veg | |
classified[_tmp3 & r8] = 2 | |
r9 = ndvi <= 0.45 | |
_tmp2 = _tmp3 & ~r8 | |
# saltmarsh | |
classified[_tmp2 & r9] = 3 | |
r10 = ndvi <= 0.6 | |
_tmp3 = _tmp2 & ~r9 | |
# mangrove-saltmarsh | |
classified[_tmp3 & r10] = 4 | |
# mangrove | |
classified[_tmp3 & ~r10] = 5 | |
# set any nulls | |
valid = numpy.isfinite(ndvi) | |
classified[~valid] = 0 | |
return classified | |
def classifier3(arg25_dataset, pq25_dataset): | |
""" | |
Runs the classifier designed by S. Fyfe. | |
""" | |
# Get the metadata | |
md = get_dataset_metadata(arg25_dataset) | |
cols, rows = md.shape | |
# Read the data and mask pixels via the PQ dataset | |
data = get_dataset_data_with_pq(arg25_dataset, pq25_dataset) | |
# Get the wetness coefficients and calculate | |
coef = TCI_COEFFICIENTS[arg25_dataset.satellite][TasselCapIndex.WETNESS] | |
wetness = calculate_tassel_cap_index(data, coef) | |
# NDVI | |
ndvi = calculate_ndvi(data[arg25_dataset.bands.RED], | |
data[arg25_dataset.bands.NEAR_INFRARED], | |
output_ndv=numpy.nan) | |
# Dump the reflectance data, the classifier only needs tc_wetness and ndvi | |
del data | |
# Allocate the result | |
classified = numpy.zeros((rows,cols), dtype='uint8') | |
# Water | |
r1 = wetness > 0 | |
classified[r1] = 2 | |
_tmp = ~r1 | |
#r2 = _tmp & ((wetness >= -250) & (wetness < 0)) | |
r2 = (wetness >= -250) & (wetness < 0) | |
r3 = ndvi <= 0.3 | |
#_tmp2 = _tmp & r2 & ~r3 | |
_tmp2 = _tmp & r2 | |
# non-veg | |
classified[_tmp2 & r3] = 5 | |
_tmp3 = _tmp2 & ~r3 | |
r4 = ndvi <= 0.45 | |
# saltmarsh | |
classified[_tmp3 & r4] = 6 | |
_tmp2 = _tmp3 & ~r4 | |
r5 = ndvi <= 0.6 | |
# mangrove/saltmarsh | |
classified[_tmp2 & r5] = 7 | |
# mangrove | |
classified[_tmp2 & ~r5] = 3 | |
# finished rhs of r2 | |
_tmp2 = _tmp & ~r2 | |
r6 = wetness < -750 | |
r7 = ndvi >= 0.3 | |
_tmp3 = _tmp2 & r6 | |
# saltmarsh | |
classified[_tmp3 & r7] = 8 | |
# non-veg | |
classified[_tmp3 & ~r7] = 4 | |
r8 = ndvi <= 0.3 | |
_tmp3 = _tmp2 & ~r6 | |
# non-veg | |
classified[_tmp3 & r8] = 9 | |
r9 = ndvi <= 0.45 | |
_tmp2 = _tmp3 & ~r8 | |
# saltmarsh | |
classified[_tmp2 & r9] = 10 | |
r10 = ndvi <= 0.6 | |
_tmp3 = _tmp2 & ~r9 | |
# mangrove-saltmarsh | |
classified[_tmp3 & r10] = 11 | |
# mangrove | |
classified[_tmp3 & ~r10] = 1 | |
# set any nulls | |
valid = numpy.isfinite(ndvi) | |
classified[~valid] = 0 | |
return classified |
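# The scalar helper below is added purely for illustration and is not used by
# the workflow: it restates the array logic of classifier() above for a single
# pixel, making the wetness/NDVI decision tree and the class codes
# (0 null, 1 water, 2 non-veg, 3 saltmarsh, 4 mangrove/saltmarsh, 5 mangrove)
# easier to follow.
def _classify_pixel(wetness, ndvi):
    if not numpy.isfinite(ndvi):
        return 0                        # null
    if wetness > 0:
        return 1                        # water
    if -250 <= wetness < 0:
        if ndvi <= 0.3:
            return 2                    # non-veg
        if ndvi <= 0.45:
            return 3                    # saltmarsh
        if ndvi <= 0.6:
            return 4                    # mangrove/saltmarsh
        return 5                        # mangrove
    if wetness < -750:
        return 3 if ndvi >= 0.3 else 2  # saltmarsh / non-veg
    if ndvi <= 0.3:
        return 2                        # non-veg
    if ndvi <= 0.45:
        return 3                        # saltmarsh
    if ndvi <= 0.6:
        return 4                        # mangrove/saltmarsh
    return 5                            # mangrove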
#!/usr/bin/env python | |
import luigi | |
import os | |
from os.path import join as pjoin, exists, dirname | |
import cPickle as pickle | |
import glob | |
import argparse | |
import logging | |
import pandas | |
import rasterio | |
from datacube.api.model import DatasetType | |
from classifier import classifier | |
from zonal_stats import zonal_stats | |
from zonal_stats import zonal_class_distribution | |
from image_processing.segmentation import rasterise_vector | |
CONFIG = luigi.configuration.get_config() | |
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg')) | |
class RasteriseTask(luigi.Task): | |
""" | |
Computes the rasterisation for a cell. | |
""" | |
out_dir = luigi.Parameter() | |
def requires(self): | |
return [] | |
def output(self): | |
out_fname = pjoin(self.out_dir, 'RasteriseTask.completed') | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
out_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', 'rasterise_filename')) | |
ds_list_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', 'query_filename')) | |
with open(ds_list_fname, 'r') as infile: | |
ds_list = pickle.load(infile) | |
vector_fname = CONFIG.get('work', 'vector_filename') | |
img_fname = ds_list[0].datasets[DatasetType.FC25].path | |
with rasterio.open(img_fname) as src: | |
crs = src.crs | |
transform = src.affine | |
height = src.height | |
width = src.width | |
res = rasterise_vector(vector_fname, shape=(height, width), | |
transform=transform, crs=crs) | |
kwargs = {'count': 1, | |
'width': width, | |
'height': height, | |
'crs': crs, | |
'transform': transform, | |
'dtype': res.dtype.name, | |
'nodata': 0} | |
        with rasterio.open(out_fname, 'w', **kwargs) as outds:
            outds.write_band(1, res)
# We could just set the image as the Luigi completion target... | |
with self.output().open('w') as outf: | |
outf.write('Complete') | |
class ClassifierStatsTask(luigi.Task): | |
""" | |
Computes a zonal class distribution task for the required dataset. | |
""" | |
idx = luigi.IntParameter() | |
out_fname = luigi.Parameter() | |
def requires(self): | |
return [RasteriseTask(dirname(self.out_fname))] | |
def output(self): | |
return luigi.LocalTarget(self.out_fname) | |
def run(self): | |
rasterised_fname = pjoin(dirname(self.out_fname), | |
CONFIG.get('outputs', 'rasterise_filename')) | |
ds_list_fname = pjoin(dirname(self.out_fname), | |
CONFIG.get('outputs', 'query_filename')) | |
with open(ds_list_fname, 'r') as infile: | |
ds_list = pickle.load(infile) | |
dataset = ds_list[self.idx] | |
nbar_ds = dataset.datasets[DatasetType.ARG25] | |
pq_ds = dataset.datasets[DatasetType.PQ25] | |
classified_img = classifier(nbar_ds, pq_ds) | |
        # Hard-coded, as this will be short-lived due to agdc-v2 development
class_ids = [0, 1, 2, 3, 4, 5] | |
with rasterio.open(rasterised_fname, 'r') as src: | |
zones_img = src.read(1) | |
result = zonal_class_distribution(classified_img, zones_img, | |
class_ids=class_ids) | |
# Set the timestamp | |
result['Timestamp'] = dataset.start_datetime | |
# Open the output hdf5 file | |
store = pandas.HDFStore(self.output().path) | |
# Write the dataframe | |
store.append('data', result) | |
# Save and close the file | |
store.close() | |
class CellStatsTask(luigi.Task): | |
""" | |
For a given cell define a classifier stats task for each required Dataset. | |
""" | |
out_dir = luigi.Parameter() | |
def requires(self): | |
base_name = CONFIG.get('outputs', 'stats_filename_format') | |
base_name = pjoin(self.out_dir, base_name) | |
ds_list_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', 'query_filename')) | |
with open(ds_list_fname, 'r') as infile: | |
ds_list = pickle.load(infile) | |
targets = [] | |
for idx, ds in enumerate(ds_list): | |
timestamp = bytes(ds.start_datetime).replace(' ', '-') | |
out_fname = base_name.format(timestamp) | |
targets.append(ClassifierStatsTask(idx, out_fname)) | |
return targets | |
def output(self): | |
out_fname = pjoin(self.out_dir, 'CellStatsTask.completed') | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
class CombineCellStatsTask(luigi.Task): | |
""" | |
Combines all stats files from a single cell into a single file. | |
""" | |
out_dir = luigi.Parameter() | |
def requires(self): | |
return [CellStatsTask(self.out_dir)] | |
def output(self): | |
out_fname = pjoin(self.out_dir, 'CombineCellStatsTask.completed') | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
# Get a list of the stats files for each timeslice | |
stats_files_list = glob.glob(pjoin(self.out_dir, '*.h5')) | |
        # Create an output file to which we can continually append data
out_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', | |
'combined_cell_stats_filename')) | |
combined_store = pandas.HDFStore(out_fname) | |
store = pandas.HDFStore(stats_files_list[0]) | |
        # If there is nothing in the first file there will be nothing in
        # any of the files
if '/data' in store.keys(): | |
# We have data to retrieve | |
headings = store['data'].columns.tolist() | |
store.close() | |
df = pandas.DataFrame(columns=headings) | |
for sfile in stats_files_list: | |
store = pandas.HDFStore(sfile, 'r') | |
df = df.append(store['data']) | |
store.close() | |
df.reset_index(inplace=True) | |
# Write to disk | |
combined_store.append('data', df) | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
class RunCombineCellStatsTasks(luigi.Task): | |
""" | |
Issues CombineCellStatsTask's to each cell associated | |
with the tile defined by the start and end index. | |
""" | |
idx1 = luigi.IntParameter() | |
idx2 = luigi.IntParameter() | |
def requires(self): | |
base_out_dir = CONFIG.get('work', 'output_directory') | |
cells_list_fname = pjoin(base_out_dir, | |
CONFIG.get('outputs', 'cells_list')) | |
with open(cells_list_fname, 'r') as infile: | |
cells = pickle.load(infile) | |
tasks = [] | |
for cell in cells[self.idx1:self.idx2]: | |
out_dir = pjoin(base_out_dir, cell) | |
tasks.append(CombineCellStatsTask(out_dir)) | |
return tasks | |
def output(self): | |
out_dir = CONFIG.get('work', 'output_directory') | |
out_fname = pjoin(out_dir, | |
'RunCombineCellStatsTasks_{}:{}.completed') | |
out_fname = out_fname.format(self.idx1, self.idx2) | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
if __name__ == '__main__': | |
# Setup command-line arguments | |
desc = "Processes zonal stats for a given set of cells." | |
hlp = ("The tile/chunk index to retieve from the tiles list. " | |
"(Needs to have been previously computed to a file named tiles.pkl") | |
parser = argparse.ArgumentParser(description=desc) | |
parser.add_argument('--tile', type=int, help=hlp) | |
parsed_args = parser.parse_args() | |
tile_idx = parsed_args.tile | |
# setup logging | |
log_dir = CONFIG.get('work', 'logs_directory') | |
if not exists(log_dir): | |
os.makedirs(log_dir) | |
logfile = "{log_path}/stats_workflow_{uname}_{pid}.log" | |
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1], | |
pid=os.getpid()) | |
logging_level = logging.INFO | |
logging.basicConfig(filename=logfile, level=logging_level, | |
format=("%(asctime)s: [%(name)s] (%(levelname)s) " | |
"%(message)s ")) | |
    # Get the list of tiles (groups of cells that each node will operate on)
tiles_list_fname = pjoin(CONFIG.get('work', 'output_directory'), | |
CONFIG.get('outputs', 'tiles_list')) | |
with open(tiles_list_fname, 'r') as in_file: | |
tiles = pickle.load(in_file) | |
# Initialise the job | |
tile = tiles[tile_idx] | |
tasks = [RunCombineCellStatsTasks(tile[0], tile[1])] | |
luigi.build(tasks, local_scheduler=True, workers=16) | |
luigi.run() |
#!/usr/bin/env python | |
import os | |
from os.path import join as pjoin, dirname, basename | |
import cPickle as pickle | |
import argparse | |
import luigi | |
import numpy | |
import pandas | |
import ogr | |
from eotools.vector import retrieve_attribute_table | |
CONFIG = luigi.configuration.get_config() | |
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg')) | |
def combine_all_cells(): | |
# Config params | |
base_out_dir = CONFIG.get('work', 'output_directory') | |
base_out_fname = CONFIG.get('outputs', 'final_output_filename') | |
cells_list_fname = pjoin(base_out_dir, | |
CONFIG.get('outputs', 'cells_list')) | |
combined_cell_stats_fname = CONFIG.get('outputs', | |
'combined_cell_stats_filename') | |
vector_fname = CONFIG.get('work', 'vector_filename') | |
ngroups = int(CONFIG.get('internals', 'pandas_groups')) | |
chunksize = int(CONFIG.get('internals', 'pandas_chunksize')) | |
with open(cells_list_fname, 'r') as infile: | |
cells = pickle.load(infile) | |
headings = ['SID', 'Timestamp', 'Band', 'Observed_Count', 'Min', 'Max', | |
'Sum', 'Sum_of_Squares'] | |
items = ['Timestamp', 'SID', 'Band'] | |
tmp1_fname = pjoin(base_out_dir, 'tmp_combined_results.h5') | |
tmp1_store = pandas.HDFStore(tmp1_fname) | |
for cell in cells: | |
cell_dir = pjoin(base_out_dir, cell) | |
cell_stats_fname = pjoin(cell_dir, combined_cell_stats_fname) | |
# Open the current cells WC result file | |
store = pandas.HDFStore(cell_stats_fname, 'r') | |
if '/data' in store.keys(): | |
# We have data to retrieve | |
df = store['data'] | |
df.drop('index', 1, inplace=True) | |
tmp1_store.append('data', df, index=False) | |
tmp1_store.flush() | |
store.close() | |
tmp1_store.close() | |
# Chunking method | |
# http://stackoverflow.com/questions/25459982/trouble-with-grouby-on-millions-of-keys-on-a-chunked-file-in-python-pandas/25471765#25471765 | |
# http://stackoverflow.com/questions/15798209/pandas-group-by-query-on-large-data-in-hdfstore/15800314#15800314 | |
    # We need to group as many records as we can into more memory-manageable
    # chunks that also balance well for I/O
tmp1_store = pandas.HDFStore(tmp1_fname) | |
tmp2_fname = pjoin(base_out_dir, 'tmp_grouped_results.h5') | |
tmp2_store = pandas.HDFStore(tmp2_fname) | |
for chunk in tmp1_store.select('data', chunksize=chunksize): | |
g = chunk.groupby(chunk['SID'] % ngroups) | |
for grp, grouped in g: | |
tmp2_store.append('group_{}'.format(int(grp)), grouped, | |
data_columns=headings, index=False) | |
tmp2_store.flush() | |
tmp1_store.close() | |
tmp2_store.close() | |
# Define the output file | |
out_fname = pjoin(base_out_dir, base_out_fname) | |
combined_store = pandas.HDFStore(out_fname, 'w') | |
new_headings = ['FID', 'Timestamp', 'Band', 'Observed_Count', 'Min', 'Max', | |
'Sum', 'Sum_of_Squares', 'Mean', 'Variance', 'StdDev'] | |
# Now read the grouped data and write to disk | |
tmp2_store = pandas.HDFStore(tmp2_fname) | |
for key in tmp2_store.keys(): | |
grouped = tmp2_store[key].groupby(items, as_index=False) | |
df = grouped.agg({'Observed_Count': numpy.sum, 'Min': numpy.min, | |
'Max': numpy.max, 'Sum': numpy.sum, | |
'Sum_of_Squares': numpy.sum}) | |
# Account for the offset between the feature and segment ID's | |
df['SID'] = df['SID'] - 1 | |
# Change the segment id column name to feature id | |
df.rename(columns={'SID': 'FID'}, inplace=True) | |
# Calculate the mean, variance, stddev | |
df['Mean'] = df['Sum'] / df['Observed_Count'] | |
df['Variance'] = ((df['Sum_of_Squares'] - (df['Observed_Count'] * | |
df['Mean']**2)) / | |
(df['Observed_Count'] - 1)) | |
df['StdDev'] = numpy.sqrt(df['Variance'].values) | |
# Write the group to disk | |
combined_store.append('data', df, data_columns=new_headings) | |
combined_store.flush() | |
# Add metadata | |
metadata_group = 'Metadata' | |
metadata = {'Vector_Filename': basename(vector_fname)} | |
metadata = pandas.DataFrame(metadata, index=[0]) | |
combined_store[metadata_group] = metadata | |
# Add the vector attribute table | |
vec_ds = ogr.Open(vector_fname) | |
lyr = vec_ds.GetLayer() | |
attribute_table = retrieve_attribute_table(lyr) | |
combined_store['attribute_table'] = attribute_table | |
# Save and close the file | |
combined_store.close() | |
tmp1_store.close() | |
tmp2_store.close() | |
# Clean up temporary files | |
os.remove(tmp1_fname) | |
os.remove(tmp2_fname) | |
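# Note (added for illustration; not called by the workflow): the Mean, Variance
# and StdDev columns above are derived solely from the accumulated
# Observed_Count, Sum and Sum_of_Squares via
# variance = (sum_sq - n * mean**2) / (n - 1). The helper below checks that
# identity against numpy's own two-pass calculation on random data.
def _check_variance_identity(n=1000):
    data = numpy.random.ranf(n)
    count = data.size
    sum_ = data.sum()
    sum_sq = (data ** 2).sum()
    mean = sum_ / count
    var = (sum_sq - count * mean ** 2) / (count - 1)
    return numpy.allclose(var, numpy.var(data, ddof=1))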
def combine_all_cells_distribution(): | |
""" | |
""" | |
# Config params | |
base_out_dir = CONFIG.get('work', 'output_directory') | |
base_out_fname = CONFIG.get('outputs', 'final_output_filename') | |
cells_list_fname = pjoin(base_out_dir, | |
CONFIG.get('outputs', 'cells_list')) | |
combined_cell_stats_fname = CONFIG.get('outputs', | |
'combined_cell_stats_filename') | |
vector_fname = CONFIG.get('work', 'vector_filename') | |
ngroups = int(CONFIG.get('internals', 'pandas_groups')) | |
chunksize = int(CONFIG.get('internals', 'pandas_chunksize')) | |
with open(cells_list_fname, 'r') as infile: | |
cells = pickle.load(infile) | |
# 1st stage, combine all the results from each cell into a single file | |
tmp1_fname = pjoin(base_out_dir, 'tmp_combined_results.h5') | |
tmp1_store = pandas.HDFStore(tmp1_fname, 'w') | |
for cell in cells: | |
cell_dir = pjoin(base_out_dir, cell) | |
cell_stats_fname = pjoin(cell_dir, combined_cell_stats_fname) | |
# Open the current cells WC result file | |
store = pandas.HDFStore(cell_stats_fname, 'r') | |
if '/data' in store.keys(): | |
# We have data to retrieve | |
df = store['data'] | |
headings = df.columns.values.tolist() | |
df.drop('index', 1, inplace=True) | |
tmp1_store.append('data', df, index=False) | |
tmp1_store.flush() | |
store.close() | |
tmp1_store.close() | |
# Chunking method | |
# http://stackoverflow.com/questions/25459982/trouble-with-grouby-on-millions-of-keys-on-a-chunked-file-in-python-pandas/25471765#25471765 | |
# http://stackoverflow.com/questions/15798209/pandas-group-by-query-on-large-data-in-hdfstore/15800314#15800314 | |
    # We need to group as many records as we can into more memory-manageable
    # chunks that also balance well for I/O
tmp1_store = pandas.HDFStore(tmp1_fname) | |
tmp2_fname = pjoin(base_out_dir, 'tmp_grouped_results.h5') | |
tmp2_store = pandas.HDFStore(tmp2_fname, 'w') | |
for chunk in tmp1_store.select('data', chunksize=chunksize): | |
g = chunk.groupby(chunk['SID'] % ngroups) | |
for grp, grouped in g: | |
tmp2_store.append('group_{}'.format(int(grp)), grouped, | |
data_columns=headings, index=False) | |
tmp2_store.flush() | |
tmp1_store.close() | |
tmp2_store.close() | |
    # TODO We need a generic way of allowing a user to insert custom
    # classification headings as opposed to the numeric codes,
    # without being specific like we did for WOfS.
new_headings = ['FID' if x == 'SID' else x for x in headings] | |
items = ['Timestamp', 'SID'] | |
# Define the output file | |
out_fname = pjoin(base_out_dir, base_out_fname) | |
combined_store = pandas.HDFStore(out_fname, 'w') | |
# Now read the grouped data and write to disk | |
tmp2_store = pandas.HDFStore(tmp2_fname) | |
for key in tmp2_store.keys(): | |
df = tmp2_store[key].groupby(items, as_index=False).sum() | |
# Change the segment id column name to feature id | |
df.rename(columns={'SID': 'FID'}, inplace=True) | |
combined_store.append('data', df, data_columns=new_headings) | |
combined_store.flush() | |
# Save and close the file | |
combined_store.close() | |
tmp1_store.close() | |
tmp2_store.close() | |
# Clean up temporary files | |
os.remove(tmp1_fname) | |
os.remove(tmp2_fname) | |
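# Illustrative helper (not part of the original script): a toy version of the
# chunked groupby pattern referenced in the comments above, where records are
# streamed from an HDFStore in chunks and bucketed by SID % ngroups so each
# group stays memory manageable. The temporary filename is arbitrary.
def _demo_chunked_groupby(tmp_fname='demo_chunked.h5', ngroups=3):
    df = pandas.DataFrame({'SID': numpy.repeat(numpy.arange(9), 4),
                           'value': numpy.random.ranf(36)})
    store = pandas.HDFStore(tmp_fname, 'w')
    store.append('data', df, data_columns=['SID'], index=False)
    counts = {}
    for chunk in store.select('data', chunksize=10):
        for grp, grouped in chunk.groupby(chunk['SID'] % ngroups):
            counts[int(grp)] = counts.get(int(grp), 0) + len(grouped)
    store.close()
    os.remove(tmp_fname)
    return counts  # {0: 12, 1: 12, 2: 12} for this toy table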
if __name__ == '__main__': | |
# combine_all_cells() | |
combine_all_cells_distribution() |
[work] | |
output_directory = /g/data/v10/testing_ground/jps547/pilbara/results | |
logs_directory = %(output_directory)s/logs | |
satellites = LS5,LS7,LS8 | |
min_date = 1985_01_01 | |
max_date = 2015_12_31 | |
vector_filename = /g/data/v10/testing_ground/jps547/pilbara/CaneRiver_Nanutarra_MtMinnie_Peedamulla_RedHill_Yarraloola_Merge_WGS84_LandSystemType.shp | |
[internals] | |
envelope = 0 | |
cells_per_node = 1 | |
pandas_groups = 10 | |
pandas_chunksize = 1000000 | |
water_directory = /g/data/u46/wofs/water_f7s/extents | |
cell_grid = /short/v10/jps547/DatacubeCellGrid/Cell_Grid_WGS84.shp | |
[outputs] | |
cells_list = cells_to_process.pkl | |
tiles_list = tiles.pkl | |
pbs_filename = zonal_stats_pbsdsh.bash | |
query_filename = datasets_list.pkl | |
stats_filename_format = stats_result_{}.h5 | |
combined_cell_stats_filename = combined_cell_stats.h5 | |
rasterise_filename = rasterised_result.tif | |
final_output_filename = pilbara_zonal_stats_results.h5 | |
groups_filename_format = tmp_group_{}.h5 | |
#[core] | |
#logging_conf_file: /home/547/jps547/git_repositories/sixy6e/cd7964d52928e22a056e/logging.cfg | |
[pbs] | |
project = v10 | |
queue = normal | |
walltime = 05:00:00 | |
email = [email protected] | |
modules = python/2.7.6,gdal/1.11.1-python,agdc-api,pandas,IDL_functions/test,EOtools/test,gaip/test |
""" | |
Data access functions | |
--------------------- | |
""" | |
import rasterio | |
from rasterio import crs | |
from rasterio.warp import RESAMPLING | |
def write_img(array, filename, fmt='ENVI', geobox=None, nodata=None, compress=None, tags=None): | |
""" | |
Writes a 2D/3D image to disk using rasterio. | |
:param array: | |
A 2D/3D NumPy array. | |
:param filename: | |
A string containing the output file name. | |
:param fmt: | |
        A string containing a GDAL compliant image format. Default is
        'ENVI'.
:param geobox: | |
An instance of a GriddedGeoBox object. | |
:param nodata: | |
A value representing the no data value for the array. | |
:param compress: | |
A compression algorithm name (e.g. 'lzw'). | |
:param tags: | |
A dictionary of dataset-level metadata. | |
""" | |
# Get the datatype of the array | |
dtype = array.dtype.name | |
# Check for excluded datatypes | |
excluded_dtypes = ['int64', 'int8', 'uint64'] | |
if dtype in excluded_dtypes: | |
msg = "Datatype not supported: {dt}".format(dt=dtype) | |
raise TypeError(msg) | |
ndims = array.ndim | |
dims = array.shape | |
# Get the (z, y, x) dimensions (assuming BSQ interleave) | |
if ndims == 2: | |
samples = dims[1] | |
lines = dims[0] | |
bands = 1 | |
elif ndims == 3: | |
samples = dims[2] | |
lines = dims[1] | |
bands = dims[0] | |
else: | |
        err = ("Input array must be 2D or 3D; "
               "got {dims} dimensions".format(dims=ndims))
        raise IndexError(err)
# If we have a geobox, then retrieve the geotransform and projection | |
if geobox is not None: | |
transform = geobox.affine | |
projection = bytes(geobox.crs.ExportToWkt()) | |
else: | |
transform = None | |
projection = None | |
kwargs = {'count': bands, | |
'width': samples, | |
'height': lines, | |
'crs': projection, | |
'transform': transform, | |
'dtype': dtype, | |
'driver': fmt, | |
'nodata': nodata} | |
if fmt == 'GTiff' and compress is not None: | |
kwargs.update({'compress': compress}) | |
with rasterio.open(filename, 'w', **kwargs) as outds: | |
if bands == 1: | |
outds.write_band(1, array) | |
else: | |
for i in range(bands): | |
outds.write_band(i + 1, array[i]) | |
if tags is not None: | |
outds.update_tags(**tags) |
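# Minimal usage sketch (added for illustration; not part of the original
# module). With geobox=None the output has no georeferencing, which is enough
# to exercise the function; the output filename is arbitrary.
if __name__ == '__main__':
    import numpy
    demo = numpy.random.ranf((100, 100)).astype('float32')
    write_img(demo, 'write_img_demo.tif', fmt='GTiff', nodata=-999,
              compress='lzw', tags={'description': 'write_img demo'})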
#!/usr/bin/env python | |
import luigi | |
from datetime import date | |
import os | |
from os.path import join as pjoin, exists, dirname | |
import cPickle as pickle | |
import subprocess | |
import logging | |
import ogr | |
from datacube.api.query import list_tiles_as_list | |
from datacube.api.model import DatasetType, Satellite, BANDS | |
from datacube.config import Config | |
from eotools.vector import spatial_intersection | |
CONFIG = luigi.configuration.get_config() | |
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg')) | |
PBS_DSH = ( | |
"""#!/bin/bash | |
#PBS -P {project} | |
#PBS -q {queue} | |
#PBS -l walltime={walltime},ncpus={ncpus},mem={mem}GB,jobfs=350GB | |
#PBS -l wd | |
#PBS -me | |
#PBS -M {email} | |
NNODES={nnodes} | |
for i in $(seq 1 $NNODES); do | |
pbsdsh -n $((16 *$i)) -- bash -l -c "{modules} PBS_NNODES=$NNODES PBS_VNODENUM=$i python {pyfile} \ | |
--tile $[$i - 1]" & | |
done; | |
wait | |
""") | |
def get_cells(): | |
""" | |
    A simple procedure to generate a list of the cells that will
    be analysed throughout the workflow.
""" | |
out_dir = CONFIG.get('work', 'output_directory') | |
    envelope = bool(int(CONFIG.get('internals', 'envelope')))
vector_fname = CONFIG.get('work', 'vector_filename') | |
dcube_vector_fname = CONFIG.get('internals', 'cell_grid') | |
out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list')) | |
fids = spatial_intersection(dcube_vector_fname, vector_fname, | |
envelope=envelope) | |
vec_ds = ogr.Open(dcube_vector_fname) | |
layer = vec_ds.GetLayer(0) | |
cell_dirs = [] | |
cells = [] | |
for fid in fids: | |
feat = layer.GetFeature(fid) | |
xmin = int(feat.GetField("XMIN")) | |
ymin = int(feat.GetField("YMIN")) | |
#cell_dir = "{:03.0f}_{:04.0f}".format(xmin, ymin) | |
cell_dir = "{}_{}".format(xmin, ymin) | |
cell_dirs.append(cell_dir) | |
cells.append((xmin, ymin)) | |
out_cell_dir = pjoin(out_dir, cell_dir) | |
if not exists(out_cell_dir): | |
os.makedirs(out_cell_dir) | |
with open(out_fname, 'w') as outf: | |
pickle.dump(cell_dirs, outf) | |
return cells | |
def query_cells(cell_list, satellites, min_date, max_date, dataset_type, | |
output_dir): | |
""" | |
""" | |
config = Config() | |
base_out_fname = CONFIG.get('outputs', 'query_filename') | |
for cell in cell_list: | |
x_cell = [int(cell[0])] | |
y_cell = [int(cell[1])] | |
tiles = list_tiles_as_list(x=x_cell, y=y_cell, acq_min=min_date, | |
acq_max=max_date, datasets=dataset_type, | |
satellites=satellites, | |
database=config.get_db_database(), | |
user=config.get_db_username(), | |
password=config.get_db_password(), | |
host=config.get_db_host(), | |
port=config.get_db_port()) | |
out_dir = pjoin(output_dir, '{}_{}'.format(cell[0], cell[1])) | |
out_fname = pjoin(out_dir, base_out_fname) | |
with open(out_fname, 'w') as outf: | |
pickle.dump(tiles, outf) | |
def create_tiles(array_size, tile_size=25): | |
""" | |
A minor function to tile a 1D array or list into smaller manageable | |
portions. | |
""" | |
idx_start = range(0, array_size, tile_size) | |
idx_tiles = [] | |
for idx_st in idx_start: | |
if idx_st + tile_size < array_size: | |
idx_end = idx_st + tile_size | |
else: | |
idx_end = array_size | |
idx_tiles.append((idx_st, idx_end)) | |
return idx_tiles | |
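# Worked example (for illustration): create_tiles(10, tile_size=4) returns
# [(0, 4), (4, 8), (8, 10)], i.e. (start, end) index pairs covering the full
# cell list, with the final tile absorbing the remainder.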
if __name__ == '__main__': | |
# Create the output directory | |
out_dir = CONFIG.get('work', 'output_directory') | |
if not exists(out_dir): | |
os.makedirs(out_dir) | |
# setup logging | |
log_dir = CONFIG.get('work', 'logs_directory') | |
if not exists(log_dir): | |
os.makedirs(log_dir) | |
logfile = "{log_path}/query_{uname}_{pid}.log" | |
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1], | |
pid=os.getpid()) | |
logging_level = logging.INFO | |
logging.basicConfig(filename=logfile, level=logging_level, | |
format=("%(asctime)s: [%(name)s] (%(levelname)s) " | |
"%(message)s ")) | |
    # Just turn these into simple function calls. In this framework we don't
    # need a luigi workflow for a simple single task
cells = get_cells() | |
# Define the fractional cover dataset | |
# TODO have this defined through the config.cfg | |
fc_ds_type = DatasetType.FC25 | |
# Get the satellites we wish to query | |
satellites = CONFIG.get('work', 'satellites') | |
satellites = [Satellite(i) for i in satellites.split(',')] | |
# Get the min/max date range to query | |
min_date = CONFIG.get('work', 'min_date') | |
min_date = [int(i) for i in min_date.split('_')] | |
min_date = date(min_date[0], min_date[1], min_date[2]) | |
max_date = CONFIG.get('work', 'max_date') | |
max_date = [int(i) for i in max_date.split('_')] | |
max_date = date(max_date[0], max_date[1], max_date[2]) | |
output_dir = CONFIG.get('work', 'output_directory') | |
query_cells(cells, satellites, min_date, max_date, fc_ds_type, output_dir) | |
# Create the tiles list that contains groups of cells to operate on | |
# Each node will have a certain number of cells to work on | |
cpnode = int(CONFIG.get('internals', 'cells_per_node')) | |
tiles = create_tiles(len(cells), cpnode) | |
tiles_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'tiles_list')) | |
with open(tiles_list_fname, 'w') as out_file: | |
pickle.dump(tiles, out_file) | |
# Setup the modules to use for the job | |
modules = CONFIG.get('pbs', 'modules').split(',') | |
modules = ['module load {}; '.format(module) for module in modules] | |
modules.insert(0, 'module use /projects/u46/opt/modules/modulefiles;') | |
modules = ''.join(modules) | |
# Calculate the job node and cpu requirements | |
nnodes = len(tiles) | |
ncpus = nnodes * 16 | |
mem = nnodes * 32 | |
# Populate the PBS shell script | |
project = CONFIG.get('pbs', 'project') | |
queue = CONFIG.get('pbs', 'queue') | |
walltime = CONFIG.get('pbs', 'walltime') | |
email = CONFIG.get('pbs', 'email') | |
py_file = pjoin(dirname(__file__), 'workflow.py') | |
pbs_job = PBS_DSH.format(project=project, queue=queue, | |
walltime=walltime, ncpus=ncpus, mem=mem, | |
email=email, nnodes=nnodes, | |
modules=modules, pyfile=py_file) | |
    # Output the shell script to disk
pbs_fname = pjoin(out_dir, CONFIG.get('outputs', 'pbs_filename')) | |
with open(pbs_fname, 'w') as out_file: | |
out_file.write(pbs_job) | |
#subprocess.check_call(['qsub', pbs_fname]) |
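    # Submission note (added for clarity): the qsub call above is left
    # commented out, so the job script written to the configured
    # output_directory under the configured pbs_filename is submitted
    # manually with qsub.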
#!/bin/bash | |
#PBS -P v10 | |
#PBS -q express | |
#PBS -l walltime=05:00:00,mem=16GB,ncpus=1 | |
#PBS -l wd | |
#PBS -me | |
#PBS -M [email protected] | |
module load python/2.7.6 | |
module use /projects/u46/opt/modules/modulefiles | |
module load gdal/1.10.1 | |
module load IDL_functions/test | |
module load gaip/test | |
module load agdc-api | |
module load pandas | |
PY_FILE=/home/547/jps547/git_repositories/sixy6e/9ea690493970eb12a237/combine_stats.py | |
python $PY_FILE |
#!/usr/bin/env python | |
import luigi | |
from datetime import date | |
import os | |
from os.path import join as pjoin, exists, dirname, abspath | |
import cPickle as pickle | |
import subprocess | |
import logging | |
import ogr | |
from datacube.api.query import list_tiles_as_list | |
from datacube.api.model import DatasetType, Satellite, BANDS | |
from datacube.config import Config | |
from eotools.vector import spatial_intersection | |
CONFIG = luigi.configuration.get_config() | |
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg')) | |
PBS_DSH = ( | |
"""#!/bin/bash | |
#PBS -P {project} | |
#PBS -q {queue} | |
#PBS -l walltime={walltime},ncpus={ncpus},mem={mem}GB,jobfs=350GB | |
#PBS -l wd | |
#PBS -me | |
#PBS -M {email} | |
NNODES={nnodes} | |
for i in $(seq 1 $NNODES); do | |
pbsdsh -n $((16 *$i)) -- bash -l -c "{modules} PBS_NNODES=$NNODES PBS_VNODENUM=$i python {pyfile} \ | |
--tile $[$i - 1]" & | |
done; | |
wait | |
""") | |
def get_cells(): | |
""" | |
    A simple procedure to generate a list of the cells that will
    be analysed throughout the workflow.
""" | |
out_dir = CONFIG.get('work', 'output_directory') | |
envelope = bool(int(CONFIG.get('internals', 'envelope'))) | |
vector_fname = CONFIG.get('work', 'vector_filename') | |
dcube_vector_fname = CONFIG.get('internals', 'cell_grid') | |
out_fname = pjoin(out_dir, CONFIG.get('outputs', 'cells_list')) | |
fids = spatial_intersection(vector_fname, dcube_vector_fname, | |
envelope=envelope) | |
    logging.info("Intersected cell FIDs: {}".format(fids))
vec_ds = ogr.Open(dcube_vector_fname) | |
layer = vec_ds.GetLayer(0) | |
cell_dirs = [] | |
cells = [] | |
for fid in fids: | |
feat = layer.GetFeature(fid) | |
xmin = int(feat.GetField("XMIN")) | |
ymin = int(feat.GetField("YMIN")) | |
#cell_dir = "{:03.0f}_{:04.0f}".format(xmin, ymin) | |
cell_dir = "{}_{}".format(xmin, ymin) | |
cell_dirs.append(cell_dir) | |
cells.append((xmin, ymin)) | |
out_cell_dir = pjoin(out_dir, cell_dir) | |
if not exists(out_cell_dir): | |
os.makedirs(out_cell_dir) | |
with open(out_fname, 'w') as outf: | |
pickle.dump(cell_dirs, outf) | |
return cells | |
def query_cells(cell_list, satellites, min_date, max_date, dataset_types, | |
output_dir): | |
""" | |
""" | |
config = Config() | |
base_out_fname = CONFIG.get('outputs', 'query_filename') | |
for cell in cell_list: | |
x_cell = [int(cell[0])] | |
y_cell = [int(cell[1])] | |
tiles = list_tiles_as_list(x=x_cell, y=y_cell, acq_min=min_date, | |
acq_max=max_date, | |
dataset_types=dataset_types, | |
satellites=satellites) | |
out_dir = pjoin(output_dir, '{}_{}'.format(cell[0], cell[1])) | |
out_fname = pjoin(out_dir, base_out_fname) | |
with open(out_fname, 'w') as outf: | |
pickle.dump(tiles, outf) | |
def create_tiles(array_size, tile_size=25): | |
""" | |
A minor function to tile a 1D array or list into smaller manageable | |
portions. | |
""" | |
idx_start = range(0, array_size, tile_size) | |
idx_tiles = [] | |
for idx_st in idx_start: | |
if idx_st + tile_size < array_size: | |
idx_end = idx_st + tile_size | |
else: | |
idx_end = array_size | |
idx_tiles.append((idx_st, idx_end)) | |
return idx_tiles | |
if __name__ == '__main__': | |
# Create the output directory | |
out_dir = CONFIG.get('work', 'output_directory') | |
if not exists(out_dir): | |
os.makedirs(out_dir) | |
# setup logging | |
log_dir = CONFIG.get('work', 'logs_directory') | |
if not exists(log_dir): | |
os.makedirs(log_dir) | |
logfile = "{log_path}/query_{uname}_{pid}.log" | |
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1], | |
pid=os.getpid()) | |
logging_level = logging.INFO | |
logging.basicConfig(filename=logfile, level=logging_level, | |
format=("%(asctime)s: [%(name)s] (%(levelname)s) " | |
"%(message)s ")) | |
    # Just turn these into simple function calls. In this framework we don't
    # need a luigi workflow for a simple single task
cells = get_cells() | |
# Define the fractional cover dataset | |
# TODO have this defined through the config.cfg | |
ds_types = [DatasetType.ARG25, DatasetType.PQ25] | |
# Get the satellites we wish to query | |
satellites = CONFIG.get('work', 'satellites') | |
satellites = [Satellite(i) for i in satellites.split(',')] | |
# Get the min/max date range to query | |
min_date = CONFIG.get('work', 'min_date') | |
min_date = [int(i) for i in min_date.split('_')] | |
min_date = date(min_date[0], min_date[1], min_date[2]) | |
max_date = CONFIG.get('work', 'max_date') | |
max_date = [int(i) for i in max_date.split('_')] | |
max_date = date(max_date[0], max_date[1], max_date[2]) | |
output_dir = CONFIG.get('work', 'output_directory') | |
query_cells(cells, satellites, min_date, max_date, ds_types, output_dir) | |
# Create the tiles list that contains groups of cells to operate on | |
# Each node will have a certain number of cells to work on | |
cpnode = int(CONFIG.get('internals', 'cells_per_node')) | |
tiles = create_tiles(len(cells), cpnode) | |
tiles_list_fname = pjoin(out_dir, CONFIG.get('outputs', 'tiles_list')) | |
with open(tiles_list_fname, 'w') as out_file: | |
pickle.dump(tiles, out_file) | |
# Setup the modules to use for the job | |
modules = CONFIG.get('pbs', 'modules').split(',') | |
modules = ['module load {}; '.format(module) for module in modules] | |
modules.insert(0, 'module use /projects/u46/opt/modules/modulefiles;') | |
modules = ''.join(modules) | |
# Calculate the job node and cpu requirements | |
nnodes = len(tiles) | |
ncpus = nnodes * 16 | |
mem = nnodes * 32 | |
# Populate the PBS shell script | |
project = CONFIG.get('pbs', 'project') | |
queue = CONFIG.get('pbs', 'queue') | |
walltime = CONFIG.get('pbs', 'walltime') | |
email = CONFIG.get('pbs', 'email') | |
py_file = pjoin(dirname(abspath(__file__)), 'classifier_workflow.py') | |
pbs_job = PBS_DSH.format(project=project, queue=queue, | |
walltime=walltime, ncpus=ncpus, mem=mem, | |
email=email, nnodes=nnodes, | |
modules=modules, pyfile=py_file) | |
    # Output the shell script to disk
pbs_fname = pjoin(out_dir, CONFIG.get('outputs', 'pbs_filename')) | |
with open(pbs_fname, 'w') as out_file: | |
out_file.write(pbs_job) | |
#subprocess.check_call(['qsub', pbs_fname]) |
#!/usr/bin/env python | |
import luigi | |
import os | |
from os.path import join as pjoin, exists, dirname | |
import cPickle as pickle | |
import glob | |
import argparse | |
import logging | |
import pandas | |
import rasterio | |
from datacube.api.model import DatasetType | |
from zonal_stats import zonal_stats | |
from image_processing.segmentation import rasterise_vector | |
CONFIG = luigi.configuration.get_config() | |
CONFIG.add_config_path(pjoin(dirname(__file__), 'config.cfg')) | |
class RasteriseTask(luigi.Task): | |
""" | |
Computes the rasterisation for a cell. | |
""" | |
out_dir = luigi.Parameter() | |
def requires(self): | |
return [] | |
def output(self): | |
out_fname = pjoin(self.out_dir, 'RasteriseTask.completed') | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
out_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', 'rasterise_filename')) | |
ds_list_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', 'query_filename')) | |
with open(ds_list_fname, 'r') as infile: | |
ds_list = pickle.load(infile) | |
vector_fname = CONFIG.get('work', 'vector_filename') | |
img_fname = ds_list[0].datasets[DatasetType.FC25].path | |
with rasterio.open(img_fname) as src: | |
crs = src.crs | |
transform = src.affine | |
height = src.height | |
width = src.width | |
res = rasterise_vector(vector_fname, shape=(height, width), | |
transform=transform, crs=crs) | |
kwargs = {'count': 1, | |
'width': width, | |
'height': height, | |
'crs': crs, | |
'transform': transform, | |
'dtype': res.dtype.name, | |
'nodata': 0} | |
        with rasterio.open(out_fname, 'w', **kwargs) as outds:
            outds.write_band(1, res)
# We could just set the image as the Luigi completion target... | |
with self.output().open('w') as outf: | |
outf.write('Complete') | |
class StatsTask(luigi.Task): | |
""" | |
Computes a stats task for the required dataset. | |
""" | |
idx = luigi.IntParameter() | |
out_fname = luigi.Parameter() | |
def requires(self): | |
return [RasteriseTask(dirname(self.out_fname))] | |
def output(self): | |
return luigi.LocalTarget(self.out_fname) | |
def run(self): | |
rasterised_fname = pjoin(dirname(self.out_fname), | |
CONFIG.get('outputs', 'rasterise_filename')) | |
ds_list_fname = pjoin(dirname(self.out_fname), | |
CONFIG.get('outputs', 'query_filename')) | |
with open(ds_list_fname, 'r') as infile: | |
ds_list = pickle.load(infile) | |
dataset = ds_list[self.idx] | |
dataset_type = DatasetType.FC25 | |
result = zonal_stats(dataset, rasterised_fname, dataset_type) | |
# Open the output hdf5 file | |
store = pandas.HDFStore(self.output().path) | |
# Write the dataframe | |
store.append('data', result) | |
# Save and close the file | |
store.close() | |
class CellStatsTask(luigi.Task): | |
""" | |
For a given cell define a stats task for each required DatasetType. | |
    e.g. for each FC file.
""" | |
out_dir = luigi.Parameter() | |
def requires(self): | |
base_name = CONFIG.get('outputs', 'stats_filename_format') | |
base_name = pjoin(self.out_dir, base_name) | |
ds_list_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', 'query_filename')) | |
with open(ds_list_fname, 'r') as infile: | |
ds_list = pickle.load(infile) | |
targets = [] | |
for idx, ds in enumerate(ds_list): | |
timestamp = bytes(ds.start_datetime).replace(' ', '-') | |
out_fname = base_name.format(timestamp) | |
targets.append(StatsTask(idx, out_fname)) | |
return targets | |
def output(self): | |
out_fname = pjoin(self.out_dir, 'CellStatsTask.completed') | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
class CombineCellStatsTask(luigi.Task): | |
""" | |
Combines all stats files from a single cell into a single file. | |
""" | |
out_dir = luigi.Parameter() | |
def requires(self): | |
return [CellStatsTask(self.out_dir)] | |
def output(self): | |
out_fname = pjoin(self.out_dir, 'CombineCellStatsTask.completed') | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
# Get a list of the stats files for each timeslice | |
stats_files_list = glob.glob(pjoin(self.out_dir, '*.h5')) | |
        # Create an output file to which we can continually append data
out_fname = pjoin(self.out_dir, | |
CONFIG.get('outputs', | |
'combined_cell_stats_filename')) | |
combined_store = pandas.HDFStore(out_fname) | |
store = pandas.HDFStore(stats_files_list[0]) | |
        # If there is nothing in the first file there will be nothing in
        # any of the files
if '/data' in store.keys(): | |
# We have data to retrieve | |
headings = store['data'].columns.tolist() | |
store.close() | |
df = pandas.DataFrame(columns=headings) | |
for sfile in stats_files_list: | |
store = pandas.HDFStore(sfile, 'r') | |
df = df.append(store['data']) | |
store.close() | |
df.reset_index(inplace=True) | |
# Write to disk | |
combined_store.append('data', df) | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
class RunCombineCellStatsTasks(luigi.Task): | |
""" | |
Issues CombineCellStatsTask's to each cell associated | |
with the tile defined by the start and end index. | |
""" | |
idx1 = luigi.IntParameter() | |
idx2 = luigi.IntParameter() | |
def requires(self): | |
base_out_dir = CONFIG.get('work', 'output_directory') | |
cells_list_fname = pjoin(base_out_dir, | |
CONFIG.get('outputs', 'cells_list')) | |
with open(cells_list_fname, 'r') as infile: | |
cells = pickle.load(infile) | |
tasks = [] | |
for cell in cells[self.idx1:self.idx2]: | |
out_dir = pjoin(base_out_dir, cell) | |
tasks.append(CombineCellStatsTask(out_dir)) | |
return tasks | |
def output(self): | |
out_dir = CONFIG.get('work', 'output_directory') | |
out_fname = pjoin(out_dir, | |
'RunCombineCellStatsTasks_{}:{}.completed') | |
out_fname = out_fname.format(self.idx1, self.idx2) | |
return luigi.LocalTarget(out_fname) | |
def run(self): | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
if __name__ == '__main__': | |
# Setup command-line arguments | |
desc = "Processes zonal stats for a given set of cells." | |
hlp = ("The tile/chunk index to retieve from the tiles list. " | |
"(Needs to have been previously computed to a file named tiles.pkl") | |
parser = argparse.ArgumentParser(description=desc) | |
parser.add_argument('--tile', type=int, help=hlp) | |
parsed_args = parser.parse_args() | |
tile_idx = parsed_args.tile | |
# setup logging | |
log_dir = CONFIG.get('work', 'logs_directory') | |
if not exists(log_dir): | |
os.makedirs(log_dir) | |
logfile = "{log_path}/stats_workflow_{uname}_{pid}.log" | |
logfile = logfile.format(log_path=log_dir, uname=os.uname()[1], | |
pid=os.getpid()) | |
logging_level = logging.INFO | |
logging.basicConfig(filename=logfile, level=logging_level, | |
format=("%(asctime)s: [%(name)s] (%(levelname)s) " | |
"%(message)s ")) | |
    # Get the list of tiles (groups of cells that each node will operate on)
tiles_list_fname = pjoin(CONFIG.get('work', 'output_directory'), | |
CONFIG.get('outputs', 'tiles_list')) | |
with open(tiles_list_fname, 'r') as in_file: | |
tiles = pickle.load(in_file) | |
# Initialise the job | |
tile = tiles[tile_idx] | |
tasks = [RunCombineCellStatsTasks(tile[0], tile[1])] | |
luigi.build(tasks, local_scheduler=True, workers=16) | |
luigi.run() |
#!/usr/bin/env python | |
import numpy | |
import gdal | |
import pandas | |
from datacube.api.model import DatasetType | |
from datacube.api.utils import get_dataset_data_with_pq | |
from eotools.drivers.stacked_dataset import StackedDataset | |
from eotools.geobox import GriddedGeoBox | |
from image_processing.segmentation import Segments | |
from idl_functions import histogram | |
def zonal_stats(dataset, rasterised_fname, dataset_type): | |
""" | |
Computes the Observed Count, Min, Max, Sum and Sum of Squares for | |
the segments defined by the rasterised image. The stats are | |
derived from the `dataset` defined by the `dataset_type`. | |
:param dataset: | |
A class of type `Dataset`. | |
:param rasterised_fname: | |
A string containing the full file pathname of an image | |
containing the rasterised features. These features will be | |
interpreted as segments. | |
:param dataset_type: | |
A class of type `DatasetType`. | |
:return: | |
A `pandas.DataFrame` containing the statistics for each segment | |
and for each raster band contained with the `dataset_type`. | |
""" | |
    # Initialise a blank dataframe
headings = ["SID", "Timestamp", "Band", "Observed_Count", "Min", "Max", | |
"Sum", "Sum_of_Squares"] | |
df = pandas.DataFrame(columns=headings, dtype=numpy.float) | |
# Read the rasterised image | |
sd = StackedDataset(rasterised_fname) | |
img = sd.read_raster_band() | |
# Initialise the segment visitor | |
seg_vis = Segments(img) | |
# Do we have any data to analyse??? | |
if seg_vis.n_segments == 0: | |
return df | |
# We need to get the PQ data and the DatasetType of interest | |
pq_ds = dataset.datasets[DatasetType.PQ25] | |
ds = dataset.datasets[dataset_type] | |
timestamp = dataset.start_datetime | |
bands = ds.bands | |
no_data = -999 | |
for band in bands: | |
        # Until the API provides a release of get_pq_mask this will have to do.
        # It re-computes the PQ mask every time, which is not ideal;
        # otherwise fall back to eotools.
ds_data = (get_dataset_data_with_pq(ds, pq_ds, bands=[band], | |
ndv=no_data)[band]).astype('float') | |
# Set no-data to NaN | |
ds_data[ds_data == no_data] = numpy.nan | |
# Loop over each segment and get the data. | |
# In other instances we may just need the locations | |
for seg_id in seg_vis.segment_ids: | |
data = seg_vis.data(ds_data, segment_id=seg_id) | |
# dimensions of the data which will be 1D | |
dim = data.shape | |
# Returns are 1D arrays, so check if we have an empty array | |
if dim[0] == 0: | |
continue # Empty bin, (no data), skipping | |
# Compute the stats | |
count = numpy.sum(numpy.isfinite(data)) | |
sum_ = numpy.nansum(data) | |
sum_sq = numpy.nansum(data**2) | |
min_ = numpy.nanmin(data) | |
max_ = numpy.nanmax(data) | |
format_dict = {"SID": seg_id, | |
"Timestamp": timestamp, | |
"Band": band.name, | |
"Observed_Count": count, | |
"Min": min_, | |
"Max": max_, | |
"Sum": sum_, | |
"Sum_of_Squares": sum_sq} | |
# Append the stat to the data frame | |
df = df.append(format_dict, ignore_index=True) | |
return df | |
def zonal_class_distribution(classified_image, zonal_image, class_ids=None): | |
""" | |
Calculates the classification distribution for each zone. | |
""" | |
# Initialise the segment visitor | |
seg_vis = Segments(zonal_image) | |
# Define the min/max class distribution if we didn't receive any class ID's | |
if class_ids is None: | |
min_class = 0 | |
max_class = numpy.max(classified_image) | |
# if max_class == min_class: | |
# We have a completely unclassified image | |
# TODO Should we do anything??? | |
# If someone is continually calling this routine to build up | |
# a table of results for numerous images based on the same | |
        # classification schema/algorithm, then ideally they should provide
        # a list of class IDs, otherwise there could be mismatched
        # histograms.
else: | |
min_class = numpy.min(class_ids) | |
max_class = numpy.max(class_ids) | |
# Initialise the dict to store our results | |
results = {} | |
# base class name | |
cname_format = 'Class_{}' | |
# Retrieve the class distributions for each segment/zone | |
for zid in seg_vis.segment_ids: | |
data = seg_vis.data(classified_image, segment_id=zid) | |
# Skip invalid segments | |
if data.size == 0: | |
continue | |
results[zid] = histogram(data, minv=min_class, | |
maxv=max_class)['histogram'] | |
results = pandas.DataFrame(results).transpose() | |
results['SID'] = results.index.values | |
results['PixelCount'] = seg_vis.histogram[results.index.values] | |
# convert the class names from ints to strings | |
for col in results.columns: | |
if type(col) != bytes: | |
cname = cname_format.format(col) | |
results.rename(columns={col: cname}, inplace=True) | |
return results |
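# Illustrative helper (not part of the original module): a plain-numpy
# restatement of what zonal_class_distribution() reports for a tiny synthetic
# zones/classification pair, i.e. a per-zone histogram of class codes. It
# avoids the Segments and idl_functions dependencies purely for demonstration.
def _demo_zonal_class_counts():
    zones = numpy.array([[1, 1, 2],
                         [1, 2, 2]])
    classes = numpy.array([[0, 3, 5],
                           [3, 5, 2]])
    class_ids = [0, 1, 2, 3, 4, 5]
    counts = {}
    for zid in numpy.unique(zones):
        data = classes[zones == zid]
        counts[zid] = numpy.bincount(data, minlength=len(class_ids))
    # zone 1 -> one pixel of class 0 and two of class 3
    # zone 2 -> one pixel of class 2 and two of class 5
    return counts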