tflite test: real-time audio event detection from the microphone with two TensorFlow Lite models. Expects event1.tflite and event2.tflite next to the script; requires numpy, tensorflow, pyaudio, librosa, and six.
import time

import numpy as np
import pyaudio
import librosa
import tensorflow as tf
from six.moves import queue

event_list = ['babycry', 'carhorn', 'cough', 'dogbark', 'glassbreak', 'siren', 'snoring']

def postprocessing(response, event_name):
    # Select the requested event's per-frame probabilities and round them.
    event_idx = event_list.index(event_name)
    result = np.array(response)[:, event_idx]
    rounded = [round(float(n), 3) for n in result]
    return {'result': [{'event': event_name, 'probability': rounded}]}
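
# For example (illustrative values, not real model output):
#   postprocessing([[0.1, 0.2, 0.9, 0.0, 0.0, 0.0, 0.0]], 'cough')
#   -> {'result': [{'event': 'cough', 'probability': [0.9]}]}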

def melspec(x, fs, n_fft=1024, hop_size=512, n_mels=128):
    # Split x into 1-second windows (hopped by 0.5 s) and compute a
    # log-mel spectrogram patch for each window.
    time_range = int(fs / hop_size)        # 43 frames ~= 1 sec at fs=22050
    n_frame = int(len(x) / (fs / 2) - 1)   # number of 1-second windows
    X_test = np.zeros((n_frame, time_range, n_mels, 1))
    for i in range(n_frame):
        i_onset = int(i * fs / 2)
        i_offset = i_onset + fs
        xx = x[i_onset:i_offset]
        xx = xx / np.max(np.abs(xx) + np.finfo(float).eps)  # peak-normalize
        S = librosa.feature.melspectrogram(y=xx, sr=fs, n_mels=n_mels,
                                           n_fft=n_fft, hop_length=hop_size)
        log_S = np.log10(S + np.finfo(float).eps)  # log scale
        log_S_T = np.transpose(log_S)[:-1]  # drop the extra frame -> (time_range, n_mels)
        X_test[i, :, :, 0] = log_S_T
    return X_test
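
# Shape check for the defaults (fs=22050, hop_size=512, n_mels=128):
# one second of audio gives time_range = int(22050/512) = 43 and n_frame = 1,
# so melspec returns an array of shape (1, 43, 128, 1).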

def predict(data, event_name, fs=22050):
    # `data` is a list of raw float32 PCM chunks from the audio stream.
    data = np.frombuffer(b''.join(data), dtype=np.float32)
    data = data[0:fs]  # keep exactly one second of samples
    mel_spec = melspec(data, fs=fs, n_fft=1024, hop_size=512, n_mels=128)

    # Load the TFLite models and allocate tensors. (Note: reloading the
    # interpreters on every call is wasteful; see the caching sketch below.)
    start_time = time.time()
    interpreter1 = tf.lite.Interpreter(model_path="event1.tflite")
    interpreter1.allocate_tensors()
    print("TFLite event1 model load time = {}".format(time.time() - start_time))

    start_time = time.time()
    interpreter2 = tf.lite.Interpreter(model_path="event2.tflite")
    interpreter2.allocate_tensors()
    print("TFLite event2 model load time = {}".format(time.time() - start_time))

    # Get input and output tensor details.
    input1_details = interpreter1.get_input_details()
    input2_details = interpreter2.get_input_details()
    output1_details = interpreter1.get_output_details()
    output2_details = interpreter2.get_output_details()

    # Feed the mel spectrogram into both models.
    input_data = mel_spec.astype(np.float32)
    interpreter1.set_tensor(input1_details[0]['index'], input_data)
    interpreter2.set_tensor(input2_details[0]['index'], input_data)
    interpreter1.invoke()
    interpreter2.invoke()

    if event_name == 'glassbreak':
        # glassbreak is covered by the first model only.
        pred = interpreter1.get_tensor(output1_details[0]['index'])
    else:
        # Average the two models' outputs for all other events.
        pred1 = interpreter1.get_tensor(output1_details[0]['index'])
        pred2 = interpreter2.get_tensor(output2_details[0]['index'])
        pred = (pred1 + pred2) / 2.0

    res = postprocessing(pred, event_name)
    return str(res)
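
# If the per-call load time printed above matters, the interpreters could be
# created once and cached instead of being rebuilt on every predict() call.
# A minimal sketch (get_interpreter is a hypothetical helper, not in the gist):
#
#     _INTERPRETERS = {}
#     def get_interpreter(path):
#         if path not in _INTERPRETERS:
#             itp = tf.lite.Interpreter(model_path=path)
#             itp.allocate_tensors()
#             _INTERPRETERS[path] = itp
#         return _INTERPRETERS[path]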

class SenseStreamer(object):
    def __init__(self, task):
        self._rate = 22050
        self._chunk = int(self._rate / 2)  # 0.5-second chunks
        self._buff = queue.Queue()
        self.closed = True

    def __enter__(self):
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paFloat32,
            channels=1, rate=self._rate,
            input=True,
            frames_per_buffer=self._chunk,
            stream_callback=self._fill_buffer,
        )
        self.closed = False
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        self._buff.put(None)  # signal the generator to terminate
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        while not self.closed:
            # Block until at least one chunk is available, then drain
            # whatever else is already buffered.
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break
            yield b''.join(data)

def sense_stream_request(audio_generator, task, subtask=None):
    return ((content, subtask) for content in audio_generator)

def sense_stream_response(requests, task):
    streaming_data = []
    needed_chunks = 2  # two 0.5-second chunks -> one 1-second window
    for data, subtask in requests:
        streaming_data.append(data)
        if len(streaming_data) < needed_chunks:
            continue
        yield predict(streaming_data[-needed_chunks:], subtask)
        streaming_data = []  # start over for the next window

if __name__ == "__main__":
    task = 'event_stream'
    subtask = 'cough'
    with SenseStreamer(task) as stream:
        audio_generator = stream.generator()
        requests = sense_stream_request(audio_generator, task, subtask)
        responses = sense_stream_response(requests, task)
        for pred in responses:
            print(pred)
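
# Minimal offline sketch for testing predict() without a microphone
# ('sample.wav' is a hypothetical local 22.05 kHz mono file):
#
#     x, _ = librosa.load('sample.wav', sr=22050, mono=True)
#     chunk = x[:22050].astype(np.float32).tobytes()
#     print(predict([chunk], 'cough'))
#
# predict() expects a list of raw float32 byte chunks, hence tobytes().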