tflite test
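The script below captures live microphone audio with PyAudio, cuts it into half-second chunks, computes log-mel spectrograms with librosa, and runs two TFLite event-detection models (event1.tflite, event2.tflite) whose averaged outputs give per-second probabilities for one of seven sound events.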
import time

import numpy as np
import tensorflow as tf
import librosa
import pyaudio
from six.moves import queue

event_list = ['babycry', 'carhorn', 'cough', 'dogbark', 'glassbreak', 'siren', 'snoring']

def postprocessing(response, event_name):
    # Keep only the probability column for the requested event, rounded to 3 places.
    event_idx = event_list.index(event_name)
    result = np.array(response)[:, event_idx]
    rounded_result = [round(float(n), 3) for n in result]
    rounded_result = {'event': event_name, 'probability': rounded_result}
    return {'result': [rounded_result]}

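# For example (values are illustrative), postprocessing([[0.1, 0.2, 0.9, 0.1,
# 0.0, 0.0, 0.0]], 'cough') picks index 2 of event_list and returns:
#   {'result': [{'event': 'cough', 'probability': [0.9]}]}
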
def melspec(x, fs, n_fft=1024, hop_size=512, n_mels=128):
    time_range = int(fs / hop_size)  # 43 frames ~= 1 sec at fs=22050, hop=512
    n_frame = int(len(x) / (fs / 2) - 1)  # one window per half-second hop
    X_test = np.zeros((n_frame, time_range, n_mels, 1))
    for i in range(n_frame):
        i_onset = int(i * fs / 2)
        i_offset = i_onset + fs  # one-second window
        xx = x[i_onset:i_offset]
        xx = xx / np.max(np.abs(xx) + np.finfo(float).eps)  # peak-normalize
        S = librosa.feature.melspectrogram(y=xx, sr=fs, n_mels=n_mels,
                                           n_fft=n_fft, hop_length=hop_size)
        log_S = np.log10(S + np.finfo(float).eps)  # log scale
        log_S_T = np.transpose(log_S)[:-1]  # (time, mel); drop last frame to fit
        X_test[i, :, :, 0] = log_S_T
    return X_test

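# Shape check at the defaults (fs=22050, hop_size=512, n_mels=128): a one-second
# input gives n_frame = int(22050 / 11025 - 1) = 1 window and time_range =
# int(22050 / 512) = 43 frames, so melspec returns an array of shape
# (1, 43, 128, 1), matching the "43 frames ~= 1 sec" note above.
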
def predict(data, event_name, fs=22050):
    # 'data' is a list of raw float32 byte chunks; join and decode, then keep
    # the first second of audio.
    data = np.frombuffer(b''.join(data), dtype=np.float32)
    data = data[0:fs]
    mel_spec = melspec(data, fs=fs, n_fft=1024, hop_size=512, n_mels=128)
    # Load the TFLite models and allocate tensors. (tf.lite.Interpreter; the
    # original tf.contrib.lite path was removed in newer TensorFlow releases.)
    start_time = time.time()
    interpreter1 = tf.lite.Interpreter(model_path="event1.tflite")
    interpreter1.allocate_tensors()
    end_time = time.time()
    print("TFLite event1 model load time = {}".format(end_time - start_time))
    start_time = time.time()
    interpreter2 = tf.lite.Interpreter(model_path="event2.tflite")
    interpreter2.allocate_tensors()
    end_time = time.time()
    print("TFLite event2 model load time = {}".format(end_time - start_time))
    # Get input and output tensor details.
    input1_details = interpreter1.get_input_details()
    input2_details = interpreter2.get_input_details()
    output1_details = interpreter1.get_output_details()
    output2_details = interpreter2.get_output_details()
    # Feed the mel spectrogram into both models.
    input_data = mel_spec.astype(np.float32)
    interpreter1.set_tensor(input1_details[0]['index'], input_data)
    interpreter2.set_tensor(input2_details[0]['index'], input_data)
    interpreter1.invoke()
    interpreter2.invoke()
    if event_name == 'glassbreak':
        pred = interpreter1.get_tensor(output1_details[0]['index'])
    else:
        # Average the two models' outputs for every event except glassbreak.
        pred1 = interpreter1.get_tensor(output1_details[0]['index'])
        pred2 = interpreter2.get_tensor(output2_details[0]['index'])
        pred = (pred1 + pred2) / 2.0
    res = postprocessing(pred, event_name)
    return str(res)

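# Note: predict() rebuilds both interpreters on every call, so each streamed
# second pays the model-load cost again. A minimal alternative (an assumption,
# not part of the original gist) is to construct them once at module scope:
#   interpreter1 = tf.lite.Interpreter(model_path="event1.tflite")
#   interpreter1.allocate_tensors()
# and reuse them inside predict().
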
class SenseStreamer(object):
    def __init__(self, task):
        self._rate = 22050
        self._chunk = int(self._rate / 2)  # half-second chunks
        self._buff = queue.Queue()
        self.closed = True

    def __enter__(self):
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paFloat32,
            channels=1, rate=self._rate,
            input=True,
            frames_per_buffer=self._chunk,
            stream_callback=self._fill_buffer,
        )
        self.closed = False
        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        self._buff.put(None)  # unblock the generator so it can terminate
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        while not self.closed:
            # Block for the next chunk, then drain whatever else is queued
            # so consumers always see the freshest audio.
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break
            yield b''.join(data)

def sense_stream_request(audio_generator, task, subtask=None):
    return ((content, subtask) for content in audio_generator)

def sense_stream_response(requests, task):
    _task = task.replace('_stream', '')
    streaming_data = []
    needed_chunks = 2  # two half-second chunks ~= one second of audio
    for data, subtask in requests:
        streaming_data.append(data)
        if len(streaming_data) < needed_chunks:
            continue
        yield predict(streaming_data[-needed_chunks:], subtask)
        streaming_data = []

if __name__ == "__main__":
    task = 'event_stream'
    subtask = 'cough'
    with SenseStreamer(task) as stream:
        audio_generator = stream.generator()
        requests = sense_stream_request(audio_generator, task, subtask)
        responses = sense_stream_response(requests, task)
        for pred in responses:
            print(pred)
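
For a quick test without a microphone, the same predict() path can be fed from a file. A minimal sketch, assuming a local test.wav (a hypothetical path) and the two .tflite models next to the script:

import numpy as np
import librosa

y, sr = librosa.load('test.wav', sr=22050)      # resample to the model rate
chunk = y[:22050].astype(np.float32).tobytes()  # one second as raw float32 bytes
print(predict([chunk], 'cough'))                # same entry point as the stream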