@seahrh
Created April 6, 2021 07:05
Create a training job on SageMaker: a JSON request body (create_training_job.json), the training script that runs inside the container, and a shell script that submits the job with the AWS CLI.
{
  "TrainingJobName": "",
  "HyperParameters": {
    "epochs": "40",
    "folds": "5",
    "patience": "4",
    "batch_size": "32",
    "input_shape": "300,300,3",
    "pooling": "avg",
    "hidden_layer_sizes": "100",
    "color_mode": "rgb",
    "class_mode": "raw",
    "trials": "1"
  },
  "AlgorithmSpecification": {
    "TrainingImage": "<AWS_ACCOUNT_ID>.dkr.ecr.eu-central-1.amazonaws.com/<MY_IMAGE>:1.0",
    "TrainingInputMode": "File",
    "EnableSageMakerMetricsTimeSeries": false
  },
  "RoleArn": "<AWS_SERVICE_ROLE>",
  "InputDataConfig": [
    {
      "ChannelName": "training",
      "DataSource": {
        "S3DataSource": {
          "S3DataType": "S3Prefix",
          "S3Uri": "s3://<MY_BUCKET>/data",
          "S3DataDistributionType": "FullyReplicated"
        }
      }
    },
    {
      "ChannelName": "pretrained",
      "DataSource": {
        "S3DataSource": {
          "S3DataType": "S3Prefix",
          "S3Uri": "s3://<MY_BUCKET>/pretrained",
          "S3DataDistributionType": "FullyReplicated"
        }
      }
    }
  ],
  "OutputDataConfig": {
    "S3OutputPath": "s3://<MY_BUCKET>/models"
  },
  "ResourceConfig": {
    "InstanceType": "ml.g4dn.xlarge",
    "InstanceCount": 1,
    "VolumeSizeInGB": 80
  },
  "StoppingCondition": {
    "MaxRuntimeInSeconds": 86400,
    "MaxWaitTimeInSeconds": 86400
  },
  "EnableNetworkIsolation": true,
  "EnableInterContainerTrafficEncryption": true,
  "EnableManagedSpotTraining": true
}
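The same request body can also be submitted from Python instead of the AWS CLI (the shell script at the bottom of this gist uses the CLI). A minimal sketch with boto3, assuming credentials and region are configured, the <...> placeholders in the JSON have been filled in, and using the file path from the submit script; the job name below is a hypothetical example:

import json

import boto3

# Load the same request body that the CLI submit script uses.
with open("aws/sandbox/create_training_job.json") as f:
    request = json.load(f)
# Training job names must be unique per account and region.
request["TrainingJobName"] = "my-model-20210406-070500"

# create_training_job accepts the same keys as the --cli-input-json file.
boto3.client("sagemaker").create_training_job(**request)
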
import gc
import json
import logging
from typing import Dict, List, Tuple, Union

import numpy as np
import optuna
import pandas as pd
import tensorflow as tf
from sklearn import model_selection
from tensorflow import keras

# basicConfig attaches a stream handler; without one, INFO records from the
# root logger are dropped and never reach stdout (or CloudWatch).
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()


def _model(
    pretrained_path: str,
    input_shape,
    dropout: float,
    lr: float,
    pooling: str,
    hidden_layer_sizes: List[int],
):
    # Frozen EfficientNetB3 backbone with a small fully-connected regression head.
    pretrained = keras.applications.EfficientNetB3(
        include_top=False,
        input_shape=input_shape,
        pooling=pooling,
        weights=pretrained_path,
    )
    pretrained.trainable = False
    kernel_initializer = keras.initializers.he_normal()
    kernel_regularizer = keras.regularizers.l2(0.01)
    model = keras.models.Sequential()
    model.add(pretrained)
    # One LayerNormalization + Dense + Dropout block per hidden layer.
    for i in range(len(hidden_layer_sizes)):
        model.add(keras.layers.LayerNormalization())
        model.add(
            keras.layers.Dense(
                hidden_layer_sizes[i],
                activation="relu",
                kernel_initializer=kernel_initializer,
                kernel_regularizer=kernel_regularizer,
                name=f"Dense{i + 1}",
            )
        )
        model.add(keras.layers.Dropout(dropout))
    model.add(keras.layers.Dense(1, name="output"))
    optimizer = keras.optimizers.Adam(learning_rate=lr)
    # MSLE loss suits a non-negative regression target; RMSE is reported as a metric.
    loss = keras.losses.MeanSquaredLogarithmicError()
    rmse = keras.metrics.RootMeanSquaredError()
    model.compile(loss=loss, optimizer=optimizer, metrics=[rmse])
    return model


def _callbacks(job_dir: str, patience: int):
    return [
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=patience, verbose=1),
        keras.callbacks.TensorBoard(
            log_dir=job_dir,
            histogram_freq=1,
            write_graph=True,
            write_images=False,
            update_freq="epoch",
            profile_batch=0,
            embeddings_freq=0,
        ),
    ]


class MyObjective:
    def __init__(
        self,
        df,
        x_col,
        y_col,
        splitter,
        data_dir,
        job_dir,
        pretrained_path,
        batch_size,
        input_shape,
        class_mode,
        color_mode,
        patience,
        epochs,
        pooling,
        hidden_layer_sizes,
        groups=None,
    ):
        self.df = df
        self.x_col = x_col
        self.y_col = y_col
        self.splitter = splitter
        self.data_dir = data_dir
        self.job_dir = job_dir
        self.pretrained_path = pretrained_path
        self.batch_size = batch_size
        self.input_shape = input_shape
        self.target_size = (input_shape[0], input_shape[1])
        self.color_mode = color_mode
        self.class_mode = class_mode
        self.patience = patience
        self.epochs = epochs
        self.pooling = pooling
        self.hidden_layer_sizes = hidden_layer_sizes
        self.groups = groups
        self.model_history = None
        self.history: List[Dict[str, Union[str, int, float]]] = []
    def __call__(self, trial):
        trial_id = trial.number
        # Degenerate ranges (low == high) pin these hyperparameters to a single
        # value while keeping the Optuna plumbing in place for wider sweeps.
        hist = {
            "trial_id": trial_id,
            "dropout": trial.suggest_uniform("dropout", 0.0001, 0.0001),
            "learning_rate": trial.suggest_loguniform("learning_rate", 1e-3, 1e-3),
        }
        scores = []
        idg = keras.preprocessing.image.ImageDataGenerator()
        for fold, (ti, vi) in enumerate(
            self.splitter.split(self.df, groups=self.groups)
        ):
            model = _model(
                pretrained_path=self.pretrained_path,
                input_shape=self.input_shape,
                dropout=hist["dropout"],
                lr=hist["learning_rate"],
                pooling=self.pooling,
                hidden_layer_sizes=self.hidden_layer_sizes,
            )
            train_gen = idg.flow_from_dataframe(
                self.df.iloc[ti],
                x_col=self.x_col,
                y_col=self.y_col,
                directory=self.data_dir,
                target_size=self.target_size,
                color_mode=self.color_mode,
                batch_size=self.batch_size,
                class_mode=self.class_mode,
                shuffle=True,
            )
            val_gen = idg.flow_from_dataframe(
                self.df.iloc[vi],
                x_col=self.x_col,
                y_col=self.y_col,
                directory=self.data_dir,
                target_size=self.target_size,
                color_mode=self.color_mode,
                batch_size=self.batch_size,
                class_mode=self.class_mode,
                shuffle=False,
            )
            history = model.fit(
                train_gen,
                epochs=self.epochs,
                validation_data=val_gen,
                callbacks=_callbacks(
                    job_dir=f"{self.job_dir}/trial_{trial_id}/fold_{fold}",
                    patience=self.patience,
                ),
            )
            df = pd.DataFrame(history.history)
            # Fold score: best (lowest) validation RMSE observed across epochs.
            score = float(df["val_root_mean_squared_error"].min())
            log.info(f"score={score:.4f}, fold={fold}, trial={trial_id}")
            hist[f"fold_{fold}_score"] = score
            scores.append(score)
            # Release the model between folds to keep GPU memory in check.
            del model
            gc.collect()
        hist["score_mean"] = np.mean(scores)
        hist["score_std"] = np.std(scores)
        hist["score_worst"] = max(scores)
        self.history.append(hist)
        # Conservative objective: minimise the worst fold's RMSE.
        return hist["score_worst"]


def _args() -> Dict[str, Union[str, float, int]]:
    res: Dict[str, Union[str, float, int]] = {}
    with open("/opt/ml/input/config/hyperparameters.json", "r") as f:
        res = json.load(f)
    # SageMaker passes every hyperparameter as a string; cast the numeric ones.
    res["epochs"] = int(res["epochs"])
    res["folds"] = int(res["folds"])
    res["patience"] = int(res["patience"])
    res["batch_size"] = int(res["batch_size"])
    res["trials"] = int(res["trials"])
    return res


def _main():
    gpus = tf.config.experimental.list_physical_devices("GPU")
    log.info(f"gpus={gpus}")
    if len(gpus) == 0:
        raise RuntimeError("Expecting at least one gpu but found none.")
    args: Dict[str, Union[str, float, int]] = _args()
    input_shape_list = [int(x) for x in args["input_shape"].split(",")]
    input_shape: Tuple[int, int, int] = (
        input_shape_list[0],
        input_shape_list[1],
        input_shape_list[2],
    )
    hidden_layer_sizes: List[int] = [
        int(x) for x in args["hidden_layer_sizes"].split(",")
    ]
    data_dir = "/opt/ml/input/data/training"
    pretrained_path = "/opt/ml/input/data/pretrained/efficientnetb3_notop.h5"
    job_dir = "/opt/ml/model"
    train_df = pd.read_parquet(f"{data_dir}/train.parquet")
    log.info("train.parquet loaded")
    obj = MyObjective(
        train_df,
        x_col="filepath",
        y_col="maf",
        splitter=model_selection.GroupKFold(n_splits=args["folds"]),
        data_dir=f"{data_dir}/mels",
        job_dir=job_dir,
        pretrained_path=pretrained_path,
        batch_size=args["batch_size"],
        input_shape=input_shape,
        patience=args["patience"],
        class_mode=args["class_mode"],
        color_mode=args["color_mode"],
        epochs=args["epochs"],
        pooling=args["pooling"],
        hidden_layer_sizes=hidden_layer_sizes,
        groups=train_df["senzit_id"].to_numpy(),
    )
    study = optuna.create_study(direction="minimize")
    study.optimize(obj, n_trials=args["trials"])
    history = pd.DataFrame.from_records(obj.history)
    history.sort_values("score_worst", ascending=True, inplace=True, ignore_index=True)
    history.to_csv(f"{job_dir}/cv.csv", index=False)
    best = history.iloc[0]
    log.info(f"best={repr(best)}")
    # Retrain on the full training set with the best hyperparameters found.
    model = _model(
        pretrained_path=pretrained_path,
        input_shape=input_shape,
        dropout=best["dropout"],
        lr=best["learning_rate"],
        pooling=args["pooling"],
        hidden_layer_sizes=hidden_layer_sizes,
    )
    model.summary()
    idg = keras.preprocessing.image.ImageDataGenerator()
    train_gen = idg.flow_from_dataframe(
        train_df,
        x_col="filepath",
        y_col="maf",
        directory=f"{data_dir}/mels",
        target_size=(input_shape[0], input_shape[1]),
        color_mode=args["color_mode"],
        batch_size=args["batch_size"],
        class_mode=args["class_mode"],
        shuffle=True,
    )
    history = model.fit(
        train_gen,
        epochs=args["epochs"],
        callbacks=[
            keras.callbacks.TensorBoard(
                log_dir=job_dir,
                histogram_freq=1,
                write_graph=True,
                write_images=False,
                update_freq="epoch",
                profile_batch=0,
                embeddings_freq=0,
            ),
        ],
    )
    model.save(f"{job_dir}/model.h5")
    df = pd.DataFrame(history.history)
    df["epoch"] = history.epoch
    df.to_csv(f"{job_dir}/history.csv", index=False)
    log.info(f"Done! job_dir={job_dir}")


if __name__ == "__main__":
    _main()
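The hard-coded paths follow the SageMaker training-container contract: the HyperParameters map from the request is written verbatim (all values as strings) to /opt/ml/input/config/hyperparameters.json, which is why _args() casts the numeric fields; each input channel is mounted under /opt/ml/input/data/<channel name> (hence the training and pretrained directories); and everything the script writes to /opt/ml/model is archived and uploaded to the S3OutputPath when the job ends.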
#!/usr/bin/env bash
set -x  # echo on
DATE=$(date '+%Y%m%d-%H%M%S')
JOB_NAME="${MODEL_NAME}-${DATE}"
aws sagemaker create-training-job --training-job-name "${JOB_NAME}" \
  --cli-input-json "file://aws/sandbox/create_training_job.json"
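The submit script expects MODEL_NAME to be exported in the environment (e.g. export MODEL_NAME=my-model). The --training-job-name flag fills in the empty "TrainingJobName" from the JSON file, since explicit CLI options take precedence over values supplied through --cli-input-json.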