Skip to content

Instantly share code, notes, and snippets.

@geta6
Created November 15, 2012 17:57
Show Gist options
  • Save geta6/4080110 to your computer and use it in GitHub Desktop.
Save geta6/4080110 to your computer and use it in GitHub Desktop.

How To Use

mkmfc.py

  • Wav -> RAW -> MFCC -> SIG
usage : python mkmfc.py

mksig.py

  • mkmfcが自動で処理します
usage : python mksig.py

mkemd.py

  • SIG -> EMD -> HTML
    • mksigしてあることが前提です
usage : python mkemd.py [key sig file]
#coding:utf-8
import os
import sys
import numpy as np
import numpy.linalg
import rpy2.robjects as robjects
from collections import defaultdict
reload(sys)
sys.setdefaultencoding('utf-8')
# mir.py
# usage: python mir.py [sig file] [sig dir] [html file]
# sig file : クエリ楽曲のシグネチャファイル
# sig dir : 検索対象のシグネチャファイルのディレクトリ
# html file : 検索結果を出力するHTMLファイル
# 引数で指定したシグネチャファイルに近い
# 上位N件の楽曲を出力する
def KLDiv(mu1, S1, mu2, S2):
"""正規分布間のカルバック・ライブラー情報量"""
# 逆行列を計算
try:
invS1 = np.linalg.inv(S1)
except numpy.linalg.linalg.LinAlgError:
raise;
try:
invS2 = np.linalg.inv(S2)
except numpy.linalg.linalg.LinAlgError:
raise;
# KL Divergenceを計算
t1 = np.sum(np.diag(np.dot(invS1, S1)))
t2 = (mu2 - mu1).transpose()
t3 = mu2 - mu1
return t1 + np.dot(np.dot(t2, invS2), t3)
def symKLDiv(mu1, S1, mu2, S2):
"""対称性のあるカルバック・ライブラー情報量"""
return 0.5 * (KLDiv(mu1, S1, mu2, S2) + KLDiv(mu2, S2, mu1, S1))
robjects.r['library']('lpSolve')
transport = robjects.r['lp.transport']
def calcEMD(sigFile1, sigFile2):
# シグネチャをロード
sig1 = loadSignature(sigFile1)
sig2 = loadSignature(sigFile2)
# 距離行列を計算
numFeatures = sig1.shape[0] # クラスタの数
dist = np.zeros(numFeatures * numFeatures) # 距離行列(フラット形式)
for i in range(numFeatures):
mu1 = sig1[i, 1:21].reshape(20, 1) # 縦ベクトル
S1 = sig1[i, 21:421].reshape(20, 20)
for j in range(numFeatures):
mu2 = sig2[j, 1:21].reshape(20, 1)
S2 = sig2[j, 21:421].reshape(20, 20)
# 特徴量iと特徴量j間のKLダイバージェンスを計算
dist[i * numFeatures + j] = symKLDiv(mu1, S1, mu2, S2)
# シグネチャの重み(0列目)を取得
w1 = sig1[:,0]
w2 = sig2[:,0]
# 重みと距離行列からEMDを計算
# transport()の引数を用意
costs = robjects.r['matrix'](robjects.FloatVector(dist),
nrow=len(w1), ncol=len(w2),
byrow=True)
row_signs = ["<"] * len(w1)
row_rhs = robjects.FloatVector(w1)
col_signs = [">"] * len(w2)
col_rhs = robjects.FloatVector(w2)
t = transport(costs, "min", row_signs, row_rhs, col_signs, col_rhs)
flow = t.rx2('solution')
dist = dist.reshape(len(w1), len(w2))
flow = np.array(flow)
work = np.sum(flow * dist)
np.seterr(all='print')
emd = work / np.sum(flow)
return emd
def loadSignature(sigFile):
"""シグネチャファイルをロード"""
mat = []
fp = open(sigFile, "r")
for line in fp:
line = line.rstrip()
mat.append([float(x) for x in line.split()])
fp.close()
return np.array(mat)
def getArtist(mp3Path):
"""MP3ファイルからアーティストを取得"""
import eyed3
try:
tag = eyed3.Tag()
tag.link(mp3Path)
artist = tag.getArtist()
except:
artist = "None"
# 空白のとき
if artist == "": artist = "None"
return artist
def makeHTML(ranking, htmlFile, N=10):
"""ランキングをHTML形式で出力"""
import codecs
fout = codecs.open(htmlFile, "w", "utf-8")
# HTMLヘッダを出力
fout.write('<!DOCTYPE html>\n')
fout.write('<html lang="ja">\n')
fout.write('<head><meta charset="UTF-8" /><title>%s</title></head>\n' % htmlFile)
fout.write('<body>\n')
fout.write('<table border="1">\n')
fout.write(u'<thead><tr><th>ランク</th><th>EMD</th><th>タイトル</th>')
fout.write(u'<th>アーティスト</th><th>音声</th></tr></thead>\n')
fout.write(u'<tbody>\n')
# ランキングを出力
rank = 1
for sigFile, emd in sorted(ranking.items(), key=lambda x:x[1], reverse=False)[:N]:
prefix = sigFile.replace(".sig", "")
# rawをwavに変換(HTMLプレーヤー用)
rawPath = os.path.join("tmp/raw", prefix + ".raw")
wavPath = os.path.join("emd", prefix + ".wav")
if not os.path.exists("emd"): os.mkdir("emd")
os.system('sox -r 16000 -e signed-integer -b 16 "%s" "%s"' % (rawPath, wavPath))
# アーティスト名を取得
mp3Path = os.path.join("mp3", prefix + ".mp3")
artist = getArtist(mp3Path)
# HTML出力
# HTML5のオーディオプレーヤーを埋め込む
audio = '<audio src="%s" controls>' % wavPath
fout.write("<tr><td>%d</td><td>%.2f</td><td>%s</td><td>%s</td><td>%s</td></tr>\n"
% (rank, emd, prefix, artist, audio))
rank += 1
fout.write("</tbody>\n");
fout.write("</table>\n")
fout.write("</body>\n")
fout.write("</html>\n")
fout.close()
if __name__ == "__main__":
if len(sys.argv) != 2:
print "python mir.py [sig file]"
sys.exit()
targetSigPath = sys.argv[1]
sigDir = 'tmp/sig'
htmlFile = 'emd.html'
ranking = defaultdict(float)
# 全楽曲との間で距離を求める
for sigFile in os.listdir(sigDir):
sigPath = os.path.join(sigDir, sigFile)
emd = calcEMD(targetSigPath, sigPath)
if emd < 0: continue
ranking[sigFile] = emd
# ランキングをEMDの降順にソートして出力
N = 50
rank = 1
for sigFile, emd in sorted(ranking.items(), key=lambda x:x[1], reverse=False)[:N]:
print "%d\t%.2f\t%s" % (rank, emd, sigFile)
rank += 1
# EMDの昇順に上位10件をHTMLにして出力
makeHTML(ranking, htmlFile, N)
#coding:utf-8
import os
import math
import subprocess
# mp3_to_mfcc.py
# usage: python mp3_to_mfcc.py [mp3dir] [mfccdir] [rawdir]
# ディレクトリ内のMP3ファイルからMFCCを抽出する
# if filter == 'HIGH':
# os.system('sox -v 0.72 %s %s sinc -100 -t 0.01k' % (cat, pas))
# elif filter == 'MID':
# os.system('sox -v 0.72 %s %s sinc 100-1000 -t 0.01k' % (cat, pas))
# elif filter == 'LOW':
# os.system('sox -v 0.72 %s %s sinc 1000-10000 -t 0.01k' % (cat, pas))
# else:
# os.system('sox -v 0.72 %s %s' % (cat, pas))
#os.system("sox -v 0.72 temp.mp3 temp.wav sinc -100 -t 0.01k")
#os.system("sox temp.mp3 temp.wav")
period = 15
def getlength(wav):
p = subprocess.Popen('soxi -D "%s"' % wav, shell=True, cwd=os.getcwd(),
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
return float(p.stdout.readline().rstrip())
def mp3ToRaw(mp3File, wavFile, rawFile):
os.system("lame --resample 16 -b 32 -a '%s' temp.mp3" % mp3File)
os.system("lame --decode temp.mp3 '%s'" % (wavFile))
wavToRaw(wavFile, rawFile)
def wavToRaw(wavFile, rawFile):
print "length: %s" % getlength(wavFile)
print "period: %s" % (period * 4)
print "looped: %s" % int(math.ceil(period * 4 / getlength(wavFile)))
os.system('cp "%s" temp.wav' % wavFile)
for var in range(1, int(math.ceil(period * 4 / getlength(wavFile)))):
print '[%i] temp.wav + "%s"' % (var, wavFile)
os.system('sox temp.wav "%s" "%s"' % (wavFile, wavFile))
os.system("sox '%s' '%s'" % (wavFile, rawFile))
#os.remove("temp.mp3")
os.remove('temp.wav')
# os.remove('temp2.wav')
def calcNumSample(rawFile):
# 1サンプルはshort型(2byte)なのでファイルサイズを2で割る
filesize = os.path.getsize(rawFile)
numsample = filesize / 2
return numsample
def extractCenter(inFile, outFile):
# 波形のサンプル数を求める
numsample = calcNumSample(inFile)
fs = 16000
center = numsample / 2
start = center - fs * period
end = center + fs * period
# period*2秒未満の場合は範囲を狭める
if start < 0: start = 0
if end > numsample - 1: end = numsample - 1
# SPTKのbcutコマンドで切り出す
os.system("bcut +s -s %d -e %d < '%s' > '%s'" % (start, end, inFile, outFile))
def calcMFCC(rawFile, mfccFile):
# サンプリング周波数: 16kHz
# フレーム長: 400サンプル
# シフト幅 : 160サンプル
# チャンネル数: 40
# MFCC: 19次元 + エネルギー
print 'calculate %s' % mfccFile
os.system("x2x +sf < '%s' | frame -l 400 -p 160 | mfcc -a 0.97 -l 400 -f 16 -n 40 -m 19 -E -c 10000 > '%s'"
% (rawFile, mfccFile))
if __name__ == "__main__":
# if len(sys.argv) != 4:
# print "usage: python mp3_to_mfcc.py [mp3dir] [mfccdir] [rawdir]"
# sys.exit()
#mp3Dir = 'mp3'
wavDir = 'wav'
mfcDir = 'tmp/mfc'
rawDir = 'tmp/raw'
if not os.path.exists('tmp'):
os.mkdir('tmp')
if not os.path.exists(mfcDir):
os.mkdir(mfcDir)
if not os.path.exists(rawDir):
os.mkdir(rawDir)
#for file in os.listdir(mp3Dir):
for file in os.listdir(wavDir):
if not file.endswith(".wav"): continue
# mp3File = os.path.join(mp3Dir, file)
wavFile = os.path.join(wavDir, file)
mfcFile = os.path.join(mfcDir, file.replace(".wav", ".mfc"))
rawFile = os.path.join(rawDir, file.replace(".wav", ".raw"))
# elif file.endswith(".wav"):
# mp3File = os.path.join(mp3Dir, file)
# mfccFile = os.path.join(mfccDir, file.replace(".wav", ".mfc"))
# extFile = os.path.join(extDir, file.replace(".wav", ".raw"))
# rawFile = os.path.join(rawDir, file.replace(".wav", ".raw"))
# wavFile = os.path.join(wavDir, file.replace(".wav", ".wav"))
try:
# MP3を変換
if not os.path.exists(rawFile):
wavToRaw(wavFile, 'temp.raw')
# 中央の30秒だけ抽出してrawFileへ
extractCenter('temp.raw', rawFile)
# MFCCを計算
calcMFCC(rawFile, mfcFile)
print "%s => %s" % (wavFile, mfcFile)
# 後片付け
os.remove("temp.raw")
except:
continue
os.system('python mksig.py')
#coding:utf-8
import os
import struct
import numpy as np
import scipy.cluster
# mfcc_to_signature.py
# usage: python mfcc_to_signature.py [mfccdir] [sigdir]
# 各曲のMFCCをシグネチャに変換する
def loadMFCC(mfccFile, m):
"""MFCCをロードする、mはMFCCの次元数"""
mfcc = []
fp = open(mfccFile, "rb")
while True:
b = fp.read(4)
if b == "": break
val = struct.unpack("f", b)[0]
mfcc.append(val)
fp.close()
# 各行がフレームのMFCC
# numFrame行、m列の行列形式に変換
mfcc = np.array(mfcc)
numFrame = len(mfcc) / m
mfcc = mfcc.reshape(numFrame, m)
return mfcc
def vq(mfcc, k):
"""mfccのベクトル集合をk個のクラスタにベクトル量子化"""
codebook, destortion = scipy.cluster.vq.kmeans(mfcc, k)
code, dist = scipy.cluster.vq.vq(mfcc, codebook)
return code
if __name__ == "__main__":
# if len(sys.argv) != 3:
# print "usage: python mfcc_to_signature.py [mfccdir] [sigdir]"
# sys.exit()
mfccDir = 'tmp/mfc'
sigDir = 'tmp/sig'
if not os.path.exists(sigDir):
os.mkdir(sigDir)
for file in os.listdir(mfccDir):
if not file.endswith(".mfc"): continue
mfccFile = os.path.join(mfccDir, file)
sigFile = os.path.join(sigDir, file.replace(".mfc", ".sig"))
if not os.path.exists(sigFile):
print mfccFile, "=>", sigFile
fout = open(sigFile, "w")
# MFCCをロード
# 各行がフレームのMFCCベクトル
mfcc = loadMFCC(mfccFile, 20)
# MFCCをベクトル量子化してコードを求める
code = vq(mfcc, 16)
# 各クラスタのデータ数、平均ベクトル、
# 共分散行列を求めてシグネチャとする
for k in range(16):
# クラスタkのフレームのみ抽出
frames = np.array([mfcc[i] for i in range(len(mfcc)) if code[i] == k])
# MFCCの各次元の平均をとって平均ベクトルを求める
m = np.apply_along_axis(np.mean, 0, frames) # 0は縦方向
# MFCCの各次元間での分散・共分散行列を求める
sigma = np.cov(frames.T)
# 重み(各クラスタのデータ数)
w = len(frames)
# このクラスタの特徴量をフラット形式で出力
# 1行が重み1個、平均ベクトル20個、分散・共分散行列400個の計421個の数値列
features = np.hstack((w, m, sigma.flatten()))
features = [str(x) for x in features]
fout.write(" ".join(features) + "\n")
fout.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment