import functools

import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from pylab import *  # brings in figure() and ion() for the sanity plots

ion()  # interactive matplotlib so the plots show without blocking
tf.keras.backend.set_floatx("float64")

def set_seed(seed=1):
    """Seed the python, numpy and tensorflow RNGs (handles both TF1 and TF2 APIs)."""
    import random

    random.seed(seed)
    np.random.seed(seed)
    if hasattr(tf, "reset_default_graph"):  # TF1 only
        tf.reset_default_graph()
    if hasattr(tf.random, "set_random_seed"):  # TF1 name
        tf.random.set_random_seed(seed)
    else:  # TF2 name
        tf.random.set_seed(seed)

def get_data(seed=1):
    """Generate a toy 1d regression dataset and a grid of quantile levels tau."""
    set_seed(seed)
    m = 250  # samples
    n_x = 1  # dim of x
    n_tau = 11
    x = (2 * np.random.rand(m, n_x).astype(np.float64) - 1) * 2  # uniform on (-2, 2)
    i = np.argsort(x[:, 0])
    x = x[i]  # sort by x to make plotting nicer
    A = np.random.randn(n_x, 1)
    y = x ** 2 + 0.3 * x + 0.4 * np.random.randn(m, 1).astype(np.float64)
    y = y.dot(A)  # y has shape (m, 1)
    tau = np.linspace(1.0 / n_tau, 1 - 1.0 / n_tau, n_tau).astype(np.float64)
    tau = tau[:, None]  # shape (n_tau, 1)
    return locals()
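
# Illustrative shape check (added; not in the original gist): get_data returns
# its locals(), so callers pull the arrays out by key.
def _check_data_shapes():
    d = get_data()
    assert d["x"].shape == (250, 1)
    assert d["y"].shape == (250, 1)
    assert d["tau"].shape == (11, 1)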

def make_layers(*, dims, activation="tanh", kernel_constraint="nonneg", kernel_initializer="uniform"):
    """Build a stack of Dense layers; the last layer always gets no activation."""
    if kernel_initializer == "uniform":
        kernel_initializer = keras.initializers.RandomUniform(minval=0, maxval=1)
    if kernel_constraint == "nonneg":
        kernel_constraint = keras.constraints.NonNeg()
    layers = list()
    for i, dim in enumerate(dims):
        if i == len(dims) - 1:
            activation = None  # linear output layer
        layers.append(
            tf.keras.layers.Dense(
                dim,
                kernel_initializer=kernel_initializer,
                kernel_constraint=kernel_constraint,
                activation=activation,
                dtype=tf.float64,
            )
        )
    return layers

def reduce_layers(input, layers):
    """Chain the layers: layers[-1](...(layers[0](input)))."""
    return functools.reduce(lambda x, y: y(x), [input] + layers)
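
# Illustrative usage (added; not in the original gist): make_layers plus
# reduce_layers is just a small MLP applied to an input tensor.
def _demo_mlp():
    layers = make_layers(dims=[4, 1])
    z = tf.constant([[0.5], [1.5]], dtype=tf.float64)
    out = reduce_layers(z, layers)
    assert out.shape == (2, 1)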

def logit(x):
    """Inverse sigmoid, mapping (0, 1) to (-infty, infty)."""
    check_min = tf.reduce_min(x)
    check_max = tf.reduce_max(x)
    tf.debugging.assert_greater(check_min, tf.cast(0.0, tf.float64), message="logit got a value <= 0")
    tf.debugging.assert_less(check_max, tf.cast(1.0, tf.float64), message="logit got a value >= 1")
    return tf.math.log(x) - tf.math.log(1 - x)
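
# Illustrative sanity check (added; not in the original gist): logit is the
# inverse of the sigmoid, so sigmoid(logit(p)) should recover p on (0, 1).
def _check_logit():
    p = tf.constant([[0.1], [0.5], [0.9]], dtype=tf.float64)
    np.testing.assert_allclose(tf.sigmoid(logit(p)).numpy(), p.numpy())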

def rho_quantile_loss(tau_y, u):
    """Pinball (quantile) loss rho_tau(e) = e * (tau - 1{e <= 0}),
    averaged over tau and output dims and summed over samples."""
    tau, y = tau_y
    tf.debugging.assert_rank(y, 2, "y should be rank 2")
    u = y[:, None, :] - u[None, :, :]  # residuals y - u via broadcasting
    tf.debugging.assert_rank(tau, 2, "tau should be rank 2")
    tau = tau[None, :, :]
    res = u * (tau - tf.where(u <= np.float64(0.0), np.float64(1.0), np.float64(0.0)))
    return tf.reduce_sum(tf.reduce_mean(res, axis=[1, 2]), axis=0)
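
# Illustrative check (added; not in the original gist): for a fixed tau, the
# scalar u minimizing the pinball loss over a sample is the empirical
# tau-quantile, which is what makes this a valid quantile loss.
def _check_pinball_minimizer(tau_val=0.25, m=5001, seed=0):
    rng = np.random.RandomState(seed)
    y = rng.randn(m, 1)
    tau = np.full((1, 1), tau_val)
    grid = np.linspace(-3.0, 3.0, 601)
    losses = [rho_quantile_loss((tau, y), np.full((1, 1), u)).numpy() for u in grid]
    u_star = grid[int(np.argmin(losses))]
    assert abs(u_star - np.quantile(y, tau_val)) < 0.02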

def rho_expectile_loss(tau_y, u):
    """Expectile loss rho_tau(e) = |tau - 1{e <= 0}| * e**2,
    averaged over tau and output dims and summed over samples."""
    tau, y = tau_y
    tf.debugging.assert_rank(y, 2, "y should be rank 2")
    u = y[:, None, :] - u[None, :, :]  # residuals y - u via broadcasting
    tf.debugging.assert_rank(tau, 2, "tau should be rank 2")
    tau = tau[None, :, :]
    # abs keeps the loss nonnegative; float64 literals match set_floatx above
    res = u ** 2 * tf.abs(tau - tf.where(u <= np.float64(0.0), np.float64(1.0), np.float64(0.0)))
    return tf.reduce_sum(tf.reduce_mean(res, axis=[1, 2]), axis=0)
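
# Illustrative check (added; not in the original gist): at tau = 0.5 the
# expectile loss reduces to half the squared error, so the minimizing scalar
# is the sample mean.
def _check_expectile_minimizer(m=4001, seed=0):
    rng = np.random.RandomState(seed)
    y = rng.randn(m, 1)
    tau = np.full((1, 1), 0.5)
    grid = np.linspace(-1.0, 1.0, 401)
    losses = [rho_expectile_loss((tau, y), np.full((1, 1), u)).numpy() for u in grid]
    u_star = grid[int(np.argmin(losses))]
    assert abs(u_star - y.mean()) < 0.01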

class QuantileNetworkNoX(tf.keras.models.Model):
    """Deep quantile regression without covariates. Recall that the quantile is the arg min

        q(tau) = argmin_u E[rho(tau, Y - u)]

    where rho(tau, y) = y * (tau - (y < 0))."""

    def __init__(self, *, dims):
        super().__init__()
        self._my_layers = make_layers(dims=dims, activation="tanh")

    def quantile(self, tau):
        tf.debugging.assert_rank(tau, 2, message="tau should be rank two for now")
        u = logit(tau)  # map from (0, 1) to (-infty, infty)
        return reduce_layers(u, self._my_layers)

    def call(self, inputs):
        """Use this signature to support the keras compile method; y is unused."""
        tau, y = inputs
        return self.quantile(tau)
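
# Illustrative usage (added; not in the original gist): the model maps a
# column of quantile levels to quantile values; y is passed only to match the
# keras call signature.
def _demo_forward_nox():
    d = get_data()
    model = QuantileNetworkNoX(dims=[16, 16, 1])
    q = model((d["tau"], d["y"]))
    assert q.shape == (11, 1)  # one quantile value per tau on the grid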

def sanity_plot_nox(steps=1000):
    l = get_data()
    tau = l["tau"]
    y = l["y"]
    model = QuantileNetworkNoX(dims=[16, 16, 1])
    opt = tf.keras.optimizers.Adam(learning_rate=0.01)

    @tf.function
    def one_step():
        with tf.GradientTape() as tape:
            loss = rho_quantile_loss((tau, y), model((tau, y)))
        g = tape.gradient(loss, model.trainable_variables)
        opt.apply_gradients(zip(g, model.trainable_variables))
        return loss

    # model.compile(loss=rho_quantile_loss, optimizer=opt)
    fig = figure(1)
    fig.clf()
    ax = fig.subplots(1, 1)
    n = len(y)
    p = np.linspace(1.0 / n, 1 - 1.0 / n, n)
    i = y[:, 0].argsort()
    ax.plot(p, y[i, 0], "r-", label="data")  # empirical quantile function of y
    ax.legend()
    loss = list()
    for _ in range(steps):
        loss.append(one_step())
    q = model.quantile(tau).numpy().squeeze()
    ax.plot(tau, q, "b.-", alpha=0.5)  # fitted quantiles on the tau grid
    ax.set_xlabel("tau")
    fig.tight_layout()
    fig.show()
    return locals()

class QuantileNetwork(tf.keras.models.Model):
    """Deep quantile regression. Recall that the quantile is the arg min

        q(tau) = argmin_u E[rho(tau, Y - u)]

    where rho(tau, y) = y * (tau - (y < 0))."""

    def __init__(self, *, tau_dims, x_dims, final_dims):
        super().__init__()
        self._my_tau_layers = make_layers(dims=tau_dims, activation="tanh")
        self._my_x_layers = make_layers(dims=x_dims, activation="tanh", kernel_constraint=None, kernel_initializer="glorot_uniform")
        self._my_x_layers.append(lambda x: tf.square(x))  # square keeps the x branch nonnegative
        self._final_layers = make_layers(dims=final_dims, activation="linear")

    def quantile(self, tau, x):
        tf.debugging.assert_rank(tau, 2, message="tau should be rank two for now")
        u = logit(tau)  # map from (0, 1) to (-infty, infty)
        u = reduce_layers(u, self._my_tau_layers)
        v = reduce_layers(x, self._my_x_layers)
        q = v[:, None, :] * u[None, :, :]  # shape (m, n_tau, d)
        # q is a sum of functions monotone in tau with nonnegative coefficients, and
        # the final nonneg-constrained layers preserve that monotonicity
        q = reduce_layers(q, self._final_layers)
        return q

    def call(self, inputs):
        """Use this signature to support the keras compile method; y is unused."""
        tau, y, x = inputs
        return self.quantile(tau, x)
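
# Illustrative check (added; not in the original gist): with nonnegative
# kernels on the tau branch and the final layer, and the squared x branch,
# predicted quantiles should be non-decreasing in tau even at initialization.
def _check_monotone_in_tau():
    d = get_data()
    model = QuantileNetwork(tau_dims=[8, 8], x_dims=[8, 8], final_dims=[1])
    q = model((d["tau"], d["y"], d["x"])).numpy()  # shape (m, n_tau, 1)
    assert (np.diff(q[:, :, 0], axis=1) >= -1e-9).all()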

def sanity_plot(steps=1000):
    l = get_data()
    tau = l["tau"]
    y = l["y"]
    x = l["x"]
    model = QuantileNetwork(tau_dims=[64, 64], x_dims=[64, 64], final_dims=[1])
    opt = tf.keras.optimizers.Adam(learning_rate=0.01)

    @tf.function
    def one_step():
        with tf.GradientTape() as tape:
            loss = rho_quantile_loss((tau, y), model((tau, y, x)))
        g = tape.gradient(loss, model.trainable_variables)
        opt.apply_gradients(zip(g, model.trainable_variables))
        return loss

    # model.compile(loss=rho_quantile_loss, optimizer=opt)
    fig = figure(1)
    fig.clf()
    ax = fig.subplots(1, 1)
    ax = [ax]
    ax[0].plot(x[:, 0], y.squeeze(), ".")
    ax[0].set_ylabel("y")
    loss = list()
    for _ in range(steps):
        loss.append(one_step())
    q = model.quantile(tau, x).numpy().squeeze()
    ax[0].plot(x[:, 0], q, alpha=0.5)  # one fitted quantile curve per tau
    ax[0].set_xlabel(f"x[:,0] (x.shape={x.shape})")
    fig.tight_layout()
    fig.show()
    return locals()
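
# Entry point added for convenience (not in the original gist): running the
# file reproduces the two sanity plots.
if __name__ == "__main__":
    sanity_plot_nox()
    sanity_plot()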