Scripts for extraction / testing of features from https://github.com/keunwoochoi/transfer_learning_music (for a model newly trained with Keras 2.x)
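# Usage sketch for the extraction script below (the file name `extract.py`
# is an assumption; the argument order follows the sys.argv parsing at the
# bottom of the script):
#   python extract.py /path/to/model.h5 audio_paths.txt features.npy 1
# where audio_paths.txt lists one audio file path per line.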
import sys
from keras.models import Model
from keras.layers import GlobalAveragePooling2D as GAP2D
from keras.layers import concatenate as concat
import keras
import kapre
import librosa
import numpy as np
SR = 16000 # [Hz]
LEN_SRC = 29. # [sec]
ref_n_src = int(SR * LEN_SRC)
if keras.__version__[0] != '2':
    raise RuntimeError('Keras version should be 2.x')
def load_model(model_path):
    """Load a trained Keras model, registering kapre's custom layers
    so that deserialization succeeds.
    """
    model = keras.models.load_model(
        model_path,
        custom_objects={
            'Melspectrogram': kapre.time_frequency.Melspectrogram,
            'Normalization2D': kapre.utils.Normalization2D
        }
    )
    return model
def compile_extractor(model):
    """Build a feature extractor: global-average-pool the output of each
    of the five ELU activation layers and concatenate the pooled vectors.
    """
    feat_layer1 = GAP2D()(model.get_layer('elu_1').output)
    feat_layer2 = GAP2D()(model.get_layer('elu_2').output)
    feat_layer3 = GAP2D()(model.get_layer('elu_3').output)
    feat_layer4 = GAP2D()(model.get_layer('elu_4').output)
    feat_layer5 = GAP2D()(model.get_layer('elu_5').output)
    feat_all = concat(
        [feat_layer1, feat_layer2,
         feat_layer3, feat_layer4, feat_layer5]
    )
    feat_extractor = Model(inputs=model.input, outputs=feat_all)
    return feat_extractor
def get_extractor(model_path):
    """Load a model from disk and wrap it as a feature extractor."""
    model = load_model(model_path)
    return compile_extractor(model)
def load_audio(audio_path):
    """Load an audio file at 16 kHz mono, then zero-pad or trim it to
    exactly 29 seconds. Returns an array shaped (1, 1, n_samples).
    """
    src, sr = librosa.load(audio_path, sr=SR, duration=LEN_SRC)
    len_src = len(src)
    if len_src < ref_n_src:
        new_src = np.zeros(ref_n_src)
        new_src[:len_src] = src
        return new_src[np.newaxis, np.newaxis, :]
    else:
        return src[np.newaxis, np.newaxis, :ref_n_src]
def _sample_generator(lines):
    """Yield one preprocessed audio clip (a batch of size 1) per path."""
    for line in lines:
        audio_path = line.rstrip('\n')
        yield load_audio(audio_path)
def predict_cpu(f_path, model, n_jobs):
    """Predict features in a background process.
    f_path: open file handle, one audio path per line
    model: Keras 2 extractor model
    n_jobs: currently unused; a plain Python generator cannot be split
        across multiple workers without duplicating samples, so the
        generator runs in a single worker process.
    """
    paths = f_path.readlines()
    features = model.predict_generator(
        _sample_generator(paths),
        steps=len(paths),
        workers=1,
        use_multiprocessing=True,
        verbose=1
    )
    return features
def main(model_path, txt_path, out_path, n_jobs=1):
    model = get_extractor(model_path)
    with open(txt_path) as f_path:
        all_features = predict_cpu(f_path, model, n_jobs)
    print('Saving all features at {}...'.format(out_path))
    np.save(out_path, all_features)
    print('Done. Saved a numpy array of size (%d, %d)' % all_features.shape)
if __name__ == '__main__':
    model_path = sys.argv[1]
    txt_path = sys.argv[2]
    out_path = sys.argv[3]
    n_jobs = int(sys.argv[4])
    main(model_path, txt_path, out_path, n_jobs)
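# ---------------------------------------------------------------------------
# The evaluation script below expects a cPickle'd (X, y) tuple. A minimal,
# illustrative sketch of building one from the features saved above (the
# label file and both file names are assumptions, not part of the original
# gist):
#
#   import cPickle as pkl
#   import numpy as np
#   X = np.load('features.npy')  # (n_samples, n_feature_dims)
#   y = np.load('labels.npy')    # (n_samples,), your own labels/targets
#   pkl.dump((X, y), open('data.pkl', 'wb'))
# ---------------------------------------------------------------------------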
import cPickle as pkl
import numpy as np
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.svm import SVC, SVR
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, r2_score
import fire
SVM_PARAMS = [
    {
        'mdl__C': [0.1, 2.0, 8.0, 32.0],
        'mdl__gamma': [0.125, 0.03125, 0.0078125, 0.001953125,
                       0.00048828125, 0.0001220703125, 0.00625,
                       'auto'],
        'mdl__kernel': ['rbf']
    },
    {
        'mdl__C': [0.1, 2., 8., 32.],
        'mdl__kernel': ['linear']
    }
]
def get_pipe(task='classification'):
    """Build a task-specific pipeline (scaler + SVM) and a matching scorer.
    """
    if task == 'classification':
        mdl = SVC()
        scorer = accuracy_score
    elif task == 'regression':
        mdl = SVR()
        scorer = r2_score
    else:
        raise ValueError(
            '[ERROR] {} is not supported (only \'classification\' and'
            ' \'regression\' are supported at the moment)'.format(task))
    # set pipeline
    pipe = Pipeline(
        [('sclr', StandardScaler()),
         ('mdl', mdl)]
    )
    return pipe, scorer
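# A minimal standalone usage sketch for get_pipe (the train/test arrays are
# assumptions for illustration only):
#   pipe, scorer = get_pipe('classification')
#   pipe.fit(Xtr, ytr)
#   print(scorer(yts, pipe.predict(Xts)))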
def test(pipe, scorer, X, y, N=1, verbose=False):
    """Run N evaluation trials: each draws a shuffled 90/10 split,
    grid-searches hyper-parameters with 10-fold CV on the training part,
    then scores the refitted best model on the held-out part.
    """
    # start test
    res = []  # held-out test scores, one per split
    res_grd = []  # best inner-CV scores from the grid search
    for i in range(N):
        if verbose:
            print('Start evaluation loop {:d}'.format(i))
        # get split
        Xtr, Xts, ytr, yts = train_test_split(
            X, y, test_size=0.1)
        if verbose:
            print('Start searching for optimal hyper-params..')
        grd = GridSearchCV(pipe, SVM_PARAMS, cv=10)
        grd.fit(Xtr, ytr)
        if verbose:
            print('Best Grid Search Result: {:f}'.format(grd.best_score_))
            print('Best Parameter:', grd.best_params_)
            print('Re-train split with best setup...')
        # fetch best model and refit it on the full training split
        best = grd.best_estimator_
        best.fit(Xtr, ytr)
        ypr = best.predict(Xts)
        score = scorer(yts, ypr)
        # save result
        res.append(score)
        res_grd.append(grd.best_score_)
    return res, res_grd
def main(data_path, N=1, task='classification'):
    """Run the CV test on the given data.
    data_path (str) : path to pickled data (cPickle);
        should contain a scikit-learn-style data tuple (X, y)
    N (int) : number of trials; if N=1, only one CV (with a shuffled split)
        is conducted
    task (str) : flag for the task type, {'classification', 'regression'}
    """
    # load data
    X, y = pkl.load(open(data_path, 'rb'))
    pipe, scorer = get_pipe(task)
    res, res_grd = test(pipe, scorer, X, y, N)
    print('Avg. Score: {:.4f}'.format(np.mean(res)),
          'Avg. Score (Grid Search): {:.4f}'.format(np.mean(res_grd)))
if __name__ == "__main__":
fire.Fire(main) # `fire` is awesome
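# With `fire`, main's parameters become CLI arguments. Usage sketch (the
# file name `evaluate.py` is an assumption):
#   python evaluate.py data.pkl --N 5 --task classification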