Last active
February 7, 2017 13:52
-
-
Save lukecampbell/53c32bc4fb54a8daf1d98bf26c9a8305 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#-*- coding: utf-8 -*- | |
''' | |
cbibs.adaptors | |
~~~~~~~~~~~~~~~~~ | |
This module provides the interface between jobs (and the job context). Each job | |
uses one or more adaptors to perform calculations or generate netcdf files. | |
.. | |
:copyright: (C) 2015 RPS ASA | |
:license: MIT, see LICENSE.txt for more details. | |
''' | |
from cbibs import logger | |
from cbibs.utils import get_config_matrix | |
from cbibs.datasets import NCDataset | |
from cbibs.data import get_qc_tests, get_qc_flags, get_observations, set_primary_qc_flags, set_duplicates | |
from ioos_qartod.qc_tests import qc | |
from netCDF4 import Dataset | |
import tempfile | |
import numpy as np | |
import os | |
def no_nan(input_dict):
    '''
    Returns a new dictionary in which every NaN value of ``input_dict`` has
    been replaced by None. All other values are passed through unchanged and
    the input dictionary is not modified.

    Only the top-level values are inspected: a NaN nested inside a list or
    another dict is left as is.

    :param dict input_dict: The dictionary to convert
    :return: A copy of ``input_dict`` with NaN values mapped to None
    :rtype: dict
    '''
    def _is_nan(value):
        # Guard with isinstance first: np.isnan raises TypeError on strings,
        # lists and None. np.floating covers numpy scalars such as float32
        # that do not subclass the builtin float.
        return isinstance(value, (float, np.floating)) and np.isnan(value)

    # dict.items() is available on both Python 2 and Python 3, whereas
    # dict.iteritems() only exists on Python 2.
    return {key: (None if _is_nan(value) else value)
            for key, value in input_dict.items()}
def gap_test_adaptor(data, site_code, variable_name):
    '''
    Gap test: flags every observation whose ``obs_value`` is NULL or NaN as
    missing. Present values receive the flag 1 and missing values receive the
    flag 4.

    ``site_code`` and ``variable_name`` are accepted but not used by this
    test (presumably so the signature matches the other adaptors).

    :param pandas.DataFrame data: A data frame returned from get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :return: A tuple of the flag array and an (empty) dict of test parameters
    '''
    is_missing = data['obs_value'].isnull()
    # Cast the boolean mask to 0/1, then map 0 -> 1 and 1 -> 4
    flags = is_missing.astype('int') * 3 + 1
    # This test has no configuration, hence no parameters to report
    return flags.values, no_nan({})
def gross_range_adaptor(data, site_code, variable_name):
    '''
    Reads the gross range configuration (the 'Gross Range' config matrix) and
    applies the gross range test to the data provided.

    Only the first configuration row matching the site and variable is used.

    :param pandas.DataFrame data: A data frame returned from get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :return: A tuple ``(flags, test_parameters)``, or ``(None, None)`` when
             no configuration row matches the site and variable
    '''
    matrix = get_config_matrix('Gross Range')
    # Cast the key columns to str so that they can be compared against the
    # str arguments even if they were parsed as numbers
    matrix['site_code'] = matrix['site_code'].astype(str)
    matrix['variable_name'] = matrix['variable_name'].astype(str)
    df = matrix[(matrix['site_code'] == site_code) &
                (matrix['variable_name'] == variable_name)]
    if len(df) == 0:
        return None, None

    # .item() unwraps each numpy scalar into a native Python number
    sensor_min = df['sensor_min'].iloc[0].item()
    sensor_max = df['sensor_max'].iloc[0].item()
    user_min = df['user_min'].iloc[0].item()
    user_max = df['user_max'].iloc[0].item()

    sensor_span = (sensor_min, sensor_max)
    user_span = (user_min, user_max)
    res = qc.range_check(data['obs_value'].values, sensor_span, user_span)

    test_parameters = {
        'sensor_min': sensor_min,
        'sensor_max': sensor_max,
        'user_min': user_min,
        'user_max': user_max,
    }
    return res, no_nan(test_parameters)
def location_test_adaptor(data, site_code):
    '''
    Looks up the bounding box configured for a site in the 'Location Test'
    config matrix and runs the location test on the supplied positions.

    Only the first configuration row matching the site is used.

    :param pandas.DataFrame data: A data frame constructed from locations returned by get_locations
    :param str site_code: The station site code (e.g. 44041)
    :return: A tuple ``(flags, test_parameters)`` where ``test_parameters``
             holds the bounding box that was applied
    :raises ValueError: if the matrix has no row for ``site_code``

    Example of usage::

        locations = get_locations(conn, site_code, start_time, end_time)
        locations = pd.DataFrame.from_records(locations, columns=locations[0]._fields)
        flags, test_parameters = location_test_adaptor(locations, site_code)
    '''
    matrix = get_config_matrix('Location Test')
    # pandas may parse the site codes as ints or as strings; casting to str
    # makes the comparison below work in both cases
    matrix['site_code'] = matrix['site_code'].astype(str)
    site_rows = matrix[matrix['site_code'] == site_code]
    if len(site_rows) < 1:
        raise ValueError('No configuration available for site %s' % site_code)

    ll_lat, ll_lon, ur_lat, ur_lon, range_max = [
        site_rows[column].iloc[0]
        for column in ('ll_lat', 'll_lon', 'ur_lat', 'ur_lon', 'range_max')
    ]
    # Lower-left and upper-right corners, each as [lon, lat]
    bbox = [[ll_lon, ll_lat], [ur_lon, ur_lat]]

    # A NaN range_max means "not configured"
    range_max = None if np.isnan(range_max) else range_max

    # WARNING!!: if range_max is defined, it will not function properly as the
    # elevation and variable groupings aren't taken into account here
    flags = qc.location_set_check(data['longitude'].values,
                                  data['latitude'].values,
                                  bbox,
                                  range_max)
    return flags, no_nan({'bbox': bbox})
def spike_test_adaptor(data, site_code, variable_name):
    '''
    Looks up the spike thresholds configured for a site and variable in the
    'Spike' config matrix and runs the spike test on the observations.

    Only the first configuration row matching the site and variable is used.

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :return: A tuple ``(flags, test_parameters)``, or ``(None, None)`` when
             no configuration row matches the site and variable
    '''
    matrix = get_config_matrix('Spike')
    # Compare keys as strings, whatever type they were parsed as
    for key_column in ('site_code', 'variable_name'):
        matrix[key_column] = matrix[key_column].astype(str)

    is_site = matrix['site_code'] == site_code
    is_variable = matrix['variable_name'] == variable_name
    config = matrix[is_site & is_variable]
    if len(config) == 0:
        return None, None

    thresholds = {
        name: config[name].iloc[0]
        for name in ('low_threshold', 'high_threshold')
    }
    # NOTE(review): the trailing np.array([2]) is passed positionally as the
    # fourth argument of qc.spike_check - presumably a previous-QC seed;
    # confirm against the ioos_qartod API.
    flags = qc.spike_check(data['obs_value'].values,
                           thresholds['low_threshold'],
                           thresholds['high_threshold'],
                           np.array([2]))
    return flags, no_nan(thresholds)
def rate_of_change_test_adaptor(data, site_code, variable_name):
    '''
    Looks up the rate-of-change threshold configured for a site and variable
    in the 'Rate of Change' config matrix and runs the rate of change test on
    the observations, using the data frame index as the time axis.

    Only the first configuration row matching the site and variable is used.

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :return: A tuple ``(flags, test_parameters)``, or ``(None, None)`` when
             no configuration row matches the site and variable
    '''
    matrix = get_config_matrix('Rate of Change')
    # Compare keys as strings, whatever type they were parsed as
    for key_column in ('site_code', 'variable_name'):
        matrix[key_column] = matrix[key_column].astype(str)

    is_site = matrix['site_code'] == site_code
    is_variable = matrix['variable_name'] == variable_name
    config = matrix[is_site & is_variable]
    if len(config) == 0:
        return None, None

    threshold = config['thresh_val'].iloc[0]
    times = data.index.values
    observations = data['obs_value'].values
    # NOTE(review): the trailing np.array([2]) is passed positionally as the
    # fourth argument of qc.rate_of_change_check - presumably a previous-QC
    # seed; confirm against the ioos_qartod API.
    flags = qc.rate_of_change_check(times, observations, threshold, np.array([2]))
    return flags, no_nan({'thresh_val': threshold})
def flat_line_check_adaptor(data, site_code, variable_name):
    '''
    Looks up the flat line settings configured for a site and variable in the
    'Flat Line' config matrix and runs the flat line test on the observations.

    Only the first configuration row matching the site and variable is used.

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :return: A tuple ``(flags, test_parameters)``, or ``(None, None)`` when
             no configuration row matches the site and variable
    '''
    matrix = get_config_matrix('Flat Line')
    # Compare keys as strings, whatever type they were parsed as
    for key_column in ('site_code', 'variable_name'):
        matrix[key_column] = matrix[key_column].astype(str)

    is_site = matrix['site_code'] == site_code
    is_variable = matrix['variable_name'] == variable_name
    config = matrix[is_site & is_variable]
    if len(config) == 0:
        return None, None

    settings = {
        name: config[name].iloc[0]
        for name in ('low_reps', 'high_reps', 'epsilon')
    }
    flags = qc.flat_line_check(data['obs_value'].values,
                               settings['low_reps'],
                               settings['high_reps'],
                               settings['epsilon'])
    return flags, no_nan(settings)
def attenuated_signal_check_adaptor(data, site_code, variable_name):
    '''
    Looks up the attenuated signal settings configured for a site and
    variable in the 'Attenuated Signal' config matrix and runs the attenuated
    signal test on the observations.

    Only the first configuration row matching the site and variable is used.

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :return: A tuple ``(flags, test_parameters)``, or ``(None, None)`` when
             no configuration row matches the site and variable
    '''
    matrix = get_config_matrix('Attenuated Signal')
    # Compare keys as strings, whatever type they were parsed as
    for key_column in ('site_code', 'variable_name'):
        matrix[key_column] = matrix[key_column].astype(str)

    is_site = matrix['site_code'] == site_code
    is_variable = matrix['variable_name'] == variable_name
    config = matrix[is_site & is_variable]
    if len(config) == 0:
        return None, None

    settings = {
        name: config[name].iloc[0]
        for name in ('min_var_warn', 'min_var_fail', 'check_type')
    }
    # NOTE(review): None is passed positionally as the second argument of
    # qc.attenuated_signal_check - presumably the time array; confirm against
    # the ioos_qartod API.
    flags = qc.attenuated_signal_check(data['obs_value'].values,
                                       None,
                                       settings['min_var_warn'],
                                       settings['min_var_fail'],
                                       check_type=settings['check_type'])
    return flags, no_nan(settings)
def primary_qc_adaptor(conn, site_code, variable_name, elevation, start_time, end_time):
    '''
    Computes the primary QC flags for a station, variable and time range. The
    flags are set for each observation in the time interval specified.

    Initially all records are set to 2 (NOT_EVALUATED) and are set to 1 (GOOD),
    3 (SUSPECT), or 4 (BAD) as they are identified in each of the tests
    associated with that observation record.

    If the flags of an individual test cannot be attached to the observations
    (ValueError), the failure is logged and that test is left out of the
    comparison; the remaining tests still determine the primary flag.

    Nothing is written (and None is returned) when there are no observations
    in the interval.

    :param pg2.connection conn: A connection object to the database.
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :param elevation: The elevation of the observations to QC (logged in
                      metres); passed through to get_observations and
                      get_qc_flags
    :param datetime start_time: The start date of the interval.
    :param datetime end_time: The end date of the interval.
    '''
    df = get_observations(conn, site_code, variable_name,
                          elevation, start_time, end_time)
    if len(df) < 1:
        logger.warning("No data available to QC for %s - %s (%s m) from %s to %s",
                       site_code,
                       variable_name,
                       elevation,
                       start_time.isoformat(),
                       end_time.isoformat())
        return

    tests = get_qc_tests(conn, site_code, variable_name, start_time, end_time)
    # Names of the test columns that were actually added to df. Tracking
    # them here (rather than re-deriving them from `tests`) keeps a failed
    # assignment from triggering a KeyError when the vectors are collected.
    applied_tests = []
    for test in tests:
        flags = get_qc_flags(conn,
                             site_code,
                             variable_name,
                             test.test_type,
                             elevation,
                             start_time,
                             end_time)
        try:
            df[test.test_type] = flags.qc_code_id
        except ValueError:
            logger.exception("Failed to set the QC Test")
            logger.error("QC Code: %s", flags.qc_code_id)
            continue
        applied_tests.append(test.test_type)

    vectors = [df[name].values for name in applied_tests]
    if vectors:
        # FIXME: ought to always return int
        df['primary_qc'] = qc.qc_compare(vectors).astype(np.int64)
    else:
        # No usable test results: every record stays at 2 (NOT_EVALUATED)
        df['primary_qc'] = np.ones_like(df.index.values, dtype=np.int64) * 2
    set_primary_qc_flags(conn, site_code, df)
    logger.info("Set Primary QC flags for %s - %s from %s to %s", site_code, variable_name, start_time.isoformat(), end_time.isoformat())
def netcdf_adaptor(conn, dataset_id, title, site_code, variable_names, start_time, end_time):
    '''
    Creates a temporary (mkstemp) netCDF file and returns its path. The file
    is opened for writing and handed to ``NCDataset.create_structure``, which
    is responsible for generating the metadata, the file structure and the
    data arrays.

    According to the original notes for this adaptor, the time dimension is
    the complete union of all observation records in the sensor group for the
    time span, and records missing from an individual variable are set to the
    fill_value (-9999); that behaviour lives in ``NCDataset``.

    On success the file is left on disk for the caller. If building the
    dataset fails, the temporary file is removed and the error is re-raised.

    :param pg2.connection conn: A connection object to the database.
    :param str dataset_id: The unique ID for this dataset
    :param str title: Title for the dataset
    :param str site_code: The station site code (e.g. 44041)
    :param list variable_names: The sensor group name as appears in the group_name
                                column of the cbibs.d_variable_group table
    :param datetime start_time: The start date of the interval.
    :param datetime end_time: The end date of the interval.
    :return: Path of the generated netCDF file
    :rtype: str
    '''
    fd, path = tempfile.mkstemp()
    # Only the path is needed; netCDF4 opens the file itself
    os.close(fd)
    try:
        with Dataset(path, 'w') as nc:
            dataset = NCDataset(conn, nc)
            dataset.create_structure(dataset_id, title, site_code, variable_names, start_time, end_time)
    except Exception:
        # Do not leave an orphaned temp file behind. An existence check is
        # used instead of a nested try/except so that the bare raise below
        # re-raises the original error on Python 2 as well.
        if os.path.exists(path):
            os.remove(path)
        raise
    return path
def duplicates_adaptor(conn, data):
    '''
    Finds repeated records in the data frame and reports each of them through
    ``set_duplicates``.

    Two rows are considered duplicates when they agree on measure_ts,
    site_code, actual_name, elevation and obs_value. The first occurrence of
    each group is kept; every later occurrence has its ``id`` passed to
    ``set_duplicates`` (which, per the original notes, inserts a row into the
    temporary table ``t_primary_qc_staging``).

    :param pg2.connection conn: A connection object to the database.
    :param pandas.DataFrame data: A data frame returned from get_bulk_observations
    '''
    key_columns = ('measure_ts', 'site_code', 'actual_name', 'elevation', 'obs_value')
    # Move the index into regular columns so it can take part in the comparison
    records = data.reset_index()
    repeated = records[records.duplicated(subset=key_columns)]
    for _, record in repeated.iterrows():
        set_duplicates(conn, record['id'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment