- Wav -> RAW -> MFCC -> SIG
usage : python mkmfc.py
- mkmfcが自動で処理します
usage : python mksig.py
- SIG -> EMD -> HTML
- mksigしてあることが前提です
usage : python mkemd.py [key sig file]
#coding:utf-8 | |
import os | |
import sys | |
import numpy as np | |
import numpy.linalg | |
import rpy2.robjects as robjects | |
from collections import defaultdict | |
reload(sys) | |
sys.setdefaultencoding('utf-8') | |
# mir.py | |
# usage: python mir.py [sig file] [sig dir] [html file] | |
# sig file : クエリ楽曲のシグネチャファイル | |
# sig dir : 検索対象のシグネチャファイルのディレクトリ | |
# html file : 検索結果を出力するHTMLファイル | |
# 引数で指定したシグネチャファイルに近い | |
# 上位N件の楽曲を出力する | |
def KLDiv(mu1, S1, mu2, S2): | |
"""正規分布間のカルバック・ライブラー情報量""" | |
# 逆行列を計算 | |
try: | |
invS1 = np.linalg.inv(S1) | |
except numpy.linalg.linalg.LinAlgError: | |
raise; | |
try: | |
invS2 = np.linalg.inv(S2) | |
except numpy.linalg.linalg.LinAlgError: | |
raise; | |
# KL Divergenceを計算 | |
t1 = np.sum(np.diag(np.dot(invS1, S1))) | |
t2 = (mu2 - mu1).transpose() | |
t3 = mu2 - mu1 | |
return t1 + np.dot(np.dot(t2, invS2), t3) | |
def symKLDiv(mu1, S1, mu2, S2): | |
"""対称性のあるカルバック・ライブラー情報量""" | |
return 0.5 * (KLDiv(mu1, S1, mu2, S2) + KLDiv(mu2, S2, mu1, S1)) | |
robjects.r['library']('lpSolve') | |
transport = robjects.r['lp.transport'] | |
def calcEMD(sigFile1, sigFile2): | |
# シグネチャをロード | |
sig1 = loadSignature(sigFile1) | |
sig2 = loadSignature(sigFile2) | |
# 距離行列を計算 | |
numFeatures = sig1.shape[0] # クラスタの数 | |
dist = np.zeros(numFeatures * numFeatures) # 距離行列(フラット形式) | |
for i in range(numFeatures): | |
mu1 = sig1[i, 1:21].reshape(20, 1) # 縦ベクトル | |
S1 = sig1[i, 21:421].reshape(20, 20) | |
for j in range(numFeatures): | |
mu2 = sig2[j, 1:21].reshape(20, 1) | |
S2 = sig2[j, 21:421].reshape(20, 20) | |
# 特徴量iと特徴量j間のKLダイバージェンスを計算 | |
dist[i * numFeatures + j] = symKLDiv(mu1, S1, mu2, S2) | |
# シグネチャの重み(0列目)を取得 | |
w1 = sig1[:,0] | |
w2 = sig2[:,0] | |
# 重みと距離行列からEMDを計算 | |
# transport()の引数を用意 | |
costs = robjects.r['matrix'](robjects.FloatVector(dist), | |
nrow=len(w1), ncol=len(w2), | |
byrow=True) | |
row_signs = ["<"] * len(w1) | |
row_rhs = robjects.FloatVector(w1) | |
col_signs = [">"] * len(w2) | |
col_rhs = robjects.FloatVector(w2) | |
t = transport(costs, "min", row_signs, row_rhs, col_signs, col_rhs) | |
flow = t.rx2('solution') | |
dist = dist.reshape(len(w1), len(w2)) | |
flow = np.array(flow) | |
work = np.sum(flow * dist) | |
np.seterr(all='print') | |
emd = work / np.sum(flow) | |
return emd | |
def loadSignature(sigFile): | |
"""シグネチャファイルをロード""" | |
mat = [] | |
fp = open(sigFile, "r") | |
for line in fp: | |
line = line.rstrip() | |
mat.append([float(x) for x in line.split()]) | |
fp.close() | |
return np.array(mat) | |
def getArtist(mp3Path): | |
"""MP3ファイルからアーティストを取得""" | |
import eyed3 | |
try: | |
tag = eyed3.Tag() | |
tag.link(mp3Path) | |
artist = tag.getArtist() | |
except: | |
artist = "None" | |
# 空白のとき | |
if artist == "": artist = "None" | |
return artist | |
def makeHTML(ranking, htmlFile, N=10): | |
"""ランキングをHTML形式で出力""" | |
import codecs | |
fout = codecs.open(htmlFile, "w", "utf-8") | |
# HTMLヘッダを出力 | |
fout.write('<!DOCTYPE html>\n') | |
fout.write('<html lang="ja">\n') | |
fout.write('<head><meta charset="UTF-8" /><title>%s</title></head>\n' % htmlFile) | |
fout.write('<body>\n') | |
fout.write('<table border="1">\n') | |
fout.write(u'<thead><tr><th>ランク</th><th>EMD</th><th>タイトル</th>') | |
fout.write(u'<th>アーティスト</th><th>音声</th></tr></thead>\n') | |
fout.write(u'<tbody>\n') | |
# ランキングを出力 | |
rank = 1 | |
for sigFile, emd in sorted(ranking.items(), key=lambda x:x[1], reverse=False)[:N]: | |
prefix = sigFile.replace(".sig", "") | |
# rawをwavに変換(HTMLプレーヤー用) | |
rawPath = os.path.join("tmp/raw", prefix + ".raw") | |
wavPath = os.path.join("emd", prefix + ".wav") | |
if not os.path.exists("emd"): os.mkdir("emd") | |
os.system('sox -r 16000 -e signed-integer -b 16 "%s" "%s"' % (rawPath, wavPath)) | |
# アーティスト名を取得 | |
mp3Path = os.path.join("mp3", prefix + ".mp3") | |
artist = getArtist(mp3Path) | |
# HTML出力 | |
# HTML5のオーディオプレーヤーを埋め込む | |
audio = '<audio src="%s" controls>' % wavPath | |
fout.write("<tr><td>%d</td><td>%.2f</td><td>%s</td><td>%s</td><td>%s</td></tr>\n" | |
% (rank, emd, prefix, artist, audio)) | |
rank += 1 | |
fout.write("</tbody>\n"); | |
fout.write("</table>\n") | |
fout.write("</body>\n") | |
fout.write("</html>\n") | |
fout.close() | |
if __name__ == "__main__": | |
if len(sys.argv) != 2: | |
print "python mir.py [sig file]" | |
sys.exit() | |
targetSigPath = sys.argv[1] | |
sigDir = 'tmp/sig' | |
htmlFile = 'emd.html' | |
ranking = defaultdict(float) | |
# 全楽曲との間で距離を求める | |
for sigFile in os.listdir(sigDir): | |
sigPath = os.path.join(sigDir, sigFile) | |
emd = calcEMD(targetSigPath, sigPath) | |
if emd < 0: continue | |
ranking[sigFile] = emd | |
# ランキングをEMDの降順にソートして出力 | |
N = 50 | |
rank = 1 | |
for sigFile, emd in sorted(ranking.items(), key=lambda x:x[1], reverse=False)[:N]: | |
print "%d\t%.2f\t%s" % (rank, emd, sigFile) | |
rank += 1 | |
# EMDの昇順に上位10件をHTMLにして出力 | |
makeHTML(ranking, htmlFile, N) |
#coding:utf-8 | |
import os | |
import math | |
import subprocess | |
# mp3_to_mfcc.py | |
# usage: python mp3_to_mfcc.py [mp3dir] [mfccdir] [rawdir] | |
# ディレクトリ内のMP3ファイルからMFCCを抽出する | |
# if filter == 'HIGH': | |
# os.system('sox -v 0.72 %s %s sinc -100 -t 0.01k' % (cat, pas)) | |
# elif filter == 'MID': | |
# os.system('sox -v 0.72 %s %s sinc 100-1000 -t 0.01k' % (cat, pas)) | |
# elif filter == 'LOW': | |
# os.system('sox -v 0.72 %s %s sinc 1000-10000 -t 0.01k' % (cat, pas)) | |
# else: | |
# os.system('sox -v 0.72 %s %s' % (cat, pas)) | |
#os.system("sox -v 0.72 temp.mp3 temp.wav sinc -100 -t 0.01k") | |
#os.system("sox temp.mp3 temp.wav") | |
period = 15 | |
def getlength(wav): | |
p = subprocess.Popen('soxi -D "%s"' % wav, shell=True, cwd=os.getcwd(), | |
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) | |
return float(p.stdout.readline().rstrip()) | |
def mp3ToRaw(mp3File, wavFile, rawFile): | |
os.system("lame --resample 16 -b 32 -a '%s' temp.mp3" % mp3File) | |
os.system("lame --decode temp.mp3 '%s'" % (wavFile)) | |
wavToRaw(wavFile, rawFile) | |
def wavToRaw(wavFile, rawFile): | |
print "length: %s" % getlength(wavFile) | |
print "period: %s" % (period * 4) | |
print "looped: %s" % int(math.ceil(period * 4 / getlength(wavFile))) | |
os.system('cp "%s" temp.wav' % wavFile) | |
for var in range(1, int(math.ceil(period * 4 / getlength(wavFile)))): | |
print '[%i] temp.wav + "%s"' % (var, wavFile) | |
os.system('sox temp.wav "%s" "%s"' % (wavFile, wavFile)) | |
os.system("sox '%s' '%s'" % (wavFile, rawFile)) | |
#os.remove("temp.mp3") | |
os.remove('temp.wav') | |
# os.remove('temp2.wav') | |
def calcNumSample(rawFile): | |
# 1サンプルはshort型(2byte)なのでファイルサイズを2で割る | |
filesize = os.path.getsize(rawFile) | |
numsample = filesize / 2 | |
return numsample | |
def extractCenter(inFile, outFile): | |
# 波形のサンプル数を求める | |
numsample = calcNumSample(inFile) | |
fs = 16000 | |
center = numsample / 2 | |
start = center - fs * period | |
end = center + fs * period | |
# period*2秒未満の場合は範囲を狭める | |
if start < 0: start = 0 | |
if end > numsample - 1: end = numsample - 1 | |
# SPTKのbcutコマンドで切り出す | |
os.system("bcut +s -s %d -e %d < '%s' > '%s'" % (start, end, inFile, outFile)) | |
def calcMFCC(rawFile, mfccFile): | |
# サンプリング周波数: 16kHz | |
# フレーム長: 400サンプル | |
# シフト幅 : 160サンプル | |
# チャンネル数: 40 | |
# MFCC: 19次元 + エネルギー | |
print 'calculate %s' % mfccFile | |
os.system("x2x +sf < '%s' | frame -l 400 -p 160 | mfcc -a 0.97 -l 400 -f 16 -n 40 -m 19 -E -c 10000 > '%s'" | |
% (rawFile, mfccFile)) | |
if __name__ == "__main__": | |
# if len(sys.argv) != 4: | |
# print "usage: python mp3_to_mfcc.py [mp3dir] [mfccdir] [rawdir]" | |
# sys.exit() | |
#mp3Dir = 'mp3' | |
wavDir = 'wav' | |
mfcDir = 'tmp/mfc' | |
rawDir = 'tmp/raw' | |
if not os.path.exists('tmp'): | |
os.mkdir('tmp') | |
if not os.path.exists(mfcDir): | |
os.mkdir(mfcDir) | |
if not os.path.exists(rawDir): | |
os.mkdir(rawDir) | |
#for file in os.listdir(mp3Dir): | |
for file in os.listdir(wavDir): | |
if not file.endswith(".wav"): continue | |
# mp3File = os.path.join(mp3Dir, file) | |
wavFile = os.path.join(wavDir, file) | |
mfcFile = os.path.join(mfcDir, file.replace(".wav", ".mfc")) | |
rawFile = os.path.join(rawDir, file.replace(".wav", ".raw")) | |
# elif file.endswith(".wav"): | |
# mp3File = os.path.join(mp3Dir, file) | |
# mfccFile = os.path.join(mfccDir, file.replace(".wav", ".mfc")) | |
# extFile = os.path.join(extDir, file.replace(".wav", ".raw")) | |
# rawFile = os.path.join(rawDir, file.replace(".wav", ".raw")) | |
# wavFile = os.path.join(wavDir, file.replace(".wav", ".wav")) | |
try: | |
# MP3を変換 | |
if not os.path.exists(rawFile): | |
wavToRaw(wavFile, 'temp.raw') | |
# 中央の30秒だけ抽出してrawFileへ | |
extractCenter('temp.raw', rawFile) | |
# MFCCを計算 | |
calcMFCC(rawFile, mfcFile) | |
print "%s => %s" % (wavFile, mfcFile) | |
# 後片付け | |
os.remove("temp.raw") | |
except: | |
continue | |
os.system('python mksig.py') |
#coding:utf-8 | |
import os | |
import struct | |
import numpy as np | |
import scipy.cluster | |
# mfcc_to_signature.py | |
# usage: python mfcc_to_signature.py [mfccdir] [sigdir] | |
# 各曲のMFCCをシグネチャに変換する | |
def loadMFCC(mfccFile, m): | |
"""MFCCをロードする、mはMFCCの次元数""" | |
mfcc = [] | |
fp = open(mfccFile, "rb") | |
while True: | |
b = fp.read(4) | |
if b == "": break | |
val = struct.unpack("f", b)[0] | |
mfcc.append(val) | |
fp.close() | |
# 各行がフレームのMFCC | |
# numFrame行、m列の行列形式に変換 | |
mfcc = np.array(mfcc) | |
numFrame = len(mfcc) / m | |
mfcc = mfcc.reshape(numFrame, m) | |
return mfcc | |
def vq(mfcc, k): | |
"""mfccのベクトル集合をk個のクラスタにベクトル量子化""" | |
codebook, destortion = scipy.cluster.vq.kmeans(mfcc, k) | |
code, dist = scipy.cluster.vq.vq(mfcc, codebook) | |
return code | |
if __name__ == "__main__": | |
# if len(sys.argv) != 3: | |
# print "usage: python mfcc_to_signature.py [mfccdir] [sigdir]" | |
# sys.exit() | |
mfccDir = 'tmp/mfc' | |
sigDir = 'tmp/sig' | |
if not os.path.exists(sigDir): | |
os.mkdir(sigDir) | |
for file in os.listdir(mfccDir): | |
if not file.endswith(".mfc"): continue | |
mfccFile = os.path.join(mfccDir, file) | |
sigFile = os.path.join(sigDir, file.replace(".mfc", ".sig")) | |
if not os.path.exists(sigFile): | |
print mfccFile, "=>", sigFile | |
fout = open(sigFile, "w") | |
# MFCCをロード | |
# 各行がフレームのMFCCベクトル | |
mfcc = loadMFCC(mfccFile, 20) | |
# MFCCをベクトル量子化してコードを求める | |
code = vq(mfcc, 16) | |
# 各クラスタのデータ数、平均ベクトル、 | |
# 共分散行列を求めてシグネチャとする | |
for k in range(16): | |
# クラスタkのフレームのみ抽出 | |
frames = np.array([mfcc[i] for i in range(len(mfcc)) if code[i] == k]) | |
# MFCCの各次元の平均をとって平均ベクトルを求める | |
m = np.apply_along_axis(np.mean, 0, frames) # 0は縦方向 | |
# MFCCの各次元間での分散・共分散行列を求める | |
sigma = np.cov(frames.T) | |
# 重み(各クラスタのデータ数) | |
w = len(frames) | |
# このクラスタの特徴量をフラット形式で出力 | |
# 1行が重み1個、平均ベクトル20個、分散・共分散行列400個の計421個の数値列 | |
features = np.hstack((w, m, sigma.flatten())) | |
features = [str(x) for x in features] | |
fout.write(" ".join(features) + "\n") | |
fout.close() |