Last active
December 31, 2015 04:28
-
-
Save pckujawa/7934113 to your computer and use it in GitHub Desktop.
It works! Hw4 big data with calibration. Put in same folder so the notebook can use the module.
This file has been truncated, but you can view the full file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# <nbformat>3.0</nbformat> | |
# <markdowncell> | |
# ### Algorithm | |
# | |
# 1. Choose some profile x, x ~ (0, F). Do this by creating x from $B * \mu$, where $\mu$ ~ (0,1), so $F = B * B^*$. $x$ **must** have fewer elements than $\mu$, so B needs to be constructed correctly. You can start with a vector $b$ and do 'valid' convolution with $\mu$. | |
# * Create a matrix A. | |
# * Simulate a measurement $y = A x + \nu$, $\nu$ ~ (0, S). y *should/must?* have more elements than $x$. You can start with a vector $a$ and do 'full' convolution with $x$. | |
# | |
# 2. Assume A is known | |
# * Find $\hat{x}$, $V(\hat{x_i}) = Q_{i,i}$ | |
# * (optional) Repeat measurement of x N times. collect meas info \hat{x} and its variance | |
# | |
# 3. A not known | |
# * generate calibration signals \phi_i | |
# * simulate measurement $\psi_i = A \phi_i + \nu_i$ | |
# * collect calibration info ??? | |
# * compute A_0, J, R, \hat{x} | |
# * (optional) arrange K cal., N meas of x | |
# | |
# | |
# <codecell> | |
##%pylab inline | |
##figsize(18, 4) | |
# <codecell> | |
from __future__ import (division) | |
import numpy as np | |
import scipy | |
import scipy.linalg | |
from scipy.linalg import (toeplitz, svd, inv) | |
from pprint import (pprint, pformat) | |
import matplotlib.pyplot as plt | |
# Global parameters | |
verbose = 0#True # print/plot stuff | |
copyValuesOutOfFunctions = 0#True | |
def plotNicely(title, yLabelsMappedToValues): | |
fig, ax = plt.subplots(1) | |
for label,y in yLabelsMappedToValues.iteritems(): | |
# Prefer line with small data point (.) marker style | |
ax.plot(y, '.-', label=label) | |
leg = ax.legend(loc='best', fancybox=True) | |
leg.get_frame().set_alpha(0.5) | |
ax.set_title(title) | |
return fig, ax # so user can further customize | |
def plotEstimate(title, y, x, xhat, Q, *ignoreExtraArgs): | |
""" | |
Q is covariance of xhat | |
""" | |
plotInfo = {r'$\hat{x}$': xhat, 'x': x, 'y': y} | |
fig, ax = plotNicely(title, plotInfo) | |
# Add error coloring | |
# http://matplotlib.org/users/recipes.html#fill-between-and-alpha | |
t = range(len(xhat)) | |
sigma1 = np.sqrt(np.diag(Q)) | |
mu1 = np.ravel(xhat) | |
if verbose: print 'sigma1', sigma1.shape, 'xhat', xhat.shape | |
ax.fill_between(t, mu1+sigma1, mu1-sigma1, facecolor='blue', alpha=0.5, label=r'$V(\hat{x})^2$') | |
ax.legend() # no dice - can't see a label for the noise | |
return fig, ax | |
# <codecell> | |
def makeB(b, lenMu): | |
# b * \mu, 'valid', is equiv to B*\mu when B is a Toeplitz | |
if verbose: print 'b', b | |
bMidpt = int(len(b)/2) | |
if verbose: print 'b_mid', bMidpt | |
firstRowB = [0]*lenMu | |
for ix,b_ix in enumerate(b): | |
firstRowB[ix] = b_ix | |
firstColB = [0] * (lenMu - len(b) + 1) | |
firstColB[0] = b[0] | |
B = np.matrix(toeplitz(firstColB, firstRowB)) | |
if verbose: print 'B.shape', B.shape | |
if verbose: print 'B[0]', B[0] | |
if verbose: print 'B[-1]', B[-1] | |
return B | |
def makeA(a, lenX): | |
# a * x, 'full', is equiv to A*x when A is like a Toeplitz but with more zeros | |
firstRowA = [a[-1]] + [0]*(lenX - 1) | |
firstColA = a + [0]*(lenX - 1) | |
A = np.matrix(toeplitz(firstColA, firstRowA)) | |
if verbose: print 'A', A | |
return A | |
def getKnownATuple(b, a, S, lenMu=100, showPlot=True, mu=None): | |
""" | |
Remarks: a and b are treated as if they start from i=0. | |
S is the covariance of the additive noise of the output (nu). | |
""" | |
if mu is None: | |
mu = np.random.randn(lenMu) | |
else: | |
lenMu = len(mu) | |
B = makeB(b, lenMu) | |
# x needs to have fewer points than mu | |
x = np.convolve(mu, b, mode='valid') | |
xFromBMatrix = np.inner(B, mu) | |
if verbose: print 'x', x | |
if verbose: print r'B * \mu', xFromBMatrix | |
assert np.all(np.abs(x - xFromBMatrix) < 1e-10), "Uh-oh, different answers for b conv mu and B * mu" | |
A = makeA(a, len(x)) | |
# y should have more points than x | |
y = np.convolve(x, a, mode='full') | |
yFromAMatrix = np.inner(A, x) | |
assert np.all(np.abs(y - yFromAMatrix) < 1e-10), "Uh-oh, a conv x not equal to A * x" | |
nu = np.random.randn(len(y)) * S | |
y += nu | |
if verbose: print 'shapes of mu,x,y', mu.shape, x.shape, y.shape | |
assert len(x) == len(mu) - len(b) + 1 | |
assert len(y) == len(x) + len(a) - 1 | |
if showPlot: | |
# http://matplotlib.org/users/recipes.html?highlight=legend#transparent-fancy-legends | |
fig, ax = plt.subplots(1) | |
ax.plot(mu, 'o-', color='gray', label=r'$\mu$') | |
ax.plot(x, '.-', color='black', label='x') | |
ax.plot(y, '.-', color='blue', label='y') | |
leg = ax.legend(loc='best', fancybox=True) | |
leg.get_frame().set_alpha(0.5) | |
ax.set_title(r'$y = A x + \nu, x = B \mu, \mu$ ~ $(0,1)$') | |
# Variance of x, F = B * B^* should be equiv to b conv b^* | |
f = np.convolve(b, list(reversed(b)), mode='full') # zero is now in the center of vector | |
assert len(f) == 2*len(b) - 1 | |
F = np.matrix(np.inner(B, B)) | |
if verbose: print 'f', f | |
if verbose: print 'F', F | |
#toeplitz(list(f[int(len(f)/2):]) + [0]*1) # looks same as F if the zeros were right len | |
if copyValuesOutOfFunctions: globals().update(locals()) ###### | |
return A, F, y, x, nu | |
# <codecell> | |
def estimateWithKnownA(A, S, F, x0, y): | |
""" Assume A known, so there is no uncertainty (J) about it, and | |
our estimate of it (A0) is itself. | |
S is cov of output noise (nu), F is cov of input (x), x0 is expected value of x, y is output. | |
""" | |
assert np.all(S > 0) | |
A0 = A | |
J = 0 | |
A0_star = A0.T | |
F_inv = F.I | |
SplusJ = S + J | |
SplusJ_inv = 1.0 / SplusJ | |
AsSinA = A0_star * (SplusJ_inv * A0) | |
Q_inv = AsSinA + F_inv | |
Q = Q_inv.I | |
F_invx0 = F_inv * np.matrix(x0).T | |
A0_starSplusJ_inv = A0_star * SplusJ_inv | |
def estimator(y): | |
yColumn = np.matrix(y).T | |
A0_starSplusJ_invY = A0_starSplusJ_inv * yColumn | |
R = Q * A0_starSplusJ_inv | |
r = Q * F_invx0 | |
xhat = Q * (F_invx0 + A0_starSplusJ_invY) | |
xhatFromRy = R * yColumn + r | |
if copyValuesOutOfFunctions: globals().update(locals()) ###### | |
return xhat | |
xhat = estimator(y) | |
if copyValuesOutOfFunctions: globals().update(locals()) ###### | |
if verbose: pprint(locals()) | |
return xhat, Q | |
# ## Calibration | |
def calibrate(inputs, inputCovariance, outputs, noiseCovariance): | |
"""Given (k many) inputs, outputs, and their distributions, | |
get information to construct an estimate as if A was unknown. | |
inputs are list or matrix of phis/x's (k of them), inputCovariance is F, outputs are k psis/y's, noiseCovariance is S | |
:returns: (R, r, Q) such that \hat{x} = R*y + r and Q_{i,i} is its variance | |
""" | |
x0 = np.matrix([0]*inputs.shape[0]).T | |
x0_star = x0.T | |
# Phi and psi should be column vectors stacked left to right | |
phi = inputs | |
psi = outputs | |
F = np.matrix(inputCovariance) | |
S = np.matrix(noiseCovariance) | |
phi_star = phi.T | |
G = psi * phi_star | |
H = phi * phi_star | |
assert G.shape[1] == H.shape[0] | |
assert H.shape[0] == H.shape[1] | |
#assert np.linalg.det(H) - 1e-5 > 0, pformat(locals()) | |
H_inv = H.I | |
A0 = G * H_inv | |
A0_star = A0.T | |
x0x0_star = x0 * x0_star | |
F_bar = F + x0x0_star | |
H_invF_bar = H_inv * F_bar | |
alpha = np.trace(H_invF_bar) | |
J = alpha * S | |
S_inv = np.ravel(S.I)[0] # KLUDGE! but how else if S is 1x1 | |
F_inv = F.I | |
AsSinA = A0_star * (S_inv * A0) | |
alpha_mult = (1.0/(alpha + 1)) | |
Q_inv = alpha_mult * AsSinA + F_inv | |
Q = Q_inv.I | |
r = Q * (F_inv * x0) | |
R = alpha_mult * Q * (A0_star * S_inv) | |
if verbose: pprint(locals()) | |
if copyValuesOutOfFunctions: globals().update(locals()) | |
return R, r, Q | |
def getCalibrated(b, a, S, k=10, showPlots=False): | |
phis = [] | |
psis = [] | |
Fs = [] | |
for ignore in xrange(k): | |
A, F, psi_i, phi_i, nu = getKnownATuple(b, a, S, showPlot=showPlots) | |
#print 'y', y.shape, 'x', x.shape | |
# F is the same every time. the code below shows that | |
#try: | |
# prev_F = Fs[-1] | |
# print 'F was the same as prev' if np.all(np.abs(F - prev_F) < 1e-10) else '' | |
#xcept: pass | |
Fs.append(F) | |
phis.append(phi_i) | |
psis.append(psi_i) | |
# Default matrix ctor stacks the wrong way, so transpose | |
Phi = np.matrix(phis).T | |
Psi = np.matrix(psis).T | |
# What should we do to accumulate F and S? I guess they shouldn't change. (They don't seem to.) | |
R, r, Q = calibrate(Phi, F, Psi, S) | |
if copyValuesOutOfFunctions: globals().update(locals()) | |
if verbose: # debugging stuff | |
# "What about F? Is it invertible? For large K H/K = (Phi Phi*)/K should become close to F." | |
print 'k =', k | |
print 'F', F | |
print 'H/k', H/k | |
closeCells = np.abs(F - H/k) < 0.01 | |
print np.sum(closeCells), 'very close values out of', closeCells.size | |
print [mtx.shape for mtx in (Q, A0, y, x, AsSinA, F, G, H, psi, phi)] | |
print 'phi', phi.shape, 'psi', psi.shape | |
print 'F', F.shape, 'S', S.shape | |
print 'G', G.shape, np.ravel(G)[:10] | |
print 'H', G.shape, np.ravel(H)[:10] | |
print 'det(H)', np.linalg.det(H) | |
print 'R', R.shape, R | |
return R, r, Q | |
def plotWithKnownA(a, b, S): | |
A, F, y, x, nu = getKnownATuple(b, a, S) | |
x0 = [0]*len(x) # assume E(x) = 0 | |
xhat, Q = estimateWithKnownA(A, S, F, x0, y) | |
fig, ax = plotEstimate(r'$\hat{x}$ with known A, $x$, and $y$', y, x, xhat, Q) | |
# Shouldn't we have a near-perfect estimate of x if a=impulse and we subtract the noise (mu)? | |
# No! Because our estimate relies on S and we can't pass in S=0. | |
# But we can pass in a really small S :) | |
return fig, ax | |
if __name__ == '__main__': | |
# Known A | |
lenB = 9 | |
b = [1.0/lenB]*lenB | |
lenA = 3 | |
a = [1.0/lenA]*lenA | |
S = 0.1 # S | |
print 'a', a, 'b', b, 'S', S | |
A, F, y, x, nu = getKnownATuple(b, a, S, showPlot=True) | |
x0 = [0]*len(x) # assume E(x) = 0 | |
xhat, Q = estimateWithKnownA(A, S, F, x0, y) | |
fig, ax = plotEstimate(r'$\hat{x}$ with known A, $x$, and $y$', y, x, xhat, Q) | |
# Unknown A | |
# ### Running calibration measurements | |
k = 100 | |
showPlots = False # all k of them | |
usePreviousPlotValues = True # from known A | |
# Use a, b, and S from above or else x and y won't be same | |
if not usePreviousPlotValues: | |
lenB = 9 | |
b = [1.0/lenB]*lenB | |
lenA = 21 | |
a = [1.0/lenA]*lenA | |
S = 0.5 # S | |
print 'a', a, 'b', b, 'S', S | |
# Generate x and y to compare against xhat | |
A, F, y, x, nu = getKnownATuple(b, a, S, showPlot=True) | |
# Ok, |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment