Conjugate gradient optimization for Lasagne
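A (preconditioned) truncated conjugate gradient solver packaged as a Theano/Lasagne update rule: `cg` builds a `theano.scan` loop that solves the Newton system through Hessian-vector products (`T.Rop` applied to the gradient), so the Hessian is never formed explicitly. Two stopping rules are supported: the standard CG residual test (`'cg'`) and Martens' relative-progress criterion from Hessian-free optimization (`'hf'`).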
from collections import OrderedDict

import numpy as np
import theano
import theano.tensor as T
from theano.ifelse import ifelse


def cg(loss, params, x0=None, max_iters=100, precondition=None, tol=1e-3,
       conv_crit='cg'):
    """(Preconditioned) Conjugate Gradient (CG) updates

    Generates update expressions of the form:

    * ``param := optimized_param``

    Parameters
    ----------
    loss : symbolic expression
        A scalar loss expression
    params : list of shared variables
        The variables to generate update expressions for
    x0 : shared variable or list of shared variables
        Initialization values for CG. If None, the list is initialized
        with zeros of the same size as the parameters.
    max_iters : int
        Maximum number of iterations for CG. Default is 100.
    precondition : array or list of arrays
        Preconditioner for CG. Default is None (no preconditioning).
    tol : float
        Tolerance for CG. Default is 1e-3.
    conv_crit : 'cg' or 'hf'
        Convergence criterion for stopping CG iterations. If 'cg', the
        standard residual-based criterion for CG is used. If 'hf', the
        relative-progress criterion by Martens (2010) is used.

    Returns
    -------
    OrderedDict
        A dictionary mapping each parameter to its update expression
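
    Examples
    --------
    A minimal sketch (names here are illustrative, not part of the API):
    minimizing the quadratic ``0.5 * x.T A x - b.T x`` makes CG solve
    ``A x = b``::

        >>> n = 5
        >>> A = np.eye(n).astype(theano.config.floatX)
        >>> b = np.ones(n).astype(theano.config.floatX)
        >>> x = theano.shared(np.zeros(n, dtype=theano.config.floatX))
        >>> loss = 0.5 * T.dot(x, T.dot(A, x)) - T.dot(x, b)
        >>> updates = cg(loss, x)
        >>> solve = theano.function([], updates[x])
        >>> solve()  # the CG solution of A x = b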
""" | |
    # Normalize all arguments to lists with one entry per parameter
    if not isinstance(params, (list, tuple)):
        params = [params]
    if precondition is None:
        precondition = 1.
    if not isinstance(precondition, (list, tuple)):
        precondition = [precondition] * len(params)
    if x0 is None:
        # Start CG from the origin by default
        x0 = [theano.shared(np.zeros_like(p.get_value())) for p in params]
    if not isinstance(x0, (list, tuple)):
        x0 = [x0]

    n_params = len(params)

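    # Each scan step below performs one preconditioned CG iteration on the
    # Newton system A x = b, where A is the Hessian of `loss` (applied only
    # implicitly, via Hessian-vector products) and b = -grad(loss) at x0:
    #
    #   u     = A v                      (computed with T.Rop on the gradient)
    #   t     = alpha / (v . u)
    #   x    <- x + t * v
    #   r    <- r - t * u                (residual b - A x)
    #   w     = pc * r
    #   beta  = w . w
    #   v    <- pc * w + (beta / alpha) * v
    #   alpha <- beta
    #
    # Note that `pc` multiplies the residual twice (once into w, once more
    # into v), so the effective preconditioner is M^-1 = pc**2; the test
    # below uses pc = 1 / sqrt(diag(A)), i.e. Jacobi preconditioning.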
    def loop(i, phi_im1, *args):
        # Unpack the flat argument list passed through scan
        vi = args[:n_params]
        xi = args[n_params:2 * n_params]
        ri = args[2 * n_params:3 * n_params]
        alpha = args[3 * n_params]

        # Hessian-vector product u = A v, without building the Hessian
        ui = []
        for param in params:
            uip = T.Rop(T.grad(loss, param), params, vi)
            if isinstance(uip, (list, tuple)):
                ui.append(uip[0])
            else:
                ui.append(uip)

        t = alpha / sum((vj * uj).sum() for vj, uj in zip(vi, ui))
        xi = [x + t * v for x, v in zip(xi, vi)]
        ri = [r - t * u for r, u in zip(ri, ui)]
        wi = [pc * r for r, pc in zip(ri, precondition)]
        beta = sum((w * w).sum() for w in wi)
        s = beta / alpha
        vi = [pc * w + s * v for w, v, pc in zip(wi, vi, precondition)]
        alpha = beta

        # Convergence criteria
        if conv_crit == 'hf':
            # Martens' relative-progress test on the quadratic objective
            # phi(x) = 0.5 * x.(A x) - x.b, which CG minimizes implicitly
            Ax = []
            for param in params:
                ax = T.Rop(T.grad(loss, param), params, xi)
                if isinstance(ax, (list, tuple)):
                    Ax.append(ax[0])
                else:
                    Ax.append(ax)
            # Recover b from the residual: r = b - A x  =>  b = r + A x
            b = [r + a for r, a in zip(ri, Ax)]
            # Evaluate phi with A x (not A v, as an earlier revision did)
            phi_i = sum(0.5 * (x * ax).sum() - (x * bb).sum()
                        for x, ax, bb in zip(xi, Ax, b))
            # Look-back window k = max(10, 0.1 * i), as in Martens (2010)
            k = T.cast(T.floor(T.maximum(10, .1 * i)), 'int32')
            phi_crit = (phi_i - phi_im1) / phi_i
            cc1 = T.lt(k, i)                # run at least k iterations
            cc2 = T.lt(phi_i, 0)            # phi must be negative
            cc3 = T.lt(phi_crit, k * tol)   # relative progress has stalled
            # Track the previous phi (only once i >= 10); this approximates
            # the phi_{i-k} of Martens' criterion with a one-step look-back
            phi_im1 = ifelse(T.lt(i, 10), phi_im1, phi_i)
            convergence_crit = T.and_(cc1, T.and_(cc2, cc3))
        elif conv_crit == 'cg':
            # Standard CG test on the (preconditioned) residual norms
            cc1 = T.lt(beta, tol)
            cc2 = T.lt(sum((r * r).sum() for r in ri), tol)
            convergence_crit = T.and_(cc1, cc2)

        return ([phi_im1] + vi + xi + ri + [alpha],
                theano.scan_module.until(convergence_crit))
    # Initialize variables: for a quadratic loss, grad = A x - b, so the
    # initial residual is r0 = b - A x0 = -grad(loss) evaluated at x0
    r0 = theano.clone(theano.grad(loss, params), replace=dict(zip(params, x0)))
    if not isinstance(r0, (list, tuple)):
        r0 = [r0]
    r0 = [- r for r in r0]
    w0 = [pc * r for r, pc in zip(r0, precondition)]
    v0 = [pc * w for w, pc in zip(w0, precondition)]
    alpha = sum((w * w).sum() for w in w0)
    phi_im1 = np.array(0, dtype=theano.config.floatX)

    outs, updates = theano.scan(
        loop,
        outputs_info=([phi_im1] + v0 + x0 + r0 + [alpha]),
        sequences=[T.arange(max_iters)]
    )

    # outs = [phi] + v + x + r + [alpha]; slice out the x iterates and
    # keep the last one as the solution for each parameter
    x_sol = outs[1 + n_params:2 * n_params + 1]

    updates = OrderedDict(updates)
    for param, sol in zip(params, x_sol):
        updates[param] = sol[-1]

    return updates


def test_cg_Burden():
    print("Example from Burden and Faires' Numerical Analysis, 7th Ed.")
    # Example from Chapter 7, pp. 477 (Example 3)
    # Solve Ax = b
    A = np.array([
        [0.2, 0.1, 1, 1, 0],
        [0.1, 4, -1, 1, -1],
        [1, -1, 60, 0, -2],
        [1, 1, 0, 8, 4],
        [0, -1, -2, 4, 700]]).astype(theano.config.floatX)
    b = np.array([1, 2, 3, 4, 5]).astype(theano.config.floatX)
    x = theano.shared(np.zeros_like(b).astype(theano.config.floatX), name='x')
    real_sol = np.array([7.859713071,
                         0.4229264082,
                         -0.07359223906,
                         -0.5406430164,
                         0.01062616286]).astype(theano.config.floatX)
    # Jacobi preconditioner: pc**2 = 1 / diag(A)
    precondition = (1 / np.sqrt(A.diagonal())).astype(theano.config.floatX)
    # Quadratic loss whose minimizer solves Ax = b
    loss = (0.5 * T.dot(x.T, T.dot(A, x)) - T.dot(x, b))
    x_sol_hf = cg(loss, x, precondition=precondition, conv_crit='hf')
    x_sol_cg = cg(loss, x, precondition=precondition, conv_crit='cg')
    x_sol_hf_fun = theano.function([], x_sol_hf[x])
    x_sol_cg_fun = theano.function([], x_sol_cg[x])
    solution_hf = x_sol_hf_fun()
    solution_cg = x_sol_cg_fun()
    print('Real solution:')
    print(real_sol)
    print('CG solution (convergence criterion: hf):')
    print(solution_hf)
    print('Sum of squared errors:')
    print(((solution_hf - real_sol) ** 2).sum())
    print('CG solution (convergence criterion: cg):')
    print(solution_cg)
    print('Sum of squared errors:')
    print(((solution_cg - real_sol) ** 2).sum())

def test_lasagne(n_features, n_samples_train, n_samples_test):
    import lasagne

    print('Toy Neural Network example')
    # A single linear layer, trained to reproduce the target mapping y = 2x
    l_in = lasagne.layers.InputLayer((None, n_features))
    l_out = lasagne.layers.DenseLayer(
        l_in, n_features, nonlinearity=None)

    y = lasagne.layers.get_output(l_out)
    params = lasagne.layers.get_all_params(l_out)
    t = T.matrix()
    loss = lasagne.objectives.squared_error(y, t).mean()
    updates = cg(loss, params, conv_crit='hf')

    loss_fun = theano.function([l_in.input_var, t], loss, updates=updates)
    test_fun = theano.function([l_in.input_var, t], loss)
    predict = theano.function([l_in.input_var], y)

    X_train = np.random.rand(
        n_samples_train, n_features).astype(theano.config.floatX)
    y_train = 2. * X_train
    X_test = np.random.rand(
        n_samples_test, n_features).astype(theano.config.floatX)
    y_test = 2. * X_test

    predictions_before = predict(X_test)
    test_loss_before = test_fun(X_test, y_test)
    print('Test loss before training: {0}'.format(test_loss_before))
    print('Output/Input ratio (expected is 2): {0}'.format(
        predictions_before.sum() / X_test.sum()))

    # Training: a single call runs the full CG solve and updates the params
    loss_fun(X_train, y_train)

    test_loss_after = test_fun(X_test, y_test)
    predictions_after = predict(X_test)
    print('Test loss after training: {0}'.format(test_loss_after))
    print('Output/Input ratio (expected is 2): {0}'.format(
        predictions_after.sum() / X_test.sum()))

if __name__ == '__main__':
    test_cg_Burden()
    test_lasagne(n_features=100,
                 n_samples_train=1000,
                 n_samples_test=100)