Last active
September 24, 2021 12:33
-
-
Save sixy6e/cbb346262cc87ed87dc04bade5536d05 to your computer and use it in GitHub Desktop.
QA algorithm extraction prototype
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# mypy | |
.mypy_cache/ | |
.dmypy.json | |
dmypy.json |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
A toy prototype of a py-module that contains the algorithm components from the [hyo2_qc](https://github.com/hydroffice/hyo2_qc) | |
GUI toolkit module. | |
Only the fliers and gridqa submodules were extracted. | |
The base algorithms haven't been changed, but variable names were changed to be more descriptive (well those that I could interpret). | |
Most functions were converted to using straight numpy as there wasn't a need for them to be in Cython. | |
Some components used numexpr for a memory reduction as the array equations were lengthy in some instances, which would result in | |
a lot of temporary arrays being created behind the scenes. | |
A couple of the funcs still needed to have the per-pixel logic, but these have been rewritten in plain Python but will get compiled by | |
[Numba](https://numba.pydata.org/). This should give the same performance, if not better, whilst removing the need for knowing Cython. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta | |
from pathlib import Path | |
from enum import Enum | |
import numpy | |
import pdb | |
import attr | |
from typing import List | |
CHECKSUM_BIT = 0x80000000 | |
NANO_SECONDS_SF = 1e-9 | |
class RecordTypes(Enum): | |
"""The various record type contained within the GSF file.""" | |
GSF_HEADER = 1 | |
GSF_SWATH_BATHYMETRY_PING = 2 | |
GSF_SOUND_VELOCITY_PROFILE = 3 | |
GSF_PROCESSING_PARAMETERS = 4 | |
GSF_SENSOR_PARAMETERS = 5 | |
GSF_COMMENT = 6 | |
GSF_HISTORY = 7 | |
GSF_NAVIGATION_ERROR = 8 | |
GSF_SWATH_BATHY_SUMMARY = 9 | |
GSF_SINGLE_BEAM_PING = 10 | |
GSF_HV_NAVIGATION_ERROR = 11 | |
GSF_ATTITUDE = 12 | |
@attr.s() | |
class RecordCount: | |
record_type: RecordTypes = attr.ib() | |
count: int = attr.ib(default=0) | |
@attr.s() | |
class Record: | |
record_type: RecordTypes = attr.ib() | |
data_size: int = attr.ib() | |
checksum_flag: bool = attr.ib() | |
index: int = attr.ib() | |
@attr.s() | |
class FileRecordIndex: | |
record_type: RecordTypes = attr.ib() | |
record_count: int = attr.ib(init=False) | |
data_size: List[int] = attr.ib(repr=False) | |
checksum_flag: List[bool] = attr.ib(repr=False) | |
indices: List[int] = attr.ib(repr=False) | |
def __attrs_post_init__(self): | |
self.record_count = len(self.indices) | |
def record(self, index): | |
result = Record( | |
record_type=self.record_type, | |
data_size=self.data_size[index], | |
checksum_flag=self.checksum_flag[index], | |
index=self.indices[index] | |
) | |
return result | |
def file_info(stream): | |
fname = Path(stream.name) | |
fsize = fname.stat().st_size | |
current_pos = stream.tell() | |
stream.seek(0) | |
results = {rtype: {"indices": [], "data_size": [], "checksum_flag": []} for rtype in RecordTypes} | |
while stream.tell() < fsize: | |
data_size, record_id, flag = record_info(stream) | |
results[RecordTypes(record_id)]["indices"].append(stream.tell()) | |
results[RecordTypes(record_id)]["data_size"].append(data_size) | |
results[RecordTypes(record_id)]["checksum_flag"].append(flag) | |
_ = numpy.fromfile(stream, f"S{data_size}", count=1) | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
stream.seek(current_pos) | |
r_index = [ | |
FileRecordIndex( | |
record_type=rtype, | |
data_size=results[rtype]["data_size"], | |
checksum_flag=results[rtype]["checksum_flag"], | |
indices=results[rtype]["indices"] | |
) | |
for rtype in RecordTypes | |
] | |
return r_index | |
def count_records(stream): | |
fname = Path(stream.name) | |
fsize = fname.stat().st_size | |
current_pos = stream.tell() | |
zeroed = stream.seek(0) | |
results = {rtype: {"count": 0, "indices": [], "data_size": [], "checksum_flag": []} for rtype in RecordTypes} | |
idx = 0 | |
while stream.tell() < fsize: | |
data_size, record_id, flag = record_info(stream) | |
results[RecordTypes(record_id)]["count"] += 1 | |
results[RecordTypes(record_id)]["indices"].append(stream.tell()) | |
results[RecordTypes(record_id)]["data_size"].append(data_size) | |
results[RecordTypes(record_id)]["checksum_flag"].append(flag) | |
_ = numpy.fromfile(stream, f"S{data_size}", count=1) | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
idx = stream.tell() | |
stream.seek(current_pos) | |
for record_type in results: | |
results[record_type]["indices"] = numpy.array(results[record_type]["indices"]) | |
return results | |
def record_info(stream): | |
data_size = numpy.fromfile(stream, ">u4", count=1)[0] | |
record_identifier = numpy.fromfile(stream, ">i4", count=1)[0] | |
checksum_flag = bool(record_identifier & CHECKSUM_BIT) | |
return data_size, record_identifier, checksum_flag | |
def read_header(stream, data_size, checksum_flag): | |
if checksum_flag: | |
checksum = numpy.fromfile(stream, ">i4", count=1)[0] | |
data = numpy.fromfile(stream, f"S{data_size}", count=1)[0] | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return data | |
def processing_parameters(stream, data_size, checksum_flag): | |
if checksum_flag: | |
checksum = numpy.fromfile(stream, ">i4", count=1)[0] | |
params = dict() | |
params["TIME"] = numpy.fromfile(stream, ">i2", count=4) | |
params["NUM_PARAMS"] = numpy.fromfile(stream, ">i2", count=1)[0] | |
for i in range(params["NUM_PARAMS"]): | |
param_size = numpy.fromfile(stream, ">i2", count=1)[0] | |
data = numpy.fromfile(stream, f"S{param_size}", count=1)[0] | |
key, value = data.decode("utf-8").strip().split("=") | |
params[key] = value | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return params | |
def _proc_param_parser(value): | |
"""Convert any strings that have known types such as bools, floats.""" | |
if isinstance(value, datetime): # nothing to do already parsed | |
return value | |
booleans = { | |
"yes": True, | |
"no": False, | |
"true": True, | |
"false": False, | |
} | |
if "," in value: # dealing with an array | |
array = value.split(",") | |
if "." in value: # assumption on period being a decimal point | |
parsed = numpy.array(array, dtype="float").tolist() | |
else: | |
# should be dealing with an array of "UNKNWN" or "UNKNOWN" | |
parsed = ["unknown"]*len(array) | |
elif "." in value: # assumption on period being a decimal point | |
parsed = float(value) | |
elif value.lower() in booleans: | |
parsed = booleans[value.lower()] | |
elif value.lower() in ["unknwn", "unknown"]: | |
parsed = "unknown" | |
else: # most likely an integer or generic string | |
try: | |
parsed = int(value) | |
except ValueError: | |
parsed = value.lower() | |
return parsed | |
def _standardise_proc_param_keys(key): | |
"""Convert to lowercase, replace any spaces with underscore.""" | |
return key.lower().replace(" ", "_") | |
def processing_parameters2(stream, data_size, checksum_flag): | |
params = dict() | |
idx = 0 | |
blob = stream.readline(data_size) | |
if checksum_flag: | |
checksum = numpy.frombuffer(blob, ">i4", count=1)[0] | |
idx += 4 | |
dtype = numpy.dtype( | |
[ | |
("time_seconds", ">i4"), | |
("time_nano_seconds", ">i4"), | |
("num_params", ">i2"), | |
] | |
) | |
data = numpy.frombuffer(blob, dtype, count=1) | |
params["time_seconds"] = int(data["time_seconds"][0]) | |
params["time_nano_seconds"] = int(data["time_nano_seconds"][0]) | |
params["num_params"] = data["num_params"][0] | |
idx += 10 | |
for i in range(params["num_params"]): | |
param_size = numpy.frombuffer(blob[idx:], ">i2", count=1)[0] | |
idx += 2 | |
data = numpy.frombuffer(blob[idx:], f"S{param_size}", count=1)[0] | |
idx += param_size | |
key, value = data.decode("utf-8").strip().split("=") | |
if key == "REFERENCE TIME": | |
value = datetime.strptime(value, "%Y/%j %H:%M:%S") | |
params["processed_datetime"] = value + timedelta( | |
seconds=params["time_seconds"], | |
milliseconds=params["time_nano_seconds"] * 1e-6 | |
) | |
params[_standardise_proc_param_keys(key)] = _proc_param_parser(value) | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return params | |
def attitude(stream, data_size, checksum_flag): | |
if checksum_flag: | |
checksum = numpy.fromfile(stream, ">i4", count=1)[0] | |
base_time = numpy.fromfile(stream, ">i4", count=2) | |
acq_time = datetime.utcfromtimestamp(base_time[0] + NANO_SECONDS_SF * base_time[1]) | |
num_measurements = numpy.fromfile(stream, ">i2", count=1)[0] | |
data = { | |
"attitude_time": [], | |
"pitch": [], | |
"roll": [], | |
"heave": [], | |
"heading": [], | |
} | |
dtype = numpy.dtype( | |
[ | |
("attitude_time", ">i2"), | |
("pitch", ">i2"), | |
("roll", ">i2"), | |
("heave", ">i2"), | |
("heading", ">u2"), | |
] | |
) | |
for i in range(num_measurements): | |
blob = numpy.fromfile(stream, dtype, count=1)[0] | |
# pdb.set_trace() | |
# data["attitude_time"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100) | |
# data["pitch"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100) | |
# data["roll"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100) | |
# data["heave"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100) | |
# data["heading"].append(numpy.fromfile(stream, ">u2", count=1)[0] / 100) | |
data["attitude_time"].append( | |
acq_time + timedelta(seconds=blob["attitude_time"] / 1000) | |
) | |
data["pitch"].append(blob["pitch"] / 100) | |
data["roll"].append(blob["roll"] / 100) | |
data["heave"].append(blob["heave"] / 100) | |
data["heading"].append(blob["heading"] / 100) | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return data | |
def attitude2(stream, data_size, checksum_flag): | |
# using stream.readline() would stop at the first new line | |
# using stream.readlines() can read more than data_size | |
blob = numpy.fromfile(stream, f"S{data_size}", count=1)[0] | |
idx = 0 | |
if checksum_flag: | |
checksum = numpy.frombuffer(blob, ">i4", count=1)[0] | |
idx += 4 | |
base_time = numpy.frombuffer(blob[idx:], ">i4", count=2) | |
idx += 8 | |
acq_time = datetime.utcfromtimestamp(base_time[0] + NANO_SECONDS_SF * base_time[1]) | |
num_measurements = numpy.frombuffer(blob[idx:], ">i2", count=1)[0] | |
idx += 2 | |
data = { | |
"attitude_time": [], | |
"pitch": [], | |
"roll": [], | |
"heave": [], | |
"heading": [], | |
} | |
dtype = numpy.dtype( | |
[ | |
("attitude_time", ">i2"), | |
("pitch", ">i2"), | |
("roll", ">i2"), | |
("heave", ">i2"), | |
("heading", ">u2"), | |
] | |
) | |
for i in range(num_measurements): | |
numpy_blob = numpy.frombuffer(blob[idx:], dtype, count=1)[0] | |
idx += 10 | |
data["attitude_time"].append( | |
acq_time + timedelta(seconds=numpy_blob["attitude_time"] / 1000) | |
) | |
data["pitch"].append(numpy_blob["pitch"] / 100) | |
data["roll"].append(numpy_blob["roll"] / 100) | |
data["heave"].append(numpy_blob["heave"] / 100) | |
data["heading"].append(numpy_blob["heading"] / 100) | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return data | |
def read_svp(stream, data_size, flag): | |
dtype = numpy.dtype( | |
[ | |
("obs_seconds", ">u4"), | |
("obs_nano", ">u4"), | |
("app_seconds", ">u4"), | |
("app_nano", ">u4"), | |
("lon", ">i4"), | |
("lat", ">i4"), | |
("num_points", ">u4"), | |
] | |
) | |
# obs_time = numpy.fromfile(stream, ">u4", count=2) | |
# app_time = numpy.fromfile(stream, ">u4", count=2) | |
# time = numpy.fromfile(stream, ">u4", count=4) | |
# lon, lat = numpy.fromfile(stream, ">i4", count=2) | |
# num_points = numpy.fromfile(stream, ">u4", count=1)[0] | |
blob = numpy.fromfile(stream, dtype, count=1) | |
# pdb.set_trace() | |
svp = numpy.fromfile(stream, ">u4", count=2 * blob["num_points"][0]) / 100 | |
data = { | |
"obs_time2": datetime.utcfromtimestamp( | |
blob["obs_seconds"][0] + NANO_SECONDS_SF * blob["obs_nano"][0] | |
), | |
"app_time2": datetime.utcfromtimestamp( | |
blob["app_seconds"][0] + NANO_SECONDS_SF * blob["app_nano"][0] | |
), | |
"lon": blob["lon"][0], | |
"lat": blob["lat"][0], | |
"num_points": blob["num_points"][0], | |
"svp": svp.reshape((blob["num_points"][0], 2)), | |
} | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return data | |
def swath_bathymetry_summary(stream, data_size, flag): | |
dtype = numpy.dtype( | |
[ | |
("time_first_ping_seconds", ">i4"), | |
("time_first_ping_nano_seconds", ">i4"), | |
("time_last_ping_seconds", ">i4"), | |
("time_last_ping_nano_seconds", ">i4"), | |
("min_latitude", ">i4"), | |
("min_longitude", ">i4"), | |
("max_latitude", ">i4"), | |
("max_longitude", ">i4"), | |
("min_depth", ">i4"), | |
("max_depth", ">i4"), | |
] | |
) | |
blob = numpy.fromfile(stream, dtype, count=1) | |
data = { | |
"time_first_ping": datetime.utcfromtimestamp( | |
blob["time_first_ping_seconds"][0] + NANO_SECONDS_SF * blob["time_first_ping_nano_seconds"][0] | |
), | |
"time_last_ping": datetime.utcfromtimestamp( | |
blob["time_last_ping_seconds"][0] + NANO_SECONDS_SF * blob["time_last_ping_nano_seconds"][0] | |
), | |
"min_latitude": blob["min_latitude"][0] / 10_000_000, | |
"min_longitude": blob["min_longitude"][0] / 10_000_000, | |
"max_latitude": blob["max_latitude"][0] / 10_000_000, | |
"max_longitude": blob["max_longitude"][0] / 10_000_000, | |
"min_depth": blob["min_depth"][0] / 100, | |
"max_depth": blob["max_depth"][0] / 100, | |
} | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return data | |
def comment(stream, data_size, flag): | |
dtype = numpy.dtype( | |
[ | |
("time_comment_seconds", ">i4"), | |
("time_comment_nano_seconds", ">i4"), | |
("comment_length", ">i4"), | |
] | |
) | |
blob = numpy.fromfile(stream, dtype, count=1) | |
# pdb.set_trace() | |
length = blob["comment_length"][0] # + 1 | |
data = { | |
"time_comment_made": datetime.utcfromtimestamp( | |
blob["time_comment_seconds"][0] + NANO_SECONDS_SF * blob["time_comment_nano_seconds"][0] | |
), | |
"length": length, | |
"comment": numpy.fromfile(stream, f"S{length}", count=1)[0], | |
} | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return data | |
def comment2(stream, data_size, flag): | |
dtype = numpy.dtype( | |
[ | |
("time_comment_seconds", ">i4"), | |
("time_comment_nano_seconds", ">i4"), | |
("comment_length", ">i4"), | |
] | |
) | |
blob = stream.readline(data_size) | |
decoded = numpy.frombuffer(blob, dtype, count=1) | |
length = decoded["comment_length"][0] | |
data = { | |
"time_comment_made": datetime.utcfromtimestamp( | |
decoded["time_comment_seconds"][0] + NANO_SECONDS_SF * decoded["time_comment_nano_seconds"][0] | |
), | |
"length": length, | |
"comment": blob[12:].decode().strip().rstrip("\x00"), | |
} | |
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
return data | |
def bathymetry_ping(stream, data_size, flag): | |
dtype = numpy.dtype( | |
[ | |
("time_ping_seconds", ">i4"), | |
("time_ping_nano_seconds", ">i4"), | |
("longitude", ">i4"), | |
("latitude", ">i4"), | |
("number_beams", ">i2"), | |
("centre_beam", ">i2"), | |
("ping_flags", ">i2"), | |
("reserved", ">i2"), | |
("tide_corrector", ">i2"), | |
("depth_corrector", ">i4"), | |
("heading", ">i2"), | |
("pitch", ">i2"), | |
("roll", ">i2"), | |
("heave", ">i2"), | |
("course", ">i2"), | |
("speed", ">i2"), | |
("height", ">i4"), | |
("separation", ">i4"), | |
("gps_tide_corrector", ">i4"), | |
] | |
) | |
blob = numpy.fromfile(stream, dtype, count=1) | |
_ = numpy.fromfile(stream, "B", count=2) # spare space | |
# this isn't the end of this record. there are sub-records to decode | |
subrecord_hdr = numpy.fromfile(stream, ">i4", count=1) | |
subrecord_id = (subrecord_hdr & 0xFF000000) >> 24 | |
subrecord_size = subrecord_hdr & 0x00FFFFFF | |
# reading a scale factor record | |
num_factors = numpy.fromfile(stream, ">i4", count=1)[0] | |
sfs = [] | |
for i in range(num_factors): | |
sfs.append(numpy.fromfile(stream, ">i4", count=3)) | |
subrecords = [] # shouldn't be any duplicate id's | |
for i in range(num_factors): | |
subhdr = numpy.fromfile(stream, ">i4", count=1)[0] | |
subid = (subhdr & 0xFF000000) >> 24 | |
subsz = subhdr & 0x00FFFFFF | |
size = subsz // blob["number_beams"][0] | |
dtype = f">i{size}" | |
if dtype == ">i0": | |
return blob, sfs, subrecords | |
pdb.set_trace() | |
subrecords.append((subid, numpy.fromfile(stream, dtype, count=blob["number_beams"][0]))) | |
# current test this is the depth subrecord | |
# subhdr = numpy.fromfile(stream, ">i4", count=1) | |
# sid = (subhdr & 0xFF000000) >> 24 | |
# ssz = subhdr & 0x00FFFFFF | |
# dsize = ssz // blob["number_beams"] | |
# dtype = f">i{dsize}" | |
# depth = numpy.fromfile(stream, dtype, count=blob["number_beams"]) | |
return blob, sfs, subrecords | |
def run(fname): | |
stream = open(fname, "rb") | |
data_size, record_id, flag = record_info(stream) | |
header = read_header(stream, data_size, flag) | |
fill = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
data_size, record_id, flag = record_info(stream) | |
params = processing_parameters(stream, data_size, flag) | |
fill = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
# pdb.set_trace() | |
att = [] | |
data_size, record_id, flag = record_info(stream) | |
while record_id == 12: | |
att.append(attitude(stream, data_size, flag)) | |
fill = numpy.fromfile(stream, "B", count=stream.tell() % 4) | |
data_size, record_id, flag = record_info(stream) | |
svp = read_svp(stream, data_size, flag) | |
pdb.set_trace() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy | |
from scipy import ndimage | |
import numexpr | |
from numba import jit | |
import structlog | |
_LOG = structlog.get_logger() | |
def laplacian_operator(lap: numpy.ndarray, flag_grid: numpy.ndarray, threshold: float): | |
""" | |
Laplacian operator check. | |
Notes: | |
The flag_grid is modified inplace which is fine, | |
unless <ndarray>.flags.writeable is False | |
Locations could be returned, or we return None and write results into the | |
flag_grid. | |
The Cython code also logs the location of each pixel flagged by the check | |
numexpression is used to reduce the memory footprint from the temp arrays | |
that get created. We get a speed benefit too. | |
""" | |
# (sixy6e) wierd logic if threshold is positive eg: | |
# lap = [[-9, -5, -4], | |
# [ 9, 5, 4], | |
# [-1, 0, 1]] | |
# th = 5 | |
# would result in the entire array being true. | |
# Is this the intended behaviour??? | |
locations = numexpr.evaluate("(lap < threshold) | (lap > -threshold)") | |
flag_grid[locations] = 1 # check number 1 | |
# log the locations | |
# if really desired, this could be done differently, | |
# even though the locations are written as GeoPoints later on ... | |
for row, col in zip(*numpy.where(locations)): # numpy.where is slow but fits the need | |
_LOG.info("laplacian operator check (#1)", row=row, col=col) | |
def gaussian_curvature( | |
gauss_curve: numpy.ndarray, flag_grid: numpy.ndarray, threshold: float | |
): | |
""" | |
Gaussian curvature check. | |
Notes: | |
Similar notes in regards to modification in-place as the laplacian operator | |
check. | |
The operation could be done in 1 line, but the original code logs the locations | |
of the flags. | |
""" | |
locations = gauss_curve > threshold | |
flag_grid[locations] = 2 # check number 2 | |
for row, col in zip(*numpy.where(locations)): | |
_LOG.info("gaussian curvature check (#2)", row=row, col=col) | |
@jit(nopython=True, cache=True) | |
def adjacent_cells( | |
bathy: numpy.ndarray, | |
flag_grid: numpy.ndarray, | |
threshold: float, | |
percent_1: float, | |
percent_2: float, | |
): | |
"""""" | |
rows, cols = bathy.shape | |
# the grid is traversed row by row | |
for row in range(rows): # we get the row | |
# Historically, we were skipping the first and the last row | |
# if (row == 0) or (row == rows - 1): | |
# continue | |
for col in range(cols): # we get the column | |
# (sixy6e) why not simply loop over [1, n-1]??? | |
if (col == 0) or (col == cols - 1): | |
continue | |
if flag_grid[row, col] != 0: # avoid existing flagged nodes | |
continue | |
# for each node in the grid, the depth is retrieved | |
depth_node = bathy[row, col] | |
# any further calculation is skipped in case of a no-data value | |
if numpy.isnan(depth_node): | |
continue | |
neighbour_count = 0 # initialize the number of neighbors | |
diff_pos_count = ( | |
0 # initialize the number of neighbors with positive depth diff | |
) | |
diff_neg_count = ( | |
0 # initialize the number of neighbors with negative depth diff | |
) | |
# ----------- left node ----------- | |
if col > 0: # if we are not on the first column | |
# attempt to retrieve depth | |
if flag_grid[row, col - 1] != 0: | |
continue | |
depth_neighbour = bathy[row, col - 1] | |
if numpy.isnan(depth_neighbour) and col > 1: | |
if flag_grid[row, col - 2] != 0: | |
continue | |
depth_neighbour = bathy[row, col - 2] | |
if numpy.isnan(depth_neighbour) and col > 2: | |
if flag_grid[row, col - 3] != 0: | |
continue | |
depth_neighbour = bathy[row, col - 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
# ----------- right node ----------- | |
if col < cols - 1: # if we are not on the last column | |
# attempt to retrieve depth | |
if flag_grid[row, col + 1] != 0: | |
continue | |
depth_neighbour = bathy[row, col + 1] | |
if numpy.isnan(depth_neighbour) and (col < cols - 2): | |
if flag_grid[row, col + 2] != 0: | |
continue | |
depth_neighbour = bathy[row, col + 2] | |
if numpy.isnan(depth_neighbour) and (col < cols - 3): | |
if flag_grid[row, col + 3] != 0: | |
continue | |
depth_neighbour = bathy[row, col + 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
# ----------- bottom node ----------- | |
if row > 0: # if we are not on the first row | |
# attempt to retrieve depth | |
if flag_grid[row - 1, col] != 0: | |
continue | |
depth_neighbour = bathy[row - 1, col] | |
if numpy.isnan(depth_neighbour) and row > 1: | |
if flag_grid[row - 2, col] != 0: | |
continue | |
depth_neighbour = bathy[row - 2, col] | |
if numpy.isnan(depth_neighbour) and row > 2: | |
if flag_grid[row - 3, col] != 0: | |
continue | |
depth_neighbour = bathy[row - 3, col] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
# ----------- top node ----------- | |
if row < rows - 1: # if we are not on the last row | |
# attempt to retrieve depth | |
if flag_grid[row + 1, col] != 0: | |
continue | |
depth_neighbour = bathy[row + 1, col] | |
if numpy.isnan(depth_neighbour) and (row < rows - 2): | |
if flag_grid[row + 2, col] != 0: | |
continue | |
depth_neighbour = bathy[row + 2, col] | |
if numpy.isnan(depth_neighbour) and (row < rows - 3): | |
if flag_grid[row + 3, col] != 0: | |
continue | |
depth_neighbour = bathy[row + 3, col] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
# ----------- bottom-left node ----------- | |
if (row > 0) and (col > 0): # if we are not on the first row and col | |
# attempt to retrieve depth | |
if flag_grid[row - 1, col - 1] != 0: | |
continue | |
depth_neighbour = bathy[row - 1, col - 1] | |
if numpy.isnan(depth_neighbour) and row > 1 and col > 1: | |
if flag_grid[row - 2, col - 2] != 0: | |
continue | |
depth_neighbour = bathy[row - 2, col - 2] | |
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2: | |
# if flag_grid[row - 3, col - 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row - 3, col - 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
# ----------- top-right node ----------- | |
if (row < rows - 1) and ( | |
col < cols - 1 | |
): # if we are not on the last row and col | |
# attempt to retrieve depth | |
if flag_grid[row + 1, col + 1] != 0: | |
continue | |
depth_neighbour = bathy[row + 1, col + 1] | |
if numpy.isnan(depth_neighbour) and (row < rows - 2) and (col < cols - 2): | |
if flag_grid[row + 2, col + 2] != 0: | |
continue | |
depth_neighbour = bathy[row + 2, col + 2] | |
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and (col < cols - 3): | |
# if flag_grid[row + 3, col + 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row + 3, col + 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
# ----------- bottom-right node ----------- | |
if (row > 0) and ( | |
col < cols - 1 | |
): # if we are not on the first row and last col | |
# attempt to retrieve depth | |
if flag_grid[row - 1, col + 1] != 0: | |
continue | |
depth_neighbour = bathy[row - 1, col + 1] | |
if numpy.isnan(depth_neighbour) and row > 1 and (col < cols - 2): | |
if flag_grid[row - 2, col + 2] != 0: | |
continue | |
depth_neighbour = bathy[row - 2, col + 2] | |
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2: | |
# if flag_grid[row - 3, col + 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row - 3, col + 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
# ----------- top-left node ----------- | |
if (row < rows - 1) and ( | |
col > 0 | |
): # if we are not on the last row and first col | |
# attempt to retrieve depth | |
if flag_grid[row + 1, col - 1] != 0: | |
continue | |
depth_neighbour = bathy[row + 1, col - 1] | |
if numpy.isnan(depth_neighbour) and (row < rows - 2) and col > 1: | |
if flag_grid[r + 2, col - 2] != 0: | |
continue | |
depth_neighbour = bathy[row + 2, col - 2] | |
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and col > 2: | |
# if flag_grid[row + 3, col - 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row + 3, col - 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_node - depth_neighbour > threshold: | |
diff_pos_count += 1 | |
if depth_node - depth_neighbour < -threshold: | |
diff_neg_count += 1 | |
if neighbour_count == 0: | |
continue | |
# (sixy6e) this section prohibts from simply looping over [1, n-1] | |
# calculate the ratio among flagged and total neighbors, then use it to | |
# decide if a flier | |
if (row == 0) or (col == 0) or (row == (rows - 1)) or (col == (cols - 1)): | |
thr = 1.0 | |
elif neighbour_count <= 4: | |
thr = percent_1 | |
else: | |
thr = percent_2 | |
pos_ratio = diff_pos_count / neighbour_count | |
if pos_ratio >= thr: | |
flag_grid[row, col] = 3 # check #3 | |
_LOG.info( | |
"adjacency check #3", | |
row=row, | |
col=col, | |
diff_pos_count=diff_pos_count, | |
neighbour_count=neighbour_count, | |
pos_ratio=pos_ratio, | |
thr=thr, | |
) | |
continue | |
neg_ratio = diff_neg_count / neighbour_count | |
if neg_ratio >= thr: | |
flag_grid[row, col] = 3 # check #3 | |
_LOG.info( | |
"adjacency check #3", | |
row=row, | |
col=col, | |
diff_neg_count=diff_neg_count, | |
neighbour_count=neighbour_count, | |
neg_ratio=neg_ratio, | |
thr=thr, | |
) | |
continue | |
@jit(nopython=True, cache=True) | |
def noisy_edges(bathy: numpy.ndarray, flag_grid: numpy.ndarray, dist: int, cf: float): | |
"""""" | |
rows, cols = bathy.shape | |
# the grid is traversed row by row | |
for row in range(rows): # we get the row | |
# (sixy6e) why not simply loop over [1, n-1]??? | |
if (row == 0) or (row == rows - 1): | |
continue | |
for col in range(cols): # we get the column | |
# (sixy6e) why not simply loop over [1, n-1]??? | |
if (col == 0) or (col == cols - 1): | |
continue | |
if flag_grid[row, col] != 0: # avoid existing flagged nodes | |
continue | |
# for each node in the grid, the depth is retrieved | |
depth_node = bathy[row, col] | |
# any further calculation is skipped in case of a no-data value | |
if numpy.isnan(depth_node): | |
continue | |
neighbour_count = 0 | |
neighbour_diff = 0.0 | |
min_depth = -9999.9 | |
max_diff = 0.0 | |
# ----------- left node ----------- | |
# attempt to retrieve depth | |
if flag_grid[row, col - 1] != 0: | |
continue | |
depth_neighbour = bathy[row, col - 1] | |
if numpy.isnan(depth_neighbour) and col > 1: | |
if flag_grid[row, col - 2] != 0: | |
continue | |
depth_neighbour = bathy[row, col - 2] | |
if numpy.isnan(depth_neighbour) and col > 2 and dist > 2: | |
if flag_grid[row, col - 3] != 0: | |
continue | |
depth_neighbour = bathy[row, col - 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
# ----------- right node ----------- | |
# attempt to retrieve depth | |
if flag_grid[row, col + 1] != 0: | |
continue | |
depth_neighbour = bathy[row, col + 1] | |
if numpy.isnan(depth_neighbour) and (col < cols - 2): | |
if flag_grid[row, col + 2] != 0: | |
continue | |
depth_neighbour = bathy[row, col + 2] | |
if numpy.isnan(depth_neighbour) and (col < cols - 3) and dist > 2: | |
if flag_grid[row, col + 3] != 0: | |
continue | |
depth_neighbour = bathy[row, col + 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
# ----------- bottom node ----------- | |
# attempt to retrieve depth | |
if flag_grid[row - 1, col] != 0: | |
continue | |
depth_neighbour = bathy[row - 1, col] | |
if numpy.isnan(depth_neighbour) and r > 1: | |
if flag_grid[row - 2, col] != 0: | |
continue | |
depth_neighbour = bathy[row - 2, col] | |
if numpy.isnan(depth_neighbour) and row > 2 and dist > 2: | |
if flag_grid[row - 3, col] != 0: | |
continue | |
depth_neighbour = bathy[row - 3, col] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
# ----------- top node ----------- | |
# attempt to retrieve depth | |
if flag_grid[row + 1, col] != 0: | |
continue | |
depth_neighbour = bathy[row + 1, col] | |
if numpy.isnan(depth_neighbour) and (row < rows - 2): | |
if flag_grid[row + 2, col] != 0: | |
continue | |
depth_neighbour = bathy[row + 2, col] | |
if numpy.isnan(depth_neighbour) and (row < rows - 3) and dist > 2: | |
if flag_grid[row + 3, col] != 0: | |
continue | |
depth_neighbour = bathy[row + 3, col] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
# ----------- bottom-left node ----------- | |
# attempt to retrieve depth | |
if flag_grid[row - 1, col - 1] != 0: | |
continue | |
depth_neighbour = bathy[row - 1, col - 1] | |
if numpy.isnan(depth_neighbour) and row > 1 and col > 1 and dist > 2: | |
if flag_grid[row - 2, col - 2] != 0: | |
continue | |
depth_neighbour = bathy[row - 2, col - 2] | |
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2: | |
# if flag_grid[row - 3, col - 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row - 3, col - 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
# ----------- top-right node ----------- | |
# attempt to retrieve depth | |
if flag_grid[row + 1, col + 1] != 0: | |
continue | |
depth_neighbour = bathy[row + 1, col + 1] | |
if ( | |
numpy.isnan(depth_neighbour) | |
and (row < rows - 2) | |
and (col < cols - 2) | |
and dist > 2 | |
): | |
if flag_grid[row + 2, col + 2] != 0: | |
continue | |
depth_neighbour = bathy[row + 2, col + 2] | |
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and (col < cols - 3): | |
# if flag_grid[row + 3, col + 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row + 3, col + 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
# ----------- bottom-right node ------------ | |
# attempt to retrieve depth | |
if flag_grid[row - 1, col + 1] != 0: | |
continue | |
depth_neighbour = bathy[row - 1, col + 1] | |
if numpy.isnan(depth_neighbour) and row > 1 and (col < cols - 2) and dist > 2: | |
if flag_grid[row - 2, col + 2] != 0: | |
continue | |
depth_neighbour = bathy[row - 2, col + 2] | |
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2: | |
# if flag_grid[row - 3, col + 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row - 3, col + 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
# ----------- top-left node----------- | |
# attempt to retrieve depth | |
if flag_grid[row + 1, col - 1] != 0: | |
continue | |
depth_neighbour = bathy[row + 1, col - 1] | |
if numpy.isnan(depth_neighbour) and (row < rows - 2) and col > 1 and dist > 2: | |
if flag_grid[row + 2, col - 2] != 0: | |
continue | |
depth_neighbour = bathy[row + 2, col - 2] | |
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and col > 2: | |
# if flag_grid[row + 3, col - 3] != 0: | |
# continue | |
# depth_neighbour = bathy[row + 3, col - 3] | |
# evaluate depth difference | |
if not numpy.isnan(depth_neighbour): | |
neighbour_count += 1 | |
if depth_neighbour > min_depth: | |
min_depth = depth_neighbour | |
neighbour_diff = abs(depth_node - depth_neighbour) | |
if neighbour_diff > max_diff: | |
max_diff = neighbour_diff | |
if neighbour_count == 0: | |
continue | |
if neighbour_count > 6: | |
continue | |
if min_depth >= -100.0: | |
threshold = (0.25 + (0.013 * -min_depth) ** 2) ** 0.5 | |
else: | |
threshold = (1.0 + (0.023 * -min_depth) ** 2) ** 0.5 | |
if max_diff > cf * threshold: | |
flag_grid[row, col] = 6 # check #6 | |
_LOG.info( | |
"noisy neighbour (check #6)", | |
row=row, | |
col=col, | |
neighbour_count=neighbour_count, | |
max_diff=max_diff, | |
min_depth=min_depth, | |
threshold=threshold, | |
) | |
@jit(nopython=True, cache=True) | |
def _small_groups( | |
img_labels: numpy.ndarray, | |
bathy: numpy.ndarray, | |
flag_grid: numpy.ndarray, | |
th: float, | |
area_limit: float, | |
check_slivers: bool, | |
check_isolated: bool, | |
sizes: numpy.ndarray, | |
): | |
"""""" | |
rows, cols = img_labels.shape | |
for i in range(sizes.shape[0]): | |
# check only small groups | |
if sizes[i] > area_limit: | |
continue | |
i += 1 | |
conn_count = 0 | |
find = False | |
for row in range(4, rows - 4): # skip bbox boundaries | |
for col in range(4, cols - 4): # skip bbox boundaries | |
# skip if the cell does not belong to the current small group | |
if img_labels[row, col] != i: | |
continue | |
last_row, last_col = row, col | |
nb_rs = [] | |
nb_cs = [] | |
# check for a valid connection to a grid body | |
nb_rs.append(row + 1) # n1 | |
nb_rs.append(row - 1) # n2 | |
nb_rs.append(row - 1) # n3 | |
nb_rs.append(row + 1) # n4 | |
nb_cs.append(col + 1) # n1 | |
nb_cs.append(col + 1) # n2 | |
nb_cs.append(col - 1) # n3 | |
nb_cs.append(col - 1) # n4 | |
nb_rs.append(row + 2) # n5 | |
nb_rs.append(row + 2) # n6 | |
nb_rs.append(row + 0) # n7 | |
nb_rs.append(row - 2) # n8 | |
nb_rs.append(row - 2) # n9 | |
nb_rs.append(row - 2) # n10 | |
nb_rs.append(row + 0) # n11 | |
nb_rs.append(row + 2) # n12 | |
nb_cs.append(col + 0) # n5 | |
nb_cs.append(col + 2) # n6 | |
nb_cs.append(col + 2) # n7 | |
nb_cs.append(col + 2) # n8 | |
nb_cs.append(col + 0) # n9 | |
nb_cs.append(col - 2) # n10 | |
nb_cs.append(col - 2) # n11 | |
nb_cs.append(col - 2) # n12 | |
nb_rs.append(row + 3) # n13 | |
nb_rs.append(row + 3) # n14 | |
nb_rs.append(row + 0) # n15 | |
nb_rs.append(row - 3) # n16 | |
nb_rs.append(row - 3) # n17 | |
nb_rs.append(row - 3) # n18 | |
nb_rs.append(row + 0) # n19 | |
nb_rs.append(row + 3) # n20 | |
nb_cs.append(col + 0) # n13 | |
nb_cs.append(col + 3) # n14 | |
nb_cs.append(col + 3) # n15 | |
nb_cs.append(col + 3) # n16 | |
nb_cs.append(col + 0) # n17 | |
nb_cs.append(col - 3) # n18 | |
nb_cs.append(col - 3) # n19 | |
nb_cs.append(col - 3) # n20 | |
nb_rs.append(row + 4) # n21 | |
nb_rs.append(row + 4) # n22 | |
nb_rs.append(row + 0) # n23 | |
nb_rs.append(row - 4) # n24 | |
nb_rs.append(row - 4) # n25 | |
nb_rs.append(row - 4) # n26 | |
nb_rs.append(row + 0) # n27 | |
nb_rs.append(row + 4) # n28 | |
nb_cs.append(col + 0) # n21 | |
nb_cs.append(col + 4) # n22 | |
nb_cs.append(col + 4) # n23 | |
nb_cs.append(col + 4) # n24 | |
nb_cs.append(col + 0) # n25 | |
nb_cs.append(col - 4) # n26 | |
nb_cs.append(col - 4) # n27 | |
nb_cs.append(col - 4) # n28 | |
nbs_sz = len(nb_rs) | |
for ni in range(nbs_sz): | |
nl = img_labels[nb_rs[ni], nb_cs[ni]] | |
if (nl != 0) and (nl != i) and (sizes[nl - 1] > area_limit): | |
conn_count += 1 | |
find = True | |
if ( | |
abs(bathy[row, col] - bathy[nb_rs[ni], nb_cs[ni]]) > th | |
) and check_slivers: | |
flag_grid[row, col] = 4 # check #4 | |
_LOG.info("check (#4)", ni=ni + 1, row=row, col=col) | |
break | |
if find: | |
break | |
if find: | |
break | |
# it is an isolated group | |
if ( | |
(last_row > 4) | |
and (last_row < rows - 4) | |
and (last_col > 4) | |
and (last_col < cols - 4) | |
): | |
if (conn_count == 0) and check_isolated: | |
flag_grid[last_row, last_col] = 5 # check #5 | |
_LOG.info("isolated group (#5)", last_row=last_row, last_col=last_col) | |
def small_groups( | |
grid_bin: numpy.ndarray, | |
bathy: numpy.ndarray, | |
flag_grid: numpy.ndarray, | |
th: float, | |
area_limit: float, | |
check_slivers: bool, | |
check_isolated: bool, | |
): | |
"""""" | |
rows, cols = grid_bin.shape | |
img_labels, n_labels = ndimage.label(grid_bin) | |
sizes = ndimage.sum(grid_bin, img_labels, range(1, n_labels + 1)) | |
_small_groups( | |
img_labels, bathy, flag_grid, th, area_limit, check_slivers, check_isolated, sizes | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy | |
import numexpr | |
import structlog | |
_LOG = structlog.get_logger() | |
def calc_tvu_qc( | |
bathy: numpy.ndarray, | |
product_uncertainty: numpy.ndarray, | |
pu_nodata: float, | |
tvu_qc: numpy.ndarray, | |
): | |
""" | |
IHO S-44 TVU QC Calculations | |
Total vertical uncertainty. (TODO; confirm that is the correct acronym) | |
""" | |
idx = numpy.isnan(bathy) | product_uncertainty == pu_nodata | |
tvu_qc[idx] = numpy.nan | |
idx = bathy <= 100.0 | |
# subsetted arrays | |
bathy_subs = bathy[idx] # pylint: disable=unused-variable | |
pr_unc_subs = product_uncertainty[idx] # pylint: disable=unused-variable | |
# using numexpr to reduce the temp arrays that are required for this long expression | |
expression = "pr_unc_subs / ((0.25 + (0.013 * bathy_subs) ** 2) ** 0.5)" | |
tvu_qc[idx] = numexpr.evaluate(expression) | |
# inverse, i.e. > 100.0 | |
idx = ~idx | |
bathy_subs = bathy[idx] # noqa F841 | |
pr_unc_subs = product_uncertainty[idx] # noqa F841 | |
expression = "pr_unc_subs / ((1.0 + (0.23 * bathy_subs) ** 2) ** 0.5)" | |
tvu_qc[idx] = numexpr.evaluate(expression) | |
def calc_tvu_qc_a1( | |
bathy: numpy.ndarray, | |
product_uncertainty: numpy.ndarray, | |
pu_nodata: float, | |
tvu_qc: numpy.ndarray, | |
): | |
""" | |
CATZOC A1 TVU Calculations | |
""" | |
idx = numpy.isnan(bathy) | product_uncertainty == pu_nodata | |
tvu_qc[idx] = numpy.nan | |
# inverse, and subsetted arrays | |
idx = ~idx | |
bathy_subs = bathy[idx] # noqa F841 # pylint: disable=unused-variable | |
pr_unc_subs = product_uncertainty[idx] # noqa F841 # pylint: disable=unused-variable | |
# again using numexpr as these calcs will be float64 | |
expression = "pr_unc_subs / (0.5 + (0.01 * bathy_subs))" | |
tvu_qc[idx] = numexpr.evaluate(expression) | |
def calc_tvu_qc_a2b( | |
bathy: numpy.ndarray, | |
product_uncertainty: numpy.ndarray, | |
pu_nodata: float, | |
tvu_qc: numpy.ndarray, | |
): | |
""" | |
CATZOC A2/B TVU Calculations | |
""" | |
idx = numpy.isnan(bathy) | product_uncertainty == pu_nodata | |
tvu_qc[idx] = numpy.nan | |
# inverse, and subsetted arrays | |
idx = ~idx | |
bathy_subs = bathy[idx] # noqa F841 # pylint: disable=unused-variable | |
pr_unc_subs = product_uncertainty[idx] # noqa F841 # pylint: disable=unused-variable | |
# again using numexpr as these calcs will be float64 | |
expression = "pr_unc_subs / (1.0 + (0.02 * bathy_subs))" | |
tvu_qc[idx] = numexpr.evaluate(expression) | |
def calc_tvu_qc_c( | |
bathy: numpy.ndarray, | |
product_uncertainty: numpy.ndarray, | |
pu_nodata: float, | |
tvu_qc: numpy.ndarray, | |
): | |
""" | |
CATZOC C TVU Calculations | |
""" | |
idx = numpy.isnan(bathy) | product_uncertainty == pu_nodata | |
tvu_qc[idx] = numpy.nan | |
# inverse, and subsetted arrays | |
idx = ~idx | |
bathy_subs = bathy[idx] # noqa F841 # pylint: disable=unused-variable | |
pr_unc_subs = product_uncertainty[idx] # noqa F841 # pylint: disable=unused-variable | |
# again using numexpr as these calcs will be float64 | |
expression = "pr_unc_subs / (2.0 + (0.05 * bathy_subs))" | |
tvu_qc[idx] = numexpr.evaluate(expression) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment