Created March 21, 2013 00:32
Here is a simple setup that does command recognition and training. It depends on pyaudio, numpy, and scipy, and was motivated by http://xa.yimg.com/kq/groups/24321415/1523383180/name/Speech_Recognition_seminar.pdf. MFCC.py is the MFCC extraction step, audio.py is the main classification system, and Main.py is what you want to run on the command line; it does trai…
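For orientation, here is a minimal sketch of how the pieces fit together programmatically, bypassing the Main.py menu. It assumes you already have recordings as raw 16-bit, 16 kHz byte strings; the sample variables below are placeholders, not part of the gist:

    import audio

    # Map each command name to a list of raw recordings of that command.
    recordings = {"play": [sample1, sample2], "stop": [sample3, sample4]}

    # Extract MFCC templates, then classify a new utterance against them.
    params = audio.GenerateParams(recordings)
    classifier = audio.AudioClassifier(params)
    print classifier.Classify(new_sample)  # best-matching label, or None if nothing is close enough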
audio.py
import MFCC, numpy
import scipy.spatial.distance as dist

THRESHOLD = 12

class AudioClassifier():
    def __init__(self, params):
        self.params = params

    def Classify(self, sample, verbose=True):
        features = MFCC.extract(numpy.frombuffer(sample, numpy.int16))
        gestures = {}
        minimum = None
        lowest = None
        for gesture in self.params:
            d = []
            for tsample in self.params[gesture]:
                # Compare frame by frame over the overlapping prefix of the
                # test utterance and the stored template
                frames = min(len(features), len(tsample))
                if frames == 0:
                    continue
                total_distance = 0
                for i in range(frames):
                    total_distance += dist.cityblock(features[i], tsample[i])
                # Average by the number of frames actually compared
                d.append(total_distance / float(frames))
            if not d:
                continue
            score = numpy.min(d)
            gestures[gesture] = score
            if verbose:
                print "Gesture %s: %f" % (gesture, score)
            if minimum is None or score < minimum:
                minimum = score
                lowest = gesture
        if verbose:
            print lowest, minimum
        if minimum is not None and minimum < THRESHOLD:
            return lowest
        return None

def GenerateParams(gestures, verbose=True):
    params = {}
    for gesture in gestures:
        if verbose:
            print "Processing " + gesture
        l = []
        for sample in gestures[gesture]:
            l.append(MFCC.extract(numpy.frombuffer(sample, numpy.int16)))
        params[gesture] = l
    return params
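A note on the matching scheme: Classify compares the test utterance to each template frame by frame, so it implicitly assumes the two utterances are already aligned in time. The seminar linked in the description covers dynamic time warping, which handles tempo differences explicitly. A minimal DTW distance that could stand in for the inner loop above, offered only as a sketch of that alternative, looks like this:

    import numpy
    import scipy.spatial.distance as dist

    def dtw_distance(a, b):
        # a, b: 2-D arrays of MFCC frames. Returns the normalized cost of the
        # cheapest monotonic alignment between the two frame sequences.
        n, m = len(a), len(b)
        D = numpy.empty((n + 1, m + 1))
        D.fill(numpy.inf)
        D[0, 0] = 0.0
        for i in range(1, n + 1):
            for j in range(1, m + 1):
                cost = dist.cityblock(a[i - 1], b[j - 1])
                D[i, j] = cost + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])
        return D[n, m] / float(n + m)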
Main.py
#!/usr/bin/python
import pyaudio
import wave
import audioop
import cPickle
import audio
import pdb
import os
import sys
import socket
from network import HOST, PORT

CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 16000
RECORD_SECONDS = 5
THRESHOLD = 1000

try:
    username = sys.argv[1]
except IndexError:
    username = "default"
try:
    method = sys.argv[2]
except IndexError:
    method = "default"
verbose = True

p = pyaudio.PyAudio()

def StreamBuild(channels):
    return p.open(format=FORMAT, channels=channels, rate=RATE, input=True,
                  frames_per_buffer=CHUNK)

stream = None
try:
    stream = StreamBuild(CHANNELS)
except IOError as e:
    print "Found Exception: " + str(e) + ". Trying 1 Channel"
    CHANNELS = 1
    stream = StreamBuild(CHANNELS)
out_stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True)
def main(user="default", method="default"):
    global db, username
    try:
        db = cPickle.load(open("database", "rb"))
    except Exception:
        # Missing or unreadable database; start fresh
        db = {}
    if method == "live":
        run_classify(user, False)
    if user == "default":
        username = raw_input("Username: ")
    else:
        username = user
    if username not in db:
        db[username] = {"gestures": {}, "trained": False, "no_class_audio": []}
    gestlist = ", ".join(db[username]["gestures"].keys())
    print ("Welcome %s, you have loaded the following gestures: %s" %
           (username, gestlist))
    if db[username]["trained"]:
        print "The system has been trained since you last recorded."
    else:
        print "The system has not been trained since you last recorded."
    if method == "default":
        print "Menu: \t1) Record Gestures"
        print "\t2) Train System"
        print "\t3) Run Classifier"
        print "\t4) Listen to Gestures"
        print "\t5) Save Audio to File"
        print "\t6) View Stats for User"
        print "\t7) Run Testing Module"
        print "\t8) Quit"
        method = input("Choose option: ")
    print ""
    ret = 1
    if method == 1:
        printv("Starting recording system")
        run_record(username)
        printv("Finished recording system")
    elif method == 2:
        printv("Starting training system")
        run_train(username)
        printv("Finished training system")
    elif method == 3:
        printv("Starting classification system")
        run_classify(username)
        printv("Finished classification system")
    elif method == 4:
        printv("Starting playback system")
        run_play(username)
        printv("Finished playback system")
    elif method == 5:
        printv("Saving Audio To Files")
        run_save_audio(username)
        printv("Finished Saving Audio")
    elif method == 6:
        printv("Viewing User Info")
        run_view_avail_gestures(username)
        printv("Finished Viewing User Info")
    elif method == 7:
        printv("Starting Test Sequence")
        run_test_sequence(username)
        printv("Ending Test Sequence")
    else:
        ret = 0
    f = open("database", "wb")
    cPickle.dump(db, f)
    f.close()
    print "------------------------------------------------------------"
    return ret
def run_record(username):
    global db
    gesturename = raw_input("Gesture name: ")
    # A leading '-' deletes the named gesture instead of recording
    if gesturename.startswith('-'):
        db[username]["gestures"].pop(gesturename[1:])
        return
    if gesturename not in db[username]["gestures"]:
        db[username]["gestures"][gesturename] = []
    db[username]["trained"] = False
    Run_On_Every_Frame(lambda frames: db[username]["gestures"][gesturename].append(frames))
    for gesture in db[username]["gestures"][gesturename]:
        playv(gesture)

def run_train(username):
    params = audio.GenerateParams(db[username]["gestures"], verbose)
    db[username]["params"] = params
    db[username]["trained"] = True

def run_classify(username, verbose=True):
    classifier = audio.AudioClassifier(db[username]["params"])
    Run_On_Every_Frame(lambda frames: SendString(classifier.Classify(frames, verbose)))

def run_play(username):
    gesturename = raw_input("Gesture name: ")
    if gesturename not in db[username]["gestures"]:
        print "No such gesture found for user: " + username
        return
    for gesture in db[username]["gestures"][gesturename]:
        out_stream.write(gesture)

def run_save_audio(username):
    if os.path.isdir(username):
        ret = raw_input("Directory %s Exists. Would you like to delete it? (y,n) " % username)
        if ret == "n":
            return
        import shutil
        shutil.rmtree(username)
    os.mkdir(username)
    import scipy.io.wavfile
    import numpy
    for gesture in db[username]["gestures"]:
        # 'clip' avoids shadowing the imported audio module
        for i, clip in enumerate(db[username]["gestures"][gesture]):
            path = os.path.join(username, gesture + "_" + str(i) + ".wav")
            scipy.io.wavfile.write(path, RATE, numpy.frombuffer(clip, numpy.int16))

def run_view_avail_gestures(username):
    for i, gesture in enumerate(db[username]["gestures"]):
        print "\t" + str(i) + " : " + gesture
    print ""
def run_test_sequence(username):
    val = raw_input("Record unclassifiable audio samples? Currently have %s samples. (y,n): "
                    % len(db[username]["no_class_audio"]))
    if val == "y":
        Run_On_Every_Frame(lambda frames: db[username]["no_class_audio"].append(frames))
    # Split each gesture's samples into alternating train/test halves
    trainData = {}
    testData = {}
    for gesture in db[username]["gestures"]:
        train = True
        for smpl in db[username]["gestures"][gesture]:
            if train:
                trainData.setdefault(gesture, []).append(smpl)
            else:
                testData.setdefault(gesture, []).append(smpl)
            train = not train
    aClassifier = audio.AudioClassifier(audio.GenerateParams(trainData, False))
    correctPredictions = {}
    incorrectPredictions = {}
    print ""
    for gesture in testData:
        for i, sample in enumerate(testData[gesture]):
            aClassification = aClassifier.Classify(sample, False)
            if aClassification != gesture:
                # Incorrect classification
                printv("FAIL : Incorrectly Classified %s as being %s" % (gesture, aClassification))
                playv(sample)
                incorrectPredictions.setdefault(gesture, {}).setdefault(aClassification, []).append(i)
            else:
                # Correct classification
                printv("SUCCESS : Correctly Classified %s as being %s" % (gesture, gesture))
                correctPredictions.setdefault(gesture, []).append(i)
    correctNoClass = []
    incorrectNoClass = {}
    for i, no_class_audio in enumerate(db[username]["no_class_audio"]):
        aClassification = aClassifier.Classify(no_class_audio, False)
        if aClassification is not None:
            # Incorrectly assigned a label to no-class audio
            incorrectNoClass.setdefault(aClassification, []).append(i)
            printv("FAIL : Gave No Class Audio Label %s" % aClassification)
            playv(no_class_audio)
        else:
            # Correctly left no-class audio unlabeled
            printv("SUCCESS : Gave Correct No Class Assignment")
            correctNoClass.append(i)
    view_test_results(correctPredictions, incorrectPredictions, correctNoClass,
                      incorrectNoClass, testData, db[username]["no_class_audio"])
def view_test_results(correctPredictions, incorrectPredictions, correctNoClass,
                      incorrectNoClass, testData, noClassTests):
    def printPercentages(numCorrect, total):
        if total == 0:
            print "No Samples\n"
            return
        percentCorrect = int(100 * float(numCorrect) / total)
        percentIncorrect = 100 - percentCorrect
        printv("\tSUCCESS (%s%%)\t: %s" % (str(percentCorrect), "*" * (percentCorrect / 10)))
        printv("\tFAIL (%s%%)\t: %s" % (str(percentIncorrect), "*" * (percentIncorrect / 10)))

    # Per-gesture accuracy
    for item in testData.keys():
        printv("\nGesture %s" % item)
        total_correct = 0
        if item in correctPredictions:
            total_correct = len(correctPredictions[item])
        printPercentages(total_correct, len(testData[item]))

    # Overall accuracy on labeled test samples (reset the running totals
    # first, so the per-gesture pass above is not double-counted)
    total_correct = 0
    total = 0
    for key in testData.keys():
        if key in correctPredictions:
            total_correct += len(correctPredictions[key])
        total += len(testData[key])
    printv("\n\nTotals for Classification with Labels")
    printPercentages(total_correct, total)

    # Accuracy on audio that should receive no label at all
    printv("\nTotals for Classification with No Labels")
    printPercentages(len(correctNoClass), len(noClassTests))
    printv("")
def Run_On_Every_Frame(execute):
    # Accumulate audio while it stays above the amplitude threshold; when it
    # falls silent, hand the buffered utterance to the callback. Ctrl+C exits.
    frames = ""
    try:
        while True:
            data = stream.read(CHUNK)
            amplitude = audioop.rms(data, 2)
            if amplitude >= THRESHOLD:
                frames += data
            elif len(frames) > 0:
                execute(frames)
                frames = ""
    except KeyboardInterrupt:
        return

def SendString(string):
    if string is not None:
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        print username + "_" + string
        client_socket.sendto(username + "_" + string, (HOST, PORT))
        client_socket.close()

def printv(string):
    if verbose:
        print string

def playv(sample):
    if verbose:
        out_stream.write(sample)
try:
    ret = 1
    while ret != 0:
        ret = main(username, method)
except KeyboardInterrupt:
    raise
finally:
    try:
        out_stream.stop_stream()
        out_stream.close()
        stream.stop_stream()
        stream.close()
        p.terminate()
    except Exception as e:
        print str(e)
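Main.py imports HOST and PORT from a network module that is not included in this gist; SendString just fires each recognized label at that address over UDP. A stand-in network.py (the host and port values here are arbitrary assumptions), plus a matching listener, might look like:

    # network.py -- stand-in for the missing module; use whatever address your listener binds
    HOST = "127.0.0.1"
    PORT = 5005

    # listener.py -- prints each "username_gesture" datagram Main.py sends
    import socket
    from network import HOST, PORT

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((HOST, PORT))
    while True:
        data, addr = sock.recvfrom(1024)
        print data  # e.g. "default_play"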
MFCC.py
###############################################################################
# Module for MFCC extraction
# By Maigo Yun Wang, 02/08/2012
###############################################################################
# Quick tutorial:
#   import MFCC
#   x = ...                  # x is a wave signal saved in a 1-D numpy array
#   mfcc = MFCC.extract(x)   # mfcc is a 2-D numpy array, where each row is
#                            # the MFCC of a frame in x
#   mfcc = MFCC.extract(x, show = True)
#                            # This will also plot the MFCC and the spectrogram
#                            # reconstructed from MFCC by inverse DCT
###############################################################################
# Feel free to customize the parameters in the configuration block below
# (FS through POWER_SPECTRUM_FLOOR).
###############################################################################
#
# Modded to perform normalization before extraction
#
###############################################################################
from numpy import *
from numpy.linalg import *
from matplotlib.pyplot import *
def hamming(n):
    """
    Generate a hamming window of n points as a numpy array.
    """
    return 0.54 - 0.46 * cos(2 * pi / n * (arange(n) + 0.5))

def melfb(p, n, fs):
    """
    Return a Mel filterbank matrix as a numpy array.
    Inputs:
        p:  number of filters in the filterbank
        n:  length of fft
        fs: sample rate in Hz
    Ref. http://www.ifp.illinois.edu/~minhdo/teaching/speaker_recognition/code/melfb.m
    """
    f0 = 700.0 / fs
    fn2 = int(floor(n / 2))
    lr = log(1 + 0.5 / f0) / (p + 1)
    CF = fs * f0 * (exp(arange(1, p + 1) * lr) - 1)
    bl = n * f0 * (exp(array([0, 1, p, p + 1]) * lr) - 1)
    b1 = int(floor(bl[0])) + 1
    b2 = int(ceil(bl[1]))
    b3 = int(floor(bl[2]))
    b4 = min(fn2, int(ceil(bl[3]))) - 1
    pf = log(1 + arange(b1, b4 + 1) / f0 / n) / lr
    fp = floor(pf)
    pm = pf - fp
    M = zeros((p, 1 + fn2))
    for c in range(b2 - 1, b4):
        r = int(fp[c]) - 1          # fp holds floats; row indices must be ints
        M[r, c + 1] += 2 * (1 - pm[c])
    for c in range(b3):
        r = int(fp[c])
        M[r, c + 1] += 2 * pm[c]
    return M, CF

def dctmtx(n):
    """
    Return the DCT-II matrix of order n as a numpy array.
    """
    x, y = meshgrid(range(n), range(n))
    D = sqrt(2.0 / n) * cos(pi * (2 * x + 1) * y / (2 * n))
    D[0] /= sqrt(2)
    return D
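# Sanity check for the helpers above, using the defaults configured below:
#   M, CF = melfb(40, 2048, 16000)  ->  M.shape == (40, 1025) (one row per Mel
#   band, one column per FFT bin up to Nyquist), CF.shape == (40,)
#   dctmtx(40).shape == (40, 40)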
FS = 16000                      # Sampling rate
FRAME_LEN = int(0.02 * FS)      # Frame length: 320 samples = 20 ms
FRAME_SHIFT = int(0.01 * FS)    # Frame shift: 160 samples = 10 ms
FFT_SIZE = 2048                 # How many points for FFT
WINDOW = hamming(FRAME_LEN)     # Window function
PRE_EMPH = 0.95                 # Pre-emphasis factor
BANDS = 40                      # Number of Mel filters
COEFS = 13                      # Number of Mel cepstra coefficients to keep
POWER_SPECTRUM_FLOOR = 1e-100   # Flooring for the power to avoid log(0)
M, CF = melfb(BANDS, FFT_SIZE, FS)         # The Mel filterbank matrix and the center frequencies of each band
D = dctmtx(BANDS)[1:COEFS + 1]             # The DCT matrix. Change the index to [0:COEFS] if you want to keep the 0-th coefficient
invD = inv(dctmtx(BANDS))[:, 1:COEFS + 1]  # The inverse DCT matrix. Change the index to [0:COEFS] if you want to keep the 0-th coefficient
def extract(x, show = False):
    """
    Extract MFCC coefficients of the sound x in numpy array format.
    """
    if x.ndim > 1:
        print "INFO: Input signal has more than 1 channel; the channels will be averaged."
        x = mean(x, axis=1)
    # Normalize the sequence first (currently disabled)
    #total = 0.0
    #for i in x: total += i**2
    #total = sqrt(total / len(x))
    #x = x / total
    frames = (len(x) - FRAME_LEN) / FRAME_SHIFT + 1
    feature = []
    for f in range(frames):
        # Windowing
        frame = x[f * FRAME_SHIFT : f * FRAME_SHIFT + FRAME_LEN] * WINDOW
        # Pre-emphasis
        frame[1:] -= frame[:-1] * PRE_EMPH
        # Power spectrum
        X = abs(fft.fft(frame, FFT_SIZE)[:FFT_SIZE / 2 + 1]) ** 2
        X[X < POWER_SPECTRUM_FLOOR] = POWER_SPECTRUM_FLOOR  # Avoid zero
        # Mel filtering, logarithm, DCT
        X = dot(D, log(dot(M, X)))
        feature.append(X)
    feature = row_stack(feature)
    # Show the MFCC spectrum before normalization
    if show:
        figure().show()
        subplot(2, 1, 2)
        show_MFCC_spectrum(feature)
    # Mean & variance normalization
    if feature.shape[0] > 1:
        mu = mean(feature, axis=0)
        sigma = std(feature, axis=0)
        feature = (feature - mu) / sigma
    # Show the MFCC after normalization
    if show:
        subplot(2, 1, 1)
        show_MFCC(feature)
        draw()
    return feature
def show_MFCC(mfcc):
    """
    Show the MFCC as an image.
    """
    imshow(mfcc.T, aspect="auto", interpolation="none")
    title("MFCC features")
    xlabel("Frame")
    ylabel("Dimension")

def show_MFCC_spectrum(mfcc):
    """
    Show the spectrum reconstructed from MFCC as an image.
    """
    imshow(dot(invD, mfcc.T), aspect="auto", interpolation="none", origin="lower")
    title("MFCC spectrum")
    xlabel("Frame")
    ylabel("Band")