Event model TF lite test (async)
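The script below resamples an input clip with sox/pydub, slices it into 1-second windows with a 0.5-second hop, converts each window to a 43x128 log-mel spectrogram with librosa, runs the frames through two TFLite interpreters (event1.tflite and event2.tflite), averages their outputs (glassbreak uses only the first model), and prints per-window probabilities for each event class.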
import os
import tempfile
import logging
import time

import numpy as np
import tensorflow as tf
import sox
import pyaudio
import librosa
from pydub import AudioSegment
from six.moves import queue
event_list = ['babycry', 'carhorn', 'cough', 'dogbark', 'glassbreak', 'siren', 'snoring']
def _resample(input_file, file_format, fs, mono=True):
    """Resample input_file to fs Hz, downmix to mono, and return samples scaled to [-1.0, 1.0]."""
    ext = file_format
    tfm = sox.Transformer()
    tfm.rate(fs, quality='v')                                 # very-high-quality resampling
    tfm.remix(remix_dictionary=None, num_output_channels=1)   # downmix to mono
    tfm.build(input_file, 'resampled.' + ext)
    X = AudioSegment.from_file('resampled.' + ext, ext)
    os.remove('resampled.' + ext)
    x = np.asarray(X.get_array_of_samples()).astype('float' + str(8 * X.sample_width))
    x = x / float(1 << ((8 * X.sample_width) - 1))            # scale integer PCM to [-1.0, 1.0]
    return x
def postprocessing(response, event_name):
    """Pick the probability column for event_name and wrap it in a result dict."""
    event_idx = event_list.index(event_name)
    result = np.array(response)
    result = np.reshape(result, (len(result), 7))   # one row per window, one column per event class
    result = result[:, event_idx]
    rounded_result = [round(n, 3) for n in result]
    rounded_result = {'event': event_name, 'probability': rounded_result}
    rounded_result = {'result': [rounded_result]}
    return rounded_result
def melspec(x, fs, n_fft=1024, hop_size=512, n_mels=128):
    """Slice x into 1-second windows (0.5-second hop) and return log-mel spectrogram frames."""
    time_range = int(fs / hop_size)        # 43 frames ~= 1 sec at fs=22050, hop=512
    n_frame = int(len(x) / (fs / 2) - 1)   # number of 1-second windows with a 0.5-second hop
    X_test = np.zeros((n_frame, time_range, 128, 1))
    for i in range(n_frame):
        i_onset = int(i * fs / 2)
        i_offset = i_onset + fs
        xx = x[i_onset:i_offset]
        xx = xx / np.max(np.abs(xx) + np.finfo(float).eps)    # peak-normalize the window
        S = librosa.feature.melspectrogram(y=xx, sr=fs, n_mels=n_mels, n_fft=n_fft, hop_length=hop_size)
        log_S = np.log10(S + np.finfo(float).eps)             # log scale
        log_S_T = np.transpose(log_S)[:-1]                    # (time, mels); drop the last frame to keep 43 steps
        X_test[i, :, :, 0] = log_S_T
    return X_test
def predict(data, event_name, fs=22050):
    _, file_format = os.path.splitext(data)
    file_format = file_format[1:]
    data = _resample(data, file_format, fs)
    data = data.astype(float)
    mel_spec = melspec(data, fs=fs, n_fft=1024, hop_size=512, n_mels=128)

    # Load TFLite models and allocate tensors (tf.contrib.lite in TF 1.x; tf.lite.Interpreter on newer TensorFlow).
    interpreter1 = tf.contrib.lite.Interpreter(model_path="event1.tflite")
    interpreter1.allocate_tensors()
    interpreter2 = tf.contrib.lite.Interpreter(model_path="event2.tflite")
    interpreter2.allocate_tensors()

    # Get input and output tensor details.
    input1_details = interpreter1.get_input_details()
    input2_details = interpreter2.get_input_details()
    output1_details = interpreter1.get_output_details()
    output2_details = interpreter2.get_output_details()

    input_data = mel_spec

    # Run both models on every 1-second window.
    preds = []
    for frame in input_data:
        frame = np.reshape(frame, (1, 43, 128, 1)).astype(np.float32)
        interpreter1.set_tensor(input1_details[0]['index'], frame)
        interpreter2.set_tensor(input2_details[0]['index'], frame)
        interpreter1.invoke()
        interpreter2.invoke()
        if event_name == 'glassbreak':
            # for glassbreak, use only the first model's output
            pred = interpreter1.get_tensor(output1_details[0]['index'])
        else:
            # for the other events, average the two model outputs
            pred1 = interpreter1.get_tensor(output1_details[0]['index'])
            pred2 = interpreter2.get_tensor(output2_details[0]['index'])
            pred = (pred1 + pred2) / 2.0
        preds.append(pred)

    res = postprocessing(preds, event_name)
    return str(res)
def sense_file(filename, task, subtask=None):
    return predict(filename, subtask)
if __name__ == "__main__":
    task = 'event'
    for subtask in event_list:
        print(sense_file('event/{}.wav'.format(subtask), task, subtask))
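A minimal single-clip sketch reusing the functions above. It assumes event1.tflite, event2.tflite, and an event/ directory of WAV clips named after each class sit in the working directory, as the code expects; the clip name shown is illustrative, and the probability values in the comment are placeholders.

    # Score one clip for one event class.
    out = sense_file('event/siren.wav', task='event', subtask='siren')
    # predict() returns the result dict serialized as a string, e.g.
    # "{'result': [{'event': 'siren', 'probability': [p_0, p_1, ...]}]}"
    # with one probability per 1-second window (0.5-second hop).
    print(out)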