@ypeleg
Created December 7, 2020 16:42
Keras GrowNet gradient boost
import random
import numpy as np
import tensorflow as tf
from copy import deepcopy
import tensorflow.keras.backend as K
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model, clone_model
from tensorflow.keras.layers import Input, Dropout, Dense, ReLU, BatchNormalization, Activation, Concatenate

class DynamicNet(object):

    def __init__(self, c0=None, lr=None, concat_input=False, additive_boosting=False, encoder_layers=None):
        self.models = []
        self.loss = None
        self.optimizer = None
        self.c0 = tf.Variable(np.float32(c0) if c0 is not None else random.uniform(0.0, 1.0))
        self.lr = lr
        self.boost_rate = tf.Variable(lr if lr is not None else random.uniform(0.0, 1.0))
        self.concat_input = concat_input
        self.additive_boosting = additive_boosting
        self.encoder_layers = encoder_layers

    def freeze_all_networks(self):
        for model in self.models:
            for l in model.layers: l.trainable = False

    def unfreeze_all_networks(self):
        for model in self.models:
            for l in model.layers: l.trainable = True

    def add(self, model):
        # Strip a final sigmoid/softmax so boosting is done in logit space; it is re-applied on the ensemble output.
        last_activation = model.layers[-1].activation.__name__
        if last_activation in ['sigmoid', 'softmax']: model.layers[-1].activation = None
        if hasattr(model, 'optimizer') and model.optimizer is not None:
            self.loss = model.loss
            self.optimizer = model.optimizer
            self.lr = model.optimizer.lr if self.lr is None else self.lr
        if len(self.models) == 0:
            self.models = [model]
            self.full_model = self.models[-1]
            self.embed_full_model = self.models[-1]
        else: self.models.append(model)
        full_inp = Input(shape=self.models[0].input_shape[1:])
        out_orig = self.embed_full_model(full_inp)
        out = out_orig
        if self.concat_input: out = Concatenate()([out, full_inp])
        if len(self.models) > 1:
            # Project the (embedding + raw input) features to the new weak learner's expected input width if they differ.
            if K.int_shape(out) != K.int_shape(self.models[-1].input): out = Dense(K.int_shape(self.models[-1].input)[-1])(out)
            new_out = self.models[-1](out)
        else: new_out = self.models[-1](full_inp)
        new_full_out = self.c0 + (self.boost_rate * new_out)
        self.full_model = Model(full_inp, Activation(last_activation)(new_full_out))
        if self.encoder_layers is not None:
            if len(self.models) > 1: self.embed_full_model = Model(full_inp, Model(self.models[-1].input, self.models[-1].layers[self.encoder_layers].output)(out))
            else: self.embed_full_model = Model(full_inp, Model(self.models[-1].input, self.models[-1].layers[self.encoder_layers].output)(full_inp))
        else: self.embed_full_model = Model(full_inp, new_out)

    def fit(self, x_train, y_train, lr, w_decay=0.0, epochs=10, validation_data=None, **kwargs):
        if self.optimizer is None: optimizer = Adam(lr, decay=w_decay)
        else: optimizer = deepcopy(self.optimizer)
        self.full_model.compile(optimizer, self.loss)
        self.full_model.fit(x_train, y_train, epochs=epochs, validation_data=validation_data, **kwargs)

    def predict(self, x_train, **kwargs): return self.full_model.predict(x_train, **kwargs)

class GradientBoost(object):

    def __init__(self, base_model, lr=1e-3, weight_decay=1e-5, early_stopping_steps=5, batch_size=256,
                 correct_epoch=1, model_order="second", n_boosting_rounds=20, boost_rate=1.0,
                 hidden_size=512, epochs_per_stage=1, encoder_layers=3):
        self.lr = lr
        self.base_model = base_model
        self.batch_size = batch_size
        self.boost_rate = boost_rate
        self.model_order = model_order
        self.hidden_size = hidden_size
        self.weight_decay = weight_decay
        self.num_nets = n_boosting_rounds
        self.encoder_layers = encoder_layers
        self.correct_epoch = correct_epoch
        self.epochs_per_stage = epochs_per_stage
        self.early_stopping_steps = early_stopping_steps

    def fit(self, x_train, y_train, validation_data=None, n_boosting_rounds=None, correct_epoch=None, epochs_per_stage=None, **kwargs):
        self.num_nets = n_boosting_rounds if n_boosting_rounds is not None else self.num_nets
        self.correct_epoch = correct_epoch if correct_epoch is not None else self.correct_epoch
        self.epochs_per_stage = epochs_per_stage if epochs_per_stage is not None else self.epochs_per_stage
        net_ensemble = DynamicNet(concat_input=True, encoder_layers=self.encoder_layers)
        lr = self.lr
        L2 = self.weight_decay
        for stage in range(self.num_nets):
            # Clone a fresh weak learner for this boosting stage and give it the base model's optimizer and loss.
            new_model = clone_model(self.base_model)
            new_model.optimizer = deepcopy(self.base_model.optimizer)
            new_model.loss = self.base_model.loss
            # Train only the new weak learner, then unfreeze everything for a corrective step over the full ensemble.
            net_ensemble.freeze_all_networks()
            net_ensemble.add(new_model)
            net_ensemble.fit(x_train, y_train, epochs=self.epochs_per_stage, lr=self.lr, validation_data=validation_data, **kwargs)
            lr_scaler = 2
            if stage != 0:
                if stage % 3 == 0: lr /= 2
                net_ensemble.unfreeze_all_networks()
                net_ensemble.fit(x_train, y_train, epochs=self.correct_epoch, lr=lr / lr_scaler, w_decay=L2, validation_data=validation_data)
        self.model = net_ensemble

    def predict(self, x_test, **kwargs): return self.model.predict(x_test, **kwargs)

# Example weak learner: a small MLP (feature_cols, X and y refer to the user's own dataset).
inp = Input(shape=(len(feature_cols),))
l = BatchNormalization()(inp)
l = ReLU()(l)
l = Dense(512)(l)
l = BatchNormalization()(l)
l = ReLU()(l)
l = Dropout(0.4)(l)
l = Dense(512)(l)
l = BatchNormalization()(l)
l = ReLU()(l)
out = Dense(206, activation='sigmoid')(l)
simple_model = Model(inp, out)
simple_model.compile(Adam(0.001), 'binary_crossentropy')
model = GradientBoost(simple_model)
model.fit(X, y)
model.predict(X)
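
A minimal end-to-end sketch, not part of the original gist: it exercises GradientBoost on random data. The sample count, feature count, target width, weak-learner architecture, and hyperparameters (n_boosting_rounds, epochs_per_stage, encoder_layers) below are illustrative assumptions only.

# Synthetic data: 1024 samples, 100 features, 206 binary targets (all illustrative).
n_samples, n_features, n_targets = 1024, 100, 206
X_demo = np.random.rand(n_samples, n_features).astype('float32')
y_demo = (np.random.rand(n_samples, n_targets) > 0.9).astype('float32')

# A small compiled weak learner; encoder_layers=2 points at the second Dense layer,
# whose 128-d output becomes the embedding fed to the next boosting stage.
demo_inp = Input(shape=(n_features,))
h = Dense(128, activation='relu')(demo_inp)
h = Dense(128, activation='relu')(h)
demo_out = Dense(n_targets, activation='sigmoid')(h)
weak_learner = Model(demo_inp, demo_out)
weak_learner.compile(Adam(1e-3), 'binary_crossentropy')

booster = GradientBoost(weak_learner, n_boosting_rounds=3, epochs_per_stage=1, encoder_layers=2)
booster.fit(X_demo, y_demo, batch_size=256)
preds = booster.predict(X_demo)
print(preds.shape)  # expected: (1024, 206)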