Skip to content

Instantly share code, notes, and snippets.

@lukecampbell
Last active February 7, 2017 13:52
Show Gist options
  • Save lukecampbell/53c32bc4fb54a8daf1d98bf26c9a8305 to your computer and use it in GitHub Desktop.
Save lukecampbell/53c32bc4fb54a8daf1d98bf26c9a8305 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#-*- coding: utf-8 -*-
'''
cbibs.adaptors
~~~~~~~~~~~~~~~~~
This module provides the adaptors that connect jobs (and the job context) to the
QC tests and netCDF generation. Each job uses one or more adaptors to perform
calculations or generate netCDF files.
..
:copyright: (C) 2015 RPS ASA
:license: MIT, see LICENSE.txt for more details.
'''
from cbibs import logger
from cbibs.utils import get_config_matrix
from cbibs.datasets import NCDataset
from cbibs.data import get_qc_tests, get_qc_flags, get_observations, set_primary_qc_flags, set_duplicates
from ioos_qartod.qc_tests import qc
from netCDF4 import Dataset
import tempfile
import numpy as np
import os
def no_nan(input_dict):
    '''
    Takes a dict and converts any NaN values to None, and then returns a new
    dictionary. The input dictionary is not modified.

    Only top-level values are inspected: a built-in ``float`` or any numpy
    floating scalar (e.g. ``np.float32``) that is NaN becomes ``None``. All
    other values, including nested containers, are copied through unchanged.

    :param dict input_dict: The dictionary to convert
    :rtype: dict
    '''
    def _is_nan(value):
        # np.float32 / np.float16 are not subclasses of float, so np.floating
        # is checked too; non-float values are never treated as NaN.
        return isinstance(value, (float, np.floating)) and np.isnan(value)

    # .items() works on both Python 2 and 3 (iteritems is Python 2 only).
    return {k: (None if _is_nan(v) else v) for k, v in input_dict.items()}
def gap_test_adaptor(data, site_code, variable_name):
    '''
    Flags NULL or NaN observations as missing.

    Each row whose ``obs_value`` is null is flagged 4; every other row is
    flagged 1. ``site_code`` and ``variable_name`` are accepted so the
    signature matches the other adaptors, but they are not used here.

    :param pandas.DataFrame data: A data frame returned from get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :returns: A tuple of (flag array, test parameters dict); this test has
              no parameters, so the dict is always empty.
    '''
    # 0/1 indicator: 1 where the observation is missing.
    missing = data['obs_value'].isnull().astype('int')
    # Map the indicator onto flag values: 0 -> 1, 1 -> 4.
    flags = 1 + 3 * missing
    return flags.values, no_nan({})
def gross_range_adaptor(data, site_code, variable_name):
    '''
    Reads the gross range configuration from config.MATRIX_URL and applies the
    gross range test to the data provided.

    The first "Gross Range" configuration row matching ``site_code`` and
    ``variable_name`` supplies the sensor and user spans that are passed to
    ``qc.range_check``.

    :param pandas.DataFrame data: A data frame returned from get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :returns: A tuple of (flags, test parameters dict), or ``(None, None)``
              when no configuration row matches.
    '''
    matrix = get_config_matrix('Gross Range')
    # Normalise the key columns to str so they compare against the str
    # arguments regardless of how the matrix columns were parsed.
    matrix['site_code'] = matrix['site_code'].astype(str)
    matrix['variable_name'] = matrix['variable_name'].astype(str)
    df = matrix[(matrix['site_code'] == site_code) & (matrix['variable_name'] == variable_name)]
    if len(df) == 0:
        return None, None
    # .item() converts the numpy scalars to native Python numbers.
    sensor_min = df['sensor_min'].iloc[0].item()
    sensor_max = df['sensor_max'].iloc[0].item()
    user_min = df['user_min'].iloc[0].item()
    user_max = df['user_max'].iloc[0].item()
    sensor_span = (sensor_min, sensor_max)
    user_span = (user_min, user_max)
    res = qc.range_check(data['obs_value'].values, sensor_span, user_span)
    test_parameters = {
        'sensor_min': sensor_min,
        'sensor_max': sensor_max,
        'user_min': user_min,
        'user_max': user_max,
    }
    return res, no_nan(test_parameters)
def location_test_adaptor(data, site_code):
    '''
    Reads the location test configuration from config.MATRIX_URL and applies
    the location test to the data provided.

    The first "Location Test" configuration row for ``site_code`` defines a
    bounding box ``[[ll_lon, ll_lat], [ur_lon, ur_lat]]`` and an optional
    ``range_max`` (NaN in the matrix is passed on as ``None``).

    :param pandas.DataFrame data: A data frame constructed from locations returned by get_locations
    :param str site_code: The station site code (e.g. 44041)
    :raises ValueError: if no configuration row exists for ``site_code``
    :returns: A tuple of (flags, test parameters dict containing ``bbox``)

    Example of usage::

        locations = get_locations(conn, site_code, start_time, end_time)
        locations = pd.DataFrame.from_records(locations, columns=locations[0]._fields)
        flags, test_parameters = location_test_adaptor(locations, site_code)
    '''
    matrix = get_config_matrix('Location Test')
    # Sometimes pandas will parse the site codes as ints, sometimes as
    # strings, so normalise them to str before comparing.
    matrix['site_code'] = matrix['site_code'].astype(str)
    config = matrix[(matrix['site_code'] == site_code)]
    if not len(config):
        raise ValueError('No configuration available for site %s' % site_code)

    def first(column):
        # Value of ``column`` in the first matching configuration row.
        return config[column].iloc[0]

    lower_left = [first('ll_lon'), first('ll_lat')]
    upper_right = [first('ur_lon'), first('ur_lat')]
    bbox = [lower_left, upper_right]
    range_max = first('range_max')
    range_max = None if np.isnan(range_max) else range_max
    # WARNING!!: if range_max is defined, it will not function properly as the
    # elevation and variable groupings aren't taken into account here
    flags = qc.location_set_check(data['longitude'].values,
                                  data['latitude'].values,
                                  bbox, range_max)
    return flags, no_nan({'bbox': bbox})
def spike_test_adaptor(data, site_code, variable_name):
    '''
    Applies the spike test to ``data``, using the thresholds from the first
    "Spike" configuration row that matches the site and variable.

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :returns: A tuple of (flags, test parameters dict), or ``(None, None)``
              when no configuration row matches.
    '''
    matrix = get_config_matrix('Spike')
    # Compare key columns as strings regardless of how they were parsed.
    for key in ('site_code', 'variable_name'):
        matrix[key] = matrix[key].astype(str)
    selected = matrix[(matrix['site_code'] == site_code) & (matrix['variable_name'] == variable_name)]
    if not len(selected):
        return None, None
    low_threshold = selected['low_threshold'].iloc[0]
    high_threshold = selected['high_threshold'].iloc[0]
    # NOTE(review): np.array([2]) is forwarded as the fourth argument of
    # qc.spike_check; its meaning is defined by ioos_qartod -- confirm there.
    flags = qc.spike_check(data['obs_value'].values, low_threshold,
                           high_threshold, np.array([2]))
    parameters = {
        'low_threshold': low_threshold,
        'high_threshold': high_threshold,
    }
    return flags, no_nan(parameters)
def rate_of_change_test_adaptor(data, site_code, variable_name):
    '''
    Applies the rate-of-change test to ``data``, using ``thresh_val`` from the
    first "Rate of Change" configuration row that matches the site and
    variable. The frame's index values are passed as the time axis.

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :returns: A tuple of (flags, test parameters dict), or ``(None, None)``
              when no configuration row matches.
    '''
    matrix = get_config_matrix('Rate of Change')
    # Compare key columns as strings regardless of how they were parsed.
    for key in ('site_code', 'variable_name'):
        matrix[key] = matrix[key].astype(str)
    rows = matrix[(matrix['site_code'] == site_code) &
                  (matrix['variable_name'] == variable_name)]
    if not len(rows):
        return None, None
    thresh_val = rows['thresh_val'].iloc[0]
    times = data.index.values
    values = data['obs_value'].values
    # NOTE(review): np.array([2]) is forwarded as the fourth argument of
    # qc.rate_of_change_check; its meaning is defined by ioos_qartod.
    flags = qc.rate_of_change_check(times, values, thresh_val, np.array([2]))
    return flags, no_nan({'thresh_val': thresh_val})
def flat_line_check_adaptor(data, site_code, variable_name):
    '''
    Applies the flat-line test to ``data``, using ``low_reps``, ``high_reps``
    and ``epsilon`` from the first "Flat Line" configuration row that matches
    the site and variable.

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :returns: A tuple of (flags, test parameters dict), or ``(None, None)``
              when no configuration row matches.
    '''
    matrix = get_config_matrix('Flat Line')
    # Compare key columns as strings regardless of how they were parsed.
    for key in ('site_code', 'variable_name'):
        matrix[key] = matrix[key].astype(str)
    matches = (matrix['site_code'] == site_code) & (matrix['variable_name'] == variable_name)
    selected = matrix[matches]
    if not len(selected):
        return None, None
    low_reps, high_reps, epsilon = [selected[column].iloc[0]
                                    for column in ('low_reps', 'high_reps', 'epsilon')]
    flags = qc.flat_line_check(data['obs_value'].values, low_reps, high_reps, epsilon)
    return flags, no_nan({
        'low_reps': low_reps,
        'high_reps': high_reps,
        'epsilon': epsilon,
    })
def attenuated_signal_check_adaptor(data, site_code, variable_name):
    '''
    Applies the attenuated-signal test to ``data``, using ``min_var_warn``,
    ``min_var_fail`` and ``check_type`` from the first "Attenuated Signal"
    configuration row that matches the site and variable. No time values
    are supplied to the check (``None`` is passed in that position).

    :param pandas.DataFrame data: A data frame returned by get_observations
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :returns: A tuple of (flags, test parameters dict), or ``(None, None)``
              when no configuration row matches.
    '''
    matrix = get_config_matrix('Attenuated Signal')
    # Compare key columns as strings regardless of how they were parsed.
    for key in ('site_code', 'variable_name'):
        matrix[key] = matrix[key].astype(str)
    is_match = (matrix['site_code'] == site_code) & (matrix['variable_name'] == variable_name)
    config = matrix[is_match]
    if not len(config):
        return None, None
    min_var_warn = config['min_var_warn'].iloc[0]
    min_var_fail = config['min_var_fail'].iloc[0]
    check_type = config['check_type'].iloc[0]
    flags = qc.attenuated_signal_check(data['obs_value'].values, None,
                                       min_var_warn, min_var_fail,
                                       check_type=check_type)
    return flags, no_nan({
        'min_var_warn': min_var_warn,
        'min_var_fail': min_var_fail,
        'check_type': check_type,
    })
def primary_qc_adaptor(conn, site_code, variable_name, elevation, start_time, end_time):
    '''
    Computes the primary QC flags for a station, variable and time range and
    stores them with ``set_primary_qc_flags``.

    The flags of every QC test returned by ``get_qc_tests`` are attached to
    the observations as one column per test type and combined with
    ``qc.qc_compare``. When no test results could be attached, every record
    is given 2 (NOT_EVALUATED). Per the original design, 1 is GOOD,
    3 is SUSPECT and 4 is BAD.

    A test whose flags cannot be assigned to the observation frame (pandas
    raises ``ValueError``) is logged and excluded from the comparison rather
    than aborting the whole run.

    :param pg2.connection conn: A connection object to the database.
    :param str site_code: The station site code (e.g. 44041)
    :param str variable_name: Name of the variable as appears in the "actual_name" table.
    :param elevation: Sensor elevation used to select observations and flags
                      (reported in metres in the log message).
    :param datetime start_time: The start date of the interval.
    :param datetime end_time: The end date of the interval.
    :returns: None. Returns early, with a warning, when there are no observations.
    '''
    df = get_observations(conn, site_code, variable_name,
                          elevation, start_time, end_time)
    if len(df) < 1:
        logger.warning("No data available to QC for %s - %s (%s m) from %s to %s",
                       site_code,
                       variable_name,
                       elevation,
                       start_time.isoformat(),
                       end_time.isoformat())
        return

    tests = get_qc_tests(conn, site_code, variable_name, start_time, end_time)
    # Only tests whose flags were successfully attached to ``df`` take part in
    # the comparison. A failed test has no column, and reading it would raise
    # KeyError. Collecting names here also avoids iterating ``tests`` twice.
    applied_tests = []
    for test in tests:
        flags = get_qc_flags(conn,
                             site_code,
                             variable_name,
                             test.test_type,
                             elevation,
                             start_time,
                             end_time)
        try:
            df[test.test_type] = flags.qc_code_id
        except ValueError:
            logger.exception("Failed to set the QC Test")
            logger.error("QC Code: %s", flags.qc_code_id)
            continue
        applied_tests.append(test.test_type)

    vectors = [df[name].values for name in applied_tests]
    if vectors:
        # FIXME: ought to always return int
        df['primary_qc'] = qc.qc_compare(vectors).astype(np.int64)
    else:
        # No test results: every record stays NOT_EVALUATED (2).
        df['primary_qc'] = np.full(len(df), 2, dtype=np.int64)
    set_primary_qc_flags(conn, site_code, df)
    logger.info("Set Primary QC flags for %s - %s from %s to %s", site_code, variable_name, start_time.isoformat(), end_time.isoformat())
def netcdf_adaptor(conn, dataset_id, title, site_code, variable_names, start_time, end_time):
    '''
    Creates a temporary (mkstemp) netCDF file and builds the dataset in it
    with ``NCDataset.create_structure``.

    Per the original design, the build generates the complete metadata and
    file structure, then fills the data arrays from the database for each
    variable in the sensor group. The time dimension is the union of all
    observation records in that group for the time span, and missing records
    are set to the fill_value (-9999). NOTE(review): that work happens inside
    ``NCDataset.create_structure``, which is not visible here.

    :param pg2.connection conn: A connection object to the database.
    :param str dataset_id: The unique ID for this dataset
    :param str title: Title for the dataset
    :param str site_code: The station site code (e.g. 44041)
    :param list variable_names: The sensor group name as appears in the group_name
                                column of the cbibs.d_variable_group table
    :param datetime start_time: The start date of the interval.
    :param datetime end_time: The end date of the interval.
    :returns: Path to the temporary netCDF file. The caller owns the file and
              is responsible for removing it.
    :raises: Any exception raised while building the dataset propagates
             after the temporary file has been removed.
    '''
    fd, path = tempfile.mkstemp()
    # Only the path is needed; netCDF4 reopens the file itself.
    os.close(fd)
    succeeded = False
    try:
        with Dataset(path, 'w') as nc:
            dataset = NCDataset(conn, nc)
            dataset.create_structure(dataset_id, title, site_code, variable_names, start_time, end_time)
        succeeded = True
    finally:
        # Don't leak a partially written temp file when the build fails.
        # A flag plus finally (rather than except/raise) preserves the
        # original exception on both Python 2 and 3.
        if not succeeded and os.path.exists(path):
            os.remove(path)
    return path
def duplicates_adaptor(conn, data):
    '''
    Identifies duplicate records in the data frame and marks each one with
    ``set_duplicates``.

    A row is a duplicate when an earlier row has the same measure_ts,
    site_code, actual_name, elevation and obs_value. ``DataFrame.duplicated``
    is used with its default ``keep='first'``, so the first occurrence is
    never marked. NOTE(review): the original notes say ``set_duplicates``
    inserts a row into the temporary table ``t_primary_qc_staging``; confirm
    against ``cbibs.data``.

    :param pg2.connection conn: A connection object to the database.
    :param pandas.DataFrame data: A data frame returned from get_bulk_observations;
                                  it must provide an ``id`` column (or index
                                  level) and the key columns above.
    '''
    # reset_index() so key columns held in the index become regular columns.
    df = data.reset_index()
    # A list, not a tuple: pandas may read a tuple ``subset`` as one label.
    key_columns = ['measure_ts', 'site_code', 'actual_name', 'elevation', 'obs_value']
    mask = df.duplicated(subset=key_columns)
    # Iterate the id column directly instead of building a Series per row
    # with iterrows().
    for record_id in df.loc[mask, 'id']:
        set_duplicates(conn, record_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment