Last active
December 10, 2017 12:30
-
-
Save eldrin/e0ed41e608b0c910128a38062e3894d4 to your computer and use it in GitHub Desktop.
Scripts for extration / test of feature from https://github.com/keunwoochoi/transfer_learning_music (for one newly trained w/ Keras 2.X)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from keras.models import Model | |
from keras.layers import GlobalAveragePooling2D as GAP2D | |
from keras.layers import concatenate as concat | |
import keras | |
import kapre | |
import librosa | |
import numpy as np | |
SR = 16000 # [Hz] | |
LEN_SRC = 29. # [sec] | |
ref_n_src = int(SR * LEN_SRC) | |
if keras.__version__[0] != '2': | |
raise RuntimeError('Keras version should be 2.x') | |
def load_model(model_path): | |
""" | |
""" | |
model = keras.models.load_model( | |
model_path, | |
custom_objects={ | |
'Melspectrogram': kapre.time_frequency.Melspectrogram, | |
'Normalization2D': kapre.utils.Normalization2D | |
} | |
) | |
return model | |
def compile_extractor(model): | |
""" | |
""" | |
feat_layer1 = GAP2D()(model.get_layer('elu_1').output) | |
feat_layer2 = GAP2D()(model.get_layer('elu_2').output) | |
feat_layer3 = GAP2D()(model.get_layer('elu_3').output) | |
feat_layer4 = GAP2D()(model.get_layer('elu_4').output) | |
feat_layer5 = GAP2D()(model.get_layer('elu_5').output) | |
feat_all = concat( | |
[feat_layer1, feat_layer2, | |
feat_layer3, feat_layer4, feat_layer5] | |
) | |
feat_extractor = Model(inputs=model.input, outputs=feat_all) | |
return feat_extractor | |
def get_extractor(model_path): | |
""" | |
""" | |
model = load_model(model_path) | |
return compile_extractor(model) | |
def load_audio(audio_path): | |
""" | |
""" | |
src, sr = librosa.load(audio_path, sr=SR, duration=LEN_SRC) | |
len_src = len(src) | |
if len_src < ref_n_src: | |
new_src = np.zeros(ref_n_src) | |
new_src[:len_src] = src | |
return new_src[np.newaxis, np.newaxis, :] | |
else: | |
return src[np.newaxis, np.newaxis, :ref_n_src] | |
def _sample_generator(lines): | |
""" | |
""" | |
for line in lines: | |
audio_path = line.rstrip('\n') | |
yield load_audio(audio_path) | |
def predict_cpu(f_path, model, n_jobs): | |
"""Predict features with multiprocessing | |
path_line: string, path + '\n' | |
model: keras2 extractor model | |
""" | |
paths = f_path.readlines() | |
features = model.predict_generator( | |
_sample_generator(paths), | |
steps=len(paths), | |
workers=1, | |
use_multiprocessing=True, | |
verbose=1 | |
) | |
return features | |
def main(model_path, txt_path, out_path, n_jobs=1): | |
model = get_extractor(model_path) | |
all_features = [] | |
with open(txt_path) as f_path: | |
all_features = predict_cpu(f_path, model, n_jobs) | |
print('Saving all features at {}...'.format(out_path)) | |
np.save(out_path, all_features) | |
print('Done. Saved a numpy array size of (%d, %d)' % all_features.shape) | |
if __name__ == '__main__': | |
""" | |
""" | |
model_path = sys.argv[1] | |
txt_path = sys.argv[2] | |
out_path = sys.argv[3] | |
n_jobs = int(sys.argv[4]) | |
main(model_path, txt_path, out_path, n_jobs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cPickle as pkl | |
import numpy as np | |
from sklearn.model_selection import GridSearchCV, train_test_split | |
from sklearn.svm import SVC, SVR | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.pipeline import Pipeline | |
from sklearn.metrics import accuracy_score, r2_score | |
import fire | |
SVM_PARAMS = [ | |
{ | |
'mdl__C': [0.1, 2.0, 8.0, 32.0], | |
'mdl__gamma': [0.125, 0.03125, 0.0078125, 0.001953125, | |
0.00048828125, 0.0001220703125, 0.00625, | |
'auto'], | |
'mdl__kernel': ['rbf'] | |
}, | |
{ | |
'mdl__C': [0.1, 2., 8., 32.], | |
'mdl__kernel': ['linear'] | |
} | |
] | |
def get_pipe(task='classification'): | |
""" set a task-specific model to build corresponding task | |
""" | |
if task == 'classification': | |
mdl = SVC() | |
scorer = accuracy_score | |
elif task == 'regression': | |
mdl = SVR() | |
scorer = r2_score | |
else: | |
raise ValueError( | |
'[ERROR] {} is not supported (only supports \'classification\' or\ | |
\'regression\' at the moment'.format(task)) | |
# set pipeline | |
pipe = Pipeline( | |
[('sclr', StandardScaler()), | |
('mdl', mdl)] | |
) | |
return pipe, scorer | |
def test(pipe, scorer, X, y, N=1, verbose=False): | |
""" | |
""" | |
global SVM_PARAMS | |
# start test | |
res = [] # result from split | |
res_grd = [] # result from gridsearch | |
for i in range(N): | |
if verbose: | |
print('Start evaluation loop {:d}'.format(i)) | |
# get split | |
Xtr, Xts, ytr, yts = train_test_split( | |
X, y, test_size=0.1) | |
if verbose: | |
print('Start search optimal hyper-param..') | |
grd = GridSearchCV(pipe, SVM_PARAMS, cv=10) | |
grd.fit(Xtr, ytr) | |
if verbose: | |
print('Best Grid Search Result:{:f}'.format(grd.best_score_)) | |
print('Best Parameter:', grd.best_params_) | |
print('Re-train split with best setup...') | |
# fetch best model | |
best = grd.best_estimator_ | |
best.fit(Xtr, ytr) | |
ypr = best.predict(Xts) | |
score = scorer(yts, ypr) | |
# save result | |
res.append(score) | |
res_grd.append(grd.best_score_) | |
return res, res_grd | |
def main(data_path, N=1, task='classification'): | |
""" Test CV given data | |
data_path (str) : path of pickled data. (cPickle) | |
should contain scikit-learn-style data tuple (X, y) | |
N (int) : number of trial. if N=1, then only one CV (with shuffled split) conducted | |
task (str) : flag for task tyle. {'classification', 'regression'} | |
""" | |
# load data | |
X, y = pkl.load(open(data_path)) | |
pipe, scorer = get_pipe(task) | |
res, res_grd = test(pipe, scorer, X, y, N) | |
# print(res, res_grd) | |
print('Avg. Score: {:.4f}'.format(np.mean(res)), | |
'Avg. Score(Grid Search): {:.4f}'.format(np.mean(res_grd))) | |
if __name__ == "__main__": | |
fire.Fire(main) # `fire` is awesome |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment