- Wav -> RAW -> MFCC -> SIG
usage : python mkmfc.py
- mkmfcが自動で処理します
usage : python mksig.py
- SIG -> EMD -> HTML
- mksigしてあることが前提です
usage : python mkemd.py [key sig file]
| #coding:utf-8 | |
| import os | |
| import sys | |
| import numpy as np | |
| import numpy.linalg | |
| import rpy2.robjects as robjects | |
| from collections import defaultdict | |
| reload(sys) | |
| sys.setdefaultencoding('utf-8') | |
| # mir.py | |
| # usage: python mir.py [sig file] [sig dir] [html file] | |
| # sig file : クエリ楽曲のシグネチャファイル | |
| # sig dir : 検索対象のシグネチャファイルのディレクトリ | |
| # html file : 検索結果を出力するHTMLファイル | |
| # 引数で指定したシグネチャファイルに近い | |
| # 上位N件の楽曲を出力する | |
| def KLDiv(mu1, S1, mu2, S2): | |
| """正規分布間のカルバック・ライブラー情報量""" | |
| # 逆行列を計算 | |
| try: | |
| invS1 = np.linalg.inv(S1) | |
| except numpy.linalg.linalg.LinAlgError: | |
| raise; | |
| try: | |
| invS2 = np.linalg.inv(S2) | |
| except numpy.linalg.linalg.LinAlgError: | |
| raise; | |
| # KL Divergenceを計算 | |
| t1 = np.sum(np.diag(np.dot(invS1, S1))) | |
| t2 = (mu2 - mu1).transpose() | |
| t3 = mu2 - mu1 | |
| return t1 + np.dot(np.dot(t2, invS2), t3) | |
| def symKLDiv(mu1, S1, mu2, S2): | |
| """対称性のあるカルバック・ライブラー情報量""" | |
| return 0.5 * (KLDiv(mu1, S1, mu2, S2) + KLDiv(mu2, S2, mu1, S1)) | |
| robjects.r['library']('lpSolve') | |
| transport = robjects.r['lp.transport'] | |
| def calcEMD(sigFile1, sigFile2): | |
| # シグネチャをロード | |
| sig1 = loadSignature(sigFile1) | |
| sig2 = loadSignature(sigFile2) | |
| # 距離行列を計算 | |
| numFeatures = sig1.shape[0] # クラスタの数 | |
| dist = np.zeros(numFeatures * numFeatures) # 距離行列(フラット形式) | |
| for i in range(numFeatures): | |
| mu1 = sig1[i, 1:21].reshape(20, 1) # 縦ベクトル | |
| S1 = sig1[i, 21:421].reshape(20, 20) | |
| for j in range(numFeatures): | |
| mu2 = sig2[j, 1:21].reshape(20, 1) | |
| S2 = sig2[j, 21:421].reshape(20, 20) | |
| # 特徴量iと特徴量j間のKLダイバージェンスを計算 | |
| dist[i * numFeatures + j] = symKLDiv(mu1, S1, mu2, S2) | |
| # シグネチャの重み(0列目)を取得 | |
| w1 = sig1[:,0] | |
| w2 = sig2[:,0] | |
| # 重みと距離行列からEMDを計算 | |
| # transport()の引数を用意 | |
| costs = robjects.r['matrix'](robjects.FloatVector(dist), | |
| nrow=len(w1), ncol=len(w2), | |
| byrow=True) | |
| row_signs = ["<"] * len(w1) | |
| row_rhs = robjects.FloatVector(w1) | |
| col_signs = [">"] * len(w2) | |
| col_rhs = robjects.FloatVector(w2) | |
| t = transport(costs, "min", row_signs, row_rhs, col_signs, col_rhs) | |
| flow = t.rx2('solution') | |
| dist = dist.reshape(len(w1), len(w2)) | |
| flow = np.array(flow) | |
| work = np.sum(flow * dist) | |
| np.seterr(all='print') | |
| emd = work / np.sum(flow) | |
| return emd | |
| def loadSignature(sigFile): | |
| """シグネチャファイルをロード""" | |
| mat = [] | |
| fp = open(sigFile, "r") | |
| for line in fp: | |
| line = line.rstrip() | |
| mat.append([float(x) for x in line.split()]) | |
| fp.close() | |
| return np.array(mat) | |
| def getArtist(mp3Path): | |
| """MP3ファイルからアーティストを取得""" | |
| import eyed3 | |
| try: | |
| tag = eyed3.Tag() | |
| tag.link(mp3Path) | |
| artist = tag.getArtist() | |
| except: | |
| artist = "None" | |
| # 空白のとき | |
| if artist == "": artist = "None" | |
| return artist | |
| def makeHTML(ranking, htmlFile, N=10): | |
| """ランキングをHTML形式で出力""" | |
| import codecs | |
| fout = codecs.open(htmlFile, "w", "utf-8") | |
| # HTMLヘッダを出力 | |
| fout.write('<!DOCTYPE html>\n') | |
| fout.write('<html lang="ja">\n') | |
| fout.write('<head><meta charset="UTF-8" /><title>%s</title></head>\n' % htmlFile) | |
| fout.write('<body>\n') | |
| fout.write('<table border="1">\n') | |
| fout.write(u'<thead><tr><th>ランク</th><th>EMD</th><th>タイトル</th>') | |
| fout.write(u'<th>アーティスト</th><th>音声</th></tr></thead>\n') | |
| fout.write(u'<tbody>\n') | |
| # ランキングを出力 | |
| rank = 1 | |
| for sigFile, emd in sorted(ranking.items(), key=lambda x:x[1], reverse=False)[:N]: | |
| prefix = sigFile.replace(".sig", "") | |
| # rawをwavに変換(HTMLプレーヤー用) | |
| rawPath = os.path.join("tmp/raw", prefix + ".raw") | |
| wavPath = os.path.join("emd", prefix + ".wav") | |
| if not os.path.exists("emd"): os.mkdir("emd") | |
| os.system('sox -r 16000 -e signed-integer -b 16 "%s" "%s"' % (rawPath, wavPath)) | |
| # アーティスト名を取得 | |
| mp3Path = os.path.join("mp3", prefix + ".mp3") | |
| artist = getArtist(mp3Path) | |
| # HTML出力 | |
| # HTML5のオーディオプレーヤーを埋め込む | |
| audio = '<audio src="%s" controls>' % wavPath | |
| fout.write("<tr><td>%d</td><td>%.2f</td><td>%s</td><td>%s</td><td>%s</td></tr>\n" | |
| % (rank, emd, prefix, artist, audio)) | |
| rank += 1 | |
| fout.write("</tbody>\n"); | |
| fout.write("</table>\n") | |
| fout.write("</body>\n") | |
| fout.write("</html>\n") | |
| fout.close() | |
| if __name__ == "__main__": | |
| if len(sys.argv) != 2: | |
| print "python mir.py [sig file]" | |
| sys.exit() | |
| targetSigPath = sys.argv[1] | |
| sigDir = 'tmp/sig' | |
| htmlFile = 'emd.html' | |
| ranking = defaultdict(float) | |
| # 全楽曲との間で距離を求める | |
| for sigFile in os.listdir(sigDir): | |
| sigPath = os.path.join(sigDir, sigFile) | |
| emd = calcEMD(targetSigPath, sigPath) | |
| if emd < 0: continue | |
| ranking[sigFile] = emd | |
| # ランキングをEMDの降順にソートして出力 | |
| N = 50 | |
| rank = 1 | |
| for sigFile, emd in sorted(ranking.items(), key=lambda x:x[1], reverse=False)[:N]: | |
| print "%d\t%.2f\t%s" % (rank, emd, sigFile) | |
| rank += 1 | |
| # EMDの昇順に上位10件をHTMLにして出力 | |
| makeHTML(ranking, htmlFile, N) |
| #coding:utf-8 | |
| import os | |
| import math | |
| import subprocess | |
| # mp3_to_mfcc.py | |
| # usage: python mp3_to_mfcc.py [mp3dir] [mfccdir] [rawdir] | |
| # ディレクトリ内のMP3ファイルからMFCCを抽出する | |
| # if filter == 'HIGH': | |
| # os.system('sox -v 0.72 %s %s sinc -100 -t 0.01k' % (cat, pas)) | |
| # elif filter == 'MID': | |
| # os.system('sox -v 0.72 %s %s sinc 100-1000 -t 0.01k' % (cat, pas)) | |
| # elif filter == 'LOW': | |
| # os.system('sox -v 0.72 %s %s sinc 1000-10000 -t 0.01k' % (cat, pas)) | |
| # else: | |
| # os.system('sox -v 0.72 %s %s' % (cat, pas)) | |
| #os.system("sox -v 0.72 temp.mp3 temp.wav sinc -100 -t 0.01k") | |
| #os.system("sox temp.mp3 temp.wav") | |
| period = 15 | |
| def getlength(wav): | |
| p = subprocess.Popen('soxi -D "%s"' % wav, shell=True, cwd=os.getcwd(), | |
| stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) | |
| return float(p.stdout.readline().rstrip()) | |
| def mp3ToRaw(mp3File, wavFile, rawFile): | |
| os.system("lame --resample 16 -b 32 -a '%s' temp.mp3" % mp3File) | |
| os.system("lame --decode temp.mp3 '%s'" % (wavFile)) | |
| wavToRaw(wavFile, rawFile) | |
| def wavToRaw(wavFile, rawFile): | |
| print "length: %s" % getlength(wavFile) | |
| print "period: %s" % (period * 4) | |
| print "looped: %s" % int(math.ceil(period * 4 / getlength(wavFile))) | |
| os.system('cp "%s" temp.wav' % wavFile) | |
| for var in range(1, int(math.ceil(period * 4 / getlength(wavFile)))): | |
| print '[%i] temp.wav + "%s"' % (var, wavFile) | |
| os.system('sox temp.wav "%s" "%s"' % (wavFile, wavFile)) | |
| os.system("sox '%s' '%s'" % (wavFile, rawFile)) | |
| #os.remove("temp.mp3") | |
| os.remove('temp.wav') | |
| # os.remove('temp2.wav') | |
| def calcNumSample(rawFile): | |
| # 1サンプルはshort型(2byte)なのでファイルサイズを2で割る | |
| filesize = os.path.getsize(rawFile) | |
| numsample = filesize / 2 | |
| return numsample | |
| def extractCenter(inFile, outFile): | |
| # 波形のサンプル数を求める | |
| numsample = calcNumSample(inFile) | |
| fs = 16000 | |
| center = numsample / 2 | |
| start = center - fs * period | |
| end = center + fs * period | |
| # period*2秒未満の場合は範囲を狭める | |
| if start < 0: start = 0 | |
| if end > numsample - 1: end = numsample - 1 | |
| # SPTKのbcutコマンドで切り出す | |
| os.system("bcut +s -s %d -e %d < '%s' > '%s'" % (start, end, inFile, outFile)) | |
| def calcMFCC(rawFile, mfccFile): | |
| # サンプリング周波数: 16kHz | |
| # フレーム長: 400サンプル | |
| # シフト幅 : 160サンプル | |
| # チャンネル数: 40 | |
| # MFCC: 19次元 + エネルギー | |
| print 'calculate %s' % mfccFile | |
| os.system("x2x +sf < '%s' | frame -l 400 -p 160 | mfcc -a 0.97 -l 400 -f 16 -n 40 -m 19 -E -c 10000 > '%s'" | |
| % (rawFile, mfccFile)) | |
| if __name__ == "__main__": | |
| # if len(sys.argv) != 4: | |
| # print "usage: python mp3_to_mfcc.py [mp3dir] [mfccdir] [rawdir]" | |
| # sys.exit() | |
| #mp3Dir = 'mp3' | |
| wavDir = 'wav' | |
| mfcDir = 'tmp/mfc' | |
| rawDir = 'tmp/raw' | |
| if not os.path.exists('tmp'): | |
| os.mkdir('tmp') | |
| if not os.path.exists(mfcDir): | |
| os.mkdir(mfcDir) | |
| if not os.path.exists(rawDir): | |
| os.mkdir(rawDir) | |
| #for file in os.listdir(mp3Dir): | |
| for file in os.listdir(wavDir): | |
| if not file.endswith(".wav"): continue | |
| # mp3File = os.path.join(mp3Dir, file) | |
| wavFile = os.path.join(wavDir, file) | |
| mfcFile = os.path.join(mfcDir, file.replace(".wav", ".mfc")) | |
| rawFile = os.path.join(rawDir, file.replace(".wav", ".raw")) | |
| # elif file.endswith(".wav"): | |
| # mp3File = os.path.join(mp3Dir, file) | |
| # mfccFile = os.path.join(mfccDir, file.replace(".wav", ".mfc")) | |
| # extFile = os.path.join(extDir, file.replace(".wav", ".raw")) | |
| # rawFile = os.path.join(rawDir, file.replace(".wav", ".raw")) | |
| # wavFile = os.path.join(wavDir, file.replace(".wav", ".wav")) | |
| try: | |
| # MP3を変換 | |
| if not os.path.exists(rawFile): | |
| wavToRaw(wavFile, 'temp.raw') | |
| # 中央の30秒だけ抽出してrawFileへ | |
| extractCenter('temp.raw', rawFile) | |
| # MFCCを計算 | |
| calcMFCC(rawFile, mfcFile) | |
| print "%s => %s" % (wavFile, mfcFile) | |
| # 後片付け | |
| os.remove("temp.raw") | |
| except: | |
| continue | |
| os.system('python mksig.py') |
| #coding:utf-8 | |
| import os | |
| import struct | |
| import numpy as np | |
| import scipy.cluster | |
| # mfcc_to_signature.py | |
| # usage: python mfcc_to_signature.py [mfccdir] [sigdir] | |
| # 各曲のMFCCをシグネチャに変換する | |
| def loadMFCC(mfccFile, m): | |
| """MFCCをロードする、mはMFCCの次元数""" | |
| mfcc = [] | |
| fp = open(mfccFile, "rb") | |
| while True: | |
| b = fp.read(4) | |
| if b == "": break | |
| val = struct.unpack("f", b)[0] | |
| mfcc.append(val) | |
| fp.close() | |
| # 各行がフレームのMFCC | |
| # numFrame行、m列の行列形式に変換 | |
| mfcc = np.array(mfcc) | |
| numFrame = len(mfcc) / m | |
| mfcc = mfcc.reshape(numFrame, m) | |
| return mfcc | |
| def vq(mfcc, k): | |
| """mfccのベクトル集合をk個のクラスタにベクトル量子化""" | |
| codebook, destortion = scipy.cluster.vq.kmeans(mfcc, k) | |
| code, dist = scipy.cluster.vq.vq(mfcc, codebook) | |
| return code | |
| if __name__ == "__main__": | |
| # if len(sys.argv) != 3: | |
| # print "usage: python mfcc_to_signature.py [mfccdir] [sigdir]" | |
| # sys.exit() | |
| mfccDir = 'tmp/mfc' | |
| sigDir = 'tmp/sig' | |
| if not os.path.exists(sigDir): | |
| os.mkdir(sigDir) | |
| for file in os.listdir(mfccDir): | |
| if not file.endswith(".mfc"): continue | |
| mfccFile = os.path.join(mfccDir, file) | |
| sigFile = os.path.join(sigDir, file.replace(".mfc", ".sig")) | |
| if not os.path.exists(sigFile): | |
| print mfccFile, "=>", sigFile | |
| fout = open(sigFile, "w") | |
| # MFCCをロード | |
| # 各行がフレームのMFCCベクトル | |
| mfcc = loadMFCC(mfccFile, 20) | |
| # MFCCをベクトル量子化してコードを求める | |
| code = vq(mfcc, 16) | |
| # 各クラスタのデータ数、平均ベクトル、 | |
| # 共分散行列を求めてシグネチャとする | |
| for k in range(16): | |
| # クラスタkのフレームのみ抽出 | |
| frames = np.array([mfcc[i] for i in range(len(mfcc)) if code[i] == k]) | |
| # MFCCの各次元の平均をとって平均ベクトルを求める | |
| m = np.apply_along_axis(np.mean, 0, frames) # 0は縦方向 | |
| # MFCCの各次元間での分散・共分散行列を求める | |
| sigma = np.cov(frames.T) | |
| # 重み(各クラスタのデータ数) | |
| w = len(frames) | |
| # このクラスタの特徴量をフラット形式で出力 | |
| # 1行が重み1個、平均ベクトル20個、分散・共分散行列400個の計421個の数値列 | |
| features = np.hstack((w, m, sigma.flatten())) | |
| features = [str(x) for x in features] | |
| fout.write(" ".join(features) + "\n") | |
| fout.close() |