Created
April 11, 2012 19:41
-
-
Save phobson/2361814 to your computer and use it in GitHub Desktop.
Env. eng. numpy.ma example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import scipy.stats as stats | |
#import DataAccess as db | |
import pdb | |
def getTestData(): | |
''' | |
generates test data | |
''' | |
raw_data = np.array([ | |
2. , 4.2 , 4.62, 5. , 5. , 5.5 , 5.57, 5.66, | |
5.75, 5.86, 6.65, 6.78, 6.79, 7.5 , 7.5 , 7.5 , | |
8.63, 8.71, 8.99, 9.50, 9.50, 9.85, 10.82, 11.00, | |
11.25, 11.25, 12.2 , 14.92, 16.77, 17.81, 19.16, 19.19, | |
19.64, 20.18, 22.97]) | |
raw_mask = np.array([ | |
0 , 0 , 0 , 1 , 1 , 1 , 0 , 0, | |
1 , 0 , 0 , 0 , 0 , 0 , 0 , 0, | |
0 , 0 , 0 , 1 , 1 , 0 , 0 , 1, | |
0 , 0 , 0 , 0 , 0 , 0 , 0 , 0, | |
0 , 0 , 0]) | |
return np.ma.MaskedArray(data=raw_data, mask=raw_mask) | |
def rosSort(raw_vals, raw_mask): | |
''' | |
prep data for ROS. Sort ascending with non-detects (mask=True) | |
on top. something like this: | |
[2, 4, 4, 10, 3, 5, 6, 10, 12, 40, 78, 120] | |
with [2, 4, 4, 10] being the ND reults (masked the output). | |
data and mask must be in sync for this not to spit out complete garbage | |
''' | |
ndx_d = raw_mask==False | |
ndx_nd = raw_mask==True | |
sorted_data = np.hstack([raw_vals[ndx_nd], raw_vals[ndx_d]]) | |
sorted_mask = np.hstack([raw_mask[ndx_nd], raw_mask[ndx_d]]) | |
ros_data = np.ma.MaskedArray(data=sorted_data, mask=sorted_mask) | |
return ros_data | |
class MR: | |
''' | |
ROS = ranked-order statistics | |
This class implements the MR method outlined Hirsch and Stedinger, 1987) to estimate | |
the censored (non-detect) values of a dataset. | |
Usage: | |
>>> import ros | |
>>> myData = ros.MR(<db_table>, <analyte>, testing=False) | |
Setting the 'testing' parameter to True will provide you with a hard- | |
coded dataset stored in the ros.getData function. Otherwise, the MR class | |
will attempt to connect to the International Stormwater BMP database | |
stored in the present working directory and pull all of the <anaylte>'s | |
data from the <db_table>. | |
Creating an MR object will go ahead and estimate the final data values. Continuing | |
the example above, that data would be accessed via the 'final_data' attribute of the | |
ros.MR object: | |
>>> import ros | |
>>> myData = ros.MR(<db_table>, <analyte>, testing=False) | |
>>> print(myData.final_data) | |
The other public attributes of the ros.MR object are as follows: | |
ros.MR.raw_vals - The raw data pulled from the databse. Detection limits are used | |
with 'U' or 'UJ' flagged data. | |
ros.MR.mask - Array of boolean values indicating if a ros.MR.raw_val is censored | |
ros.MR.data - A numpy MaskedArray object with the censored data masked. | |
ros.MR.DLs - Array of the uniqie detection limits of the dataset. If a | |
non-censored value exists that is lower than the minimum detection | |
limit, it is appended on to this array. | |
ros.MR.DLIndex - Array of indices for each ros.MR.raw_val that points to the | |
corresponding detection limit. | |
ros.MR.nrakks - Array of unaveraged ranks for the data | |
ros.MR.aranks - Array of averaged (final) ranks for the data | |
ros.MR.final_data - Array of data where the censored values have been replaced | |
with estimation. | |
Called ros.MR.plot() will produce a graph comparing the estimated data with raw data | |
containing detection limit substituions. The figure will be saved as | |
<table>_<parameter>.png | |
''' | |
def __init__(self, data, testing=False): | |
assert type(data)==np.ma.core.MaskedArray, 'Data must be a masked array' | |
self.test = testing | |
self.raw_vals = data.data | |
self.raw_mask = data.mask | |
self.data = rosSort(self.raw_vals, self.raw_mask) | |
self.DLs = np.unique(self.data.data[self.data.mask]) | |
if self.DLs.shape[0] > 0: | |
if self.data.min() < self.DLs.min(): | |
self.DLs = np.hstack([self.data.min(), self.DLs]) | |
else: | |
self.DLs = np.array([]) | |
self.DLIndex = self.__rosDLIndex() | |
self.nranks, self.aranks = self.__rosRanks() | |
self.plot_pos, self.final_data, self.Z = self.__rosEstimator() | |
def __rosRanks(self): | |
''' | |
Determine the whacky ranks of the data according to the following logic | |
rank[n] = rank[n-1] + 1 when: | |
n is 0 OR | |
n > 0 and d[n].masked is True and j[n] <> d[n-1] OR | |
n > 0 and d[n].masked is False and d[n-1].masked is True OR | |
n > 0 and d[n].masked is False and d[n-1].masked is False and j[n] <> j[n-1] | |
rank[n] = 1 | |
n > 0 and d[n].masked is True and j[n] == d[n-1] OR | |
n > 0 and d[n].masked is False and d[n-1].masked is False and j[n] == j[n-1] | |
where j[n] is the index of the highest DL that is less than the current data value | |
Then the ranks of non-censored equivalent data values are averaged. | |
''' | |
norm_ranks = np.ones(len(self.data), dtype='f2') | |
for n in range(len(self.data.data)): | |
if n == 0 \ | |
or self.DLIndex[n] <> self.DLIndex[n-1] \ | |
or self.data.mask[n] <> self.data.mask[n-1]: | |
norm_ranks[n] = 1 | |
else: #self.DLIndex[n-1] == self.DLIndex[n]: | |
norm_ranks[n] = norm_ranks[n-1] + 1 | |
avgd_ranks = norm_ranks.copy() | |
for n in range(len(norm_ranks)): | |
if not self.data.mask[n]: | |
index = np.bitwise_and(self.DLIndex==self.DLIndex[n], | |
self.data==self.data[n]) | |
avgd_ranks[index] = norm_ranks[index].mean() | |
return norm_ranks, avgd_ranks | |
def __rosDLIndex(self): | |
''' | |
creates an array of indices for the detection limits (self.DLs) | |
corresponding to each data point | |
''' | |
if self.DLs.shape[0] > 0: | |
j = [] | |
for n in range(len(self.data.data)): | |
value = self.data.data[n] | |
censored = self.data.mask[n] | |
index, = np.where(self.DLs <= value) | |
j.append(index[-1]) | |
j = np.array(j) | |
else: | |
j = np.zeros(len(self.data.data)) | |
return j | |
def __computeABC(self, det_lims): | |
''' | |
Intermediate values needed to compute plotting positions. | |
TODO: get better terminolgy for these | |
''' | |
lowerDL = np.min(det_lims) | |
upperDL = np.max(det_lims) | |
A_above = self.data[self.data >= lowerDL] | |
A_above_and_below = A_above[A_above < upperDL] | |
A_uncensored = A_above_and_below[A_above_and_below.mask==False] | |
A = A_uncensored.shape[0] | |
B_LT = self.data[self.data.data < lowerDL] | |
B_LTE = self.data[self.data.data <= lowerDL] | |
B = B_LTE[B_LTE.mask==True].shape[0] + B_LT[B_LT.mask==False].shape[0] | |
#pdb.set_trace() | |
censored_below = self.data.data[self.data.mask] == lowerDL | |
C = censored_below.sum() | |
return float(A), float(B), float(C) | |
def __rosEstimator(self): | |
''' | |
Estimates the values of the censored data | |
''' | |
N_tot = self.data.shape[0] | |
N_nd = self.data.mask.sum() | |
plot_pos = np.zeros(N_tot) | |
if N_nd == 0: | |
modeled_data = np.array([]) | |
Z = np.array([]) | |
fit = [] | |
elif N_tot - N_nd < 2 or N_nd/N_tot > 0.8: | |
modeled_data = self.data.data[self.data.mask] * 0.5 | |
Z = np.array([]) | |
fit = [] | |
else: | |
num_DLs = self.DLs.shape[0] | |
PE = np.zeros(num_DLs+1) | |
A = np.zeros(num_DLs) | |
B = np.zeros(num_DLs) | |
C = np.zeros(num_DLs) | |
for j in range(num_DLs-1, -1, -1): | |
#define A and B | |
try: | |
det_lims = (self.DLs[j], self.DLs[j+1]) | |
except IndexError: | |
det_lims = (self.DLs[j], np.inf) | |
A[j], B[j], C[j] = self.__computeABC(det_lims) | |
PE[j] = PE[j+1] + A[j]/(A[j]+B[j]) * (1-PE[j+1]) | |
for n in range(N_tot): | |
j = self.DLIndex[n] | |
if self.data.mask[n]: | |
plot_pos[n] = (1-PE[j]) * self.aranks[n]/(C[j]+1) | |
else: | |
plot_pos[n] = (1-PE[j]) + (PE[j] - PE[j+1]) * self.aranks[n] / (A[j]+1) | |
Zprelim = np.ma.MaskedArray(data=stats.distributions.norm.ppf(plot_pos), mask=self.data.mask) | |
fit = stats.mstats.linregress(Zprelim, np.log(self.data)) | |
slope, intercept = fit[:2] | |
modeled_data = np.exp(slope*Zprelim.data[Zprelim.mask] + intercept) | |
full_data = np.hstack([modeled_data, self.data[self.data.mask==False]]) | |
Zfinal, final_data = stats.probplot(full_data.data, fit=0) | |
return plot_pos, final_data, Zfinal #, fit, (A, B, C, PE) | |
def plot(self, filename): | |
''' | |
makes a simple plot showing the original and modeled data | |
''' | |
fig, ax1 = plt.subplots() | |
ax1.plot(self.Z[self.data.mask==False], self.data[self.data.mask==False], | |
'ko', mfc='Maroon', ms=6, label='original detects', zorder=8) | |
ax1.plot(self.Z[self.data.mask==True], self.data.data[self.data.mask], | |
'ko', ms=6, label='original non-detects', zorder=8, mfc='none') | |
ax1.plot(self.Z, self.final_data, 'ks', ms=4, zorder=10, | |
label='modeled data', mfc='DodgerBlue') | |
ax1.set_xlabel(r'$Z$-score') | |
ax1.set_ylabel('concentration') | |
ax1.set_yscale('log') | |
ax1.legend(loc='upper left', numpoints=1) | |
ax1.xaxis.grid(True, which='major', ls='-', lw=0.5, alpha=0.35) | |
ax1.yaxis.grid(True, which='major', ls='-', lw=0.5, alpha=0.35) | |
ax1.yaxis.grid(True, which='minor', ls='-', lw=0.5, alpha=0.17) | |
plt.tight_layout() | |
fig.savefig(filename) | |
plt.close(fig) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment