import numpy as np
from matplotlib import pyplot as plt
from scipy.optimize import fmin_l_bfgs_b as bfgs
from scipy.io import loadmat
class params:
    '''
    A wrapper around weights and biases
    for an autoencoder
    '''
    def __init__(self, indim, hdim, w1=None, b1=None, w2=None, b2=None):
        self._indim = indim
        self._hdim = hdim
        # initialize the weights randomly and the biases to 0
        r = np.sqrt(6.) / np.sqrt((indim * 2) + 1.)
        self.w1 = w1 if w1 is not None else np.random.random_sample((indim, hdim)) * 2 * r - r
        self.b1 = b1 if b1 is not None else np.zeros(hdim)
        self.w2 = w2 if w2 is not None else np.random.random_sample((hdim, indim)) * 2 * r - r
        self.b2 = b2 if b2 is not None else np.zeros(indim)

    def unroll(self):
        '''
        "Unroll" all parameters into a single flat parameter vector
        '''
        w1 = self.w1.reshape(self._indim * self._hdim)
        w2 = self.w2.reshape(self._hdim * self._indim)
        return np.concatenate([w1,
                               w2,
                               self.b1,
                               self.b2])

    @staticmethod
    def roll(theta, indim, hdim):
        '''
        "Roll" a flat parameter vector back into a params instance
        '''
        wl = indim * hdim
        w1 = theta[:wl]
        w1 = w1.reshape((indim, hdim))
        bb = wl + wl
        w2 = theta[wl:bb]
        w2 = w2.reshape((hdim, indim))
        b1 = theta[bb: bb + hdim]
        b2 = theta[bb + hdim: bb + hdim + indim]
        return params(indim, hdim, w1, b1, w2, b2)

    def __eq__(self, o):
        return (self.w1 == o.w1).all() and \
               (self.w2 == o.w2).all() and \
               (self.b1 == o.b1).all() and \
               (self.b2 == o.b2).all()

    @staticmethod
    def _test():
        p = params(100, 23)
        a = p.unroll()
        b = params.roll(a, 100, 23)
        print p == b
        print all(a == b.unroll())
class autoencoder(object):
    '''
    A horribly inefficient implementation
    of an autoencoder, which is meant to
    teach me, and not much else.
    '''
    def __init__(self, params):
        # this is multiplied against the sparsity delta
        self._beta = 3
        self._sparsity = 0.01
        # this is the learning rate
        self._alpha = .5
        # this is the weight decay term
        self._lambda = 0.0001
        self._netp = params

    ## ACTIVATION OF THE NETWORK #########################################

    def _pre_sigmoid(self, a, w):
        '''
        Compute the pre-sigmoid activation
        for a layer
        '''
        z = np.zeros((a.shape[0], w.shape[1]))
        for i, te in enumerate(a):
            z[i] = (w.T * te).sum(1)
        return z
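    # Note: the loop in _pre_sigmoid computes, row by row, what a single
    # matrix product would: z = np.dot(a, w). It is written out per example
    # here, which is one reason this implementation is so slow.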
    def _sigmoid(self, la):
        '''
        compute the sigmoid function for an array
        of arbitrary shape and size
        '''
        return 1. / (1 + np.exp(-la))

    def activate(self, inp):
        '''
        Activate the net on a batch of training examples.
        Return the input, and the states of all layers for
        all examples.
        '''
        # number of training examples
        m = inp.shape[0]
        # pre-sigmoid activation at the hidden layer
        z2 = self._pre_sigmoid(inp, self._netp.w1) + np.tile(self._netp.b1, (m, 1))
        # sigmoid activation of the hidden layer
        a = self._sigmoid(z2)
        # pre-sigmoid activation of the output layer
        z3 = self._pre_sigmoid(a, self._netp.w2) + np.tile(self._netp.b2, (m, 1))
        # sigmoid activation of the output layer
        o = self._sigmoid(z3)
        return inp, a, z2, z3, o
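    # In matrix form, the forward pass above is:
    #   a = sigmoid(inp . w1 + b1)   (hidden activations)
    #   o = sigmoid(a . w2 + b2)     (reconstruction of the input)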
    ## PARAMETER LEARNING ################################################

    def _rcost(self, inp, params, a=None, z2=None, z3=None, o=None):
        '''
        The real-valued cost of using the parameters
        on a (batch) input
        '''
        if o is None:
            # activate the network with params
            ae = autoencoder(params)
            inp, a, z2, z3, o = ae.activate(inp)
        diff = o - inp
        n = np.zeros(diff.shape[0])
        for i, d in enumerate(diff):
            n[i] = np.linalg.norm(d)
        # one-half squared error between input and output
        se = 0.5 * (n ** 2)
        # sparsity penalty
        sp = self._sparse_cost(a)
        # sum of squared error + weight decay + sparsity
        return ((1. / inp.shape[0]) * se.sum())  # + self._weight_decay_cost() # + sp
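    # The full sparse autoencoder objective would also include the weight
    # decay term (lambda / 2) * sum(W^2) and the sparsity penalty
    # beta * sum_j KL(rho || rho_hat_j); both are computed but commented out
    # of the returned value, so only the mean reconstruction error is
    # actually minimized here.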
    def _fprime(self, a):
        '''
        first derivative of the sigmoid function
        '''
        return a * (1. - a)
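    # _fprime expects sigmoid *activations* rather than pre-activations:
    # for a = sigmoid(z), the derivative is sigmoid'(z) = a * (1 - a).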
    def _sparse_ae_cost(self, inp, parms, check_grad=False):
        '''
        Get the batch error and gradients for many
        training examples at once and update
        weights and biases accordingly
        '''
        onem = (1. / len(inp))
        inp, a, z2, z3, o = self.activate(inp)
        d3 = -(inp - o) * self._fprime(o)
        d2 = (self._backprop(d3)) * self._fprime(a)
        wg2 = self._weight_grad(d3, a, self._netp.w2)
        wg1 = self._weight_grad(d2, inp, self._netp.w1)
        bg2 = self._bias_grad(d3)
        bg1 = self._bias_grad(d2)
        c = self._rcost(inp, self._netp, a=a, z2=z2, z3=z3, o=o)
        print 'ERROR : %1.4f' % c
        if check_grad:
            # unroll the gradients into a flat vector
            rg = params(self.indim, self.hdim, wg1, bg1, wg2, bg2).unroll()
            # perform a (very costly) numerical
            # check of the gradients
            self._check_grad(self._netp, inp, rg)
        return c, wg2, wg1, bg2, bg1
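    # Backpropagation recap for the two deltas above:
    #   d3 = -(inp - o) * f'(z3)                       -- output-layer error
    #   d2 = (d3 backpropagated through w2) * f'(z2)   -- hidden-layer error
    # where f' is evaluated via the stored activations (see _fprime).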
    def _sparse_ae_cost_unrolled(self, inp, parms):
        c, wg2, wg1, bg2, bg1 = self._sparse_ae_cost(inp, parms)
        return params(self.indim, self.hdim, wg1, bg1, wg2, bg2).unroll()

    def _weight_decay_cost(self):
        # weight decay penalizes the sum of the squared weights
        ws = (self._netp.w1 ** 2).sum() + (self._netp.w2 ** 2).sum()
        return (self._lambda / 2.) * ws

    def _sparse_cost(self, a):
        '''
        compute the sparsity penalty for a batch
        of activations.
        '''
        p = self._sparsity
        p_hat = np.average(a, 0)
        return self._beta * ((p * np.log(p / p_hat)) + ((1 - p) * np.log((1 - p) / (1 - p_hat)))).sum()
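    # The sparsity penalty is beta times the KL divergence between the target
    # activation rho and the mean activation rho_hat_j of each hidden unit:
    #   KL(rho || rho_hat_j) = rho * log(rho / rho_hat_j)
    #                        + (1 - rho) * log((1 - rho) / (1 - rho_hat_j))
    # summed over all hidden units.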
    def _sparse_grad(self, a):
        p_hat = np.average(a, 0)
        p = self._sparsity
        return self._beta * (-(p / p_hat) + ((1 - p) / (1 - p_hat)))

    def _bias_grad(self, cost):
        return (1. / cost.shape[0]) * cost.sum(0)

    def _weight_grad(self, cost, a, w):
        # compute the outer product of the error and the activations
        # for each training example, and sum them together to
        # obtain the update for each weight
        wg = (cost[:, :, np.newaxis] * a[:, np.newaxis, :]).sum(0).T
        # TODO: Add weight decay and sparsity terms
        wg = ((1. / cost.shape[0]) * wg)  # + (self._lambda * w)
        return wg
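    # The batched outer-product sum in _weight_grad is equivalent to a single
    # matrix product, np.dot(a.T, cost), so the returned gradient is
    # np.dot(a.T, cost) / m for a batch of m examples.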
    def _backprop(self, out_err):
        '''
        Compute the error of the hidden layer
        by performing backpropagation.
        out_err is the error of the output
        layer for every training example;
        rows are errors.
        '''
        # the error for the hidden layer
        h_err = np.zeros((out_err.shape[0], self.hdim))
        for i, e in enumerate(out_err):
            h_err[i] = (e * self._netp.w2).sum(1)
        return h_err
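    # As with _pre_sigmoid, the loop above is equivalent to a single matrix
    # product: h_err = np.dot(out_err, self._netp.w2.T).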
    def _update(self, inp, wg2, wg1, bg2, bg1):
        self._netp.w2 -= self._alpha * wg2
        self._netp.w1 -= self._alpha * wg1
        self._netp.b2 -= self._alpha * bg2
        self._netp.b1 -= self._alpha * bg1

    def _check_grad(self, params, inp, grad, epsilon=10 ** -4):
        rc = self._rcost
        e = epsilon
        tolerance = e ** 2
        theta = params.unroll()
        num_grad = np.zeros(theta.shape)
        # compute the numerical gradient of the function by
        # varying the parameters one by one.
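        # each component uses the central-difference approximation:
        #   num_grad[i] ~= (J(theta + e * e_i) - J(theta - e * e_i)) / (2 * e)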
        for i in range(len(theta)):
            plus = np.copy(theta)
            minus = np.copy(theta)
            plus[i] += e
            minus[i] -= e
            pp = params.roll(plus, self.indim, self.hdim)
            mp = params.roll(minus, self.indim, self.hdim)
            num_grad[i] = (rc(inp, pp) - rc(inp, mp)) / (2. * e)
        # the analytical gradient
        agp = params.roll(grad, self.indim, self.hdim)
        # the numerical gradient
        ngp = params.roll(num_grad, self.indim, self.hdim)
        diff = np.linalg.norm(num_grad - grad) / np.linalg.norm(num_grad + grad)
        # layer 1 weights difference
        #print np.linalg.norm(ngp.w1 - agp.w1) / np.linalg.norm(ngp.w1 + agp.w1)
        # layer 2 weights difference
        #print np.linalg.norm(ngp.w2 - agp.w2) / np.linalg.norm(ngp.w2 + agp.w2)
        # layer 1 bias difference
        #print np.linalg.norm(ngp.b1 - agp.b1) / np.linalg.norm(ngp.b1 + agp.b1)
        # layer 2 bias difference
        #print np.linalg.norm(ngp.b2 - agp.b2) / np.linalg.norm(ngp.b2 + agp.b2)
        print 'Difference between analytical and numerical gradients is %s' % str(diff)
        #print diff < tolerance
        print '================================================================'
        plt.gray()
        plt.figure()
        # plot the analytical gradients on
        # the first row
        plt.subplot(2, 4, 1)
        plt.imshow(agp.w1)
        plt.subplot(2, 4, 2)
        plt.imshow(agp.w2)
        plt.subplot(2, 4, 3)
        plt.plot(agp.b1)
        plt.subplot(2, 4, 4)
        plt.plot(agp.b2)
        # plot the numerical (brute-force)
        # gradients on the second row
        plt.subplot(2, 4, 5)
        plt.imshow(ngp.w1)
        plt.subplot(2, 4, 6)
        plt.imshow(ngp.w2)
        plt.subplot(2, 4, 7)
        plt.plot(ngp.b1)
        plt.subplot(2, 4, 8)
        plt.plot(ngp.b2)
        plt.show()
        return ngp.unroll()
    @property
    def indim(self):
        return self._netp._indim

    @property
    def hdim(self):
        return self._netp._hdim

    def train(self, inp, iterations, with_bfgs=False, grad_check_freq=None):
        def rcst(x):
            return self._rcost(inp, params.roll(x, self.indim, self.hdim))

        def rcstprime(x):
            return self._sparse_ae_cost_unrolled(inp, params.roll(x, self.indim, self.hdim))

        def prnt(p):
            print p.sum()

        if with_bfgs:
            mn, val, info = bfgs(rcst,
                                 self._netp.unroll(),
                                 rcstprime,
                                 disp=True)
            # update the parameters
            self._netp = params.roll(mn, self.indim, self.hdim)
        else:
            for i in range(iterations):
                if grad_check_freq:
                    gc = i and not i % grad_check_freq
                else:
                    gc = False
                grads = self._sparse_ae_cost(inp, self._netp, check_grad=gc)
                # leave out cost, the first item
                # in the grads tuple
                self._update(inp, *grads[1:])
import argparse

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Attempted autoencoder implementation')
    parser.add_argument('--imgpath',
                        help='path to IMAGES.mat')
    parser.add_argument('--usebfgs',
                        help='use scipy.optimize.fmin_l_bfgs_b to minimize cost',
                        action='store_true',
                        default=False)
    parser.add_argument('--nsamples',
                        help='number of patches to draw from images',
                        type=int,
                        default=10)
    args = parser.parse_args()

    # size of input and output layers
    n_dim = 64
    # size of hidden layer
    h_dim = 25

    def random_image_patches(n_samples=args.nsamples):
        a = loadmat(args.imgpath)
        m = a['IMAGES']
        patches = np.zeros((n_samples, 64))
        ri = np.random.randint
        for i in range(n_samples):
            img = ri(0, 10)
            r = ri(0, 504)
            c = ri(0, 504)
            patches[i] = m[r:r + 8, c:c + 8, img].reshape(64)
        return patches

    # preprocess the data
    def preprocess(data):
        data -= data.mean(axis=0)
        pstd = 3 * data.std()
        data = np.maximum(np.minimum(data, pstd), -pstd) / pstd
        data = (data + 1) * .4 + .1
        return data
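    # Preprocessing recap: remove the mean, truncate values to +/- 3 standard
    # deviations, and rescale from [-1, 1] to [0.1, 0.9] so the targets sit
    # inside the useful range of the sigmoid output layer.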
    samples = random_image_patches()
    samples = preprocess(samples)

    # if we're not using bfgs, do a numerical gradient check every
    # 10 training epochs
    gcf = 10 if not args.usebfgs else None

    # train the autoencoder
    ae = autoencoder(params(n_dim, h_dim))
    ae.train(samples,                 # image patches
             1000,                    # n epochs
             with_bfgs=args.usebfgs,  # use scipy.optimize.fmin_l_bfgs_b
             grad_check_freq=gcf)     # perform a numerical gradient check every n iterations

    # grab some new patches and reconstruct them
    num_plots = 10
    samples = random_image_patches(num_plots)
    samples = preprocess(samples)
    plt.figure()
    for i in range(num_plots):
        signal = samples[i]
        inp, a, z2, z3, o = ae.activate(np.array([signal]))
        plt.subplot(num_plots, 1, i + 1)
        plt.plot(signal, 'b-')
        plt.plot(o[0], 'g-')
    plt.show()
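
# Example invocation (the script filename below is hypothetical; --imgpath must
# point to a .mat file whose 'IMAGES' array holds ten images of at least
# 512x512 pixels, as assumed by random_image_patches above):
#   python sparse_autoencoder.py --imgpath IMAGES.mat --nsamples 10000 --usebfgs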