Skip to content

Instantly share code, notes, and snippets.

@RicherMans
Created April 16, 2014 08:01
Show Gist options
  • Select an option

  • Save RicherMans/10828607 to your computer and use it in GitHub Desktop.

Select an option

Save RicherMans/10828607 to your computer and use it in GitHub Desktop.
Alize 3.0 scripts
import os
import argparse as argp
import re
from difflib import SequenceMatcher, get_close_matches
from __builtin__ import exit
import threading
class Container(object):
    """Collects the entries that are concatenated into one output file.

    ``filename`` is the name the entries are grouped under. Entries are
    added with ``addContent`` and rendered as a single string with
    ``getContent``.
    """

    def __init__(self, filename):
        self.filename = filename
        self._filecontent = list()

    def addContent(self, content):
        """Append one entry (a string, possibly newline-terminated)."""
        self._filecontent.append(content)

    def getContent(self):
        """Return all entries joined by ``' + '`` in a stable order.

        Entries are ordered by length first and alphabetically within the
        same length. Sorting is needed because HCopy does not sort the data
        itself, so the parts could otherwise be concatenated in the wrong
        order.

        Newlines are removed from every entry *before* sorting, so an entry
        without a trailing newline (e.g. the last line of a file) is
        compared on equal terms with the others. The stored entries are not
        modified.
        """
        entries = [entry.replace('\n', '') for entry in self._filecontent]
        # (length, text) reproduces "sort alphabetically, then stable-sort by
        # length" in a single pass.
        entries.sort(key=lambda entry: (len(entry), entry))
        return ' + '.join(entries)
#===============================================================================
# Given a .scp file that is already formatted as "source \t featurefile",
# this script assembles every partial sequence of a file into one
# feature file.
#===============================================================================
# ---- Command line -----------------------------------------------------------
parser = argp.ArgumentParser()
# argparse.FileType opens the file for reading, like the Python-2-only
# ``file`` builtin did, without depending on that builtin.
parser.add_argument('-i', type=argp.FileType('r'), help='Input .scp file')
parser.add_argument('-o', help='Output directory ')
parser.add_argument('-glst', type=str, help='also generates a .lst file containing the speakernames, given the path', default='lst/data.lst')
parser.add_argument('-f', action='store_true', help='writes a .scp file which can afterwards be executed with HCopy', default=False)
parser.add_argument('-regex', type=str, help='the regex to split the given data into name and speech segment', default=r'\w+-[AB]')
args = parser.parse_args()
if args.i is None or args.o is None:
    parser.print_help()
    parser.exit()

fileending = 'tmp.prm'
regex = re.compile(args.regex)

# ---- Group the input lines by speaker name ----------------------------------
# Blank lines carry no entry and cannot match the regex, so they are dropped.
sortednames = sorted(line for line in args.i if line.strip())

i = 0
j = 0
lists = list()
while i < len(sortednames):
    entry = os.path.basename(sortednames[i])
    match = regex.search(entry)
    if match is None:
        # Fail with a readable message instead of an AttributeError on None.
        parser.error('entry %r does not match the -regex pattern %r'
                     % (entry.strip(), args.regex))
    spkname = match.group(0)
    cont = Container(spkname)
    # add the current item
    cont.addContent(sortednames[i])
    # The list is sorted, so every following line whose basename starts with
    # the same speaker name belongs to the same container.
    j = i + 1
    while j < len(sortednames) and os.path.basename(sortednames[j]).startswith(spkname):
        cont.addContent(sortednames[j])
        j = j + 1
    lists.append(cont)
    # Continue with the first line that did not belong to this speaker.
    i = j
print('Finished gathering Data')
def runHCopy(i):
    """Run bin/HCopy for the i-th container in ``lists``.

    The command consists of the container's joined content followed by the
    target path, which is ``args.o`` + container name + '.' + ``fileending``.
    """
    container = lists[i]
    pieces = ['bin/HCopy ', container.getContent(), ' ',
              args.o, container.filename, '.', fileending]
    os.system(''.join(pieces))
def writeSCP(i):
    """Append the entry for the i-th container to 'concat_Frames.tmp.scp'.

    The written line is the container's joined content, a space, and the
    target path (``args.o`` + container name + '.' + ``fileending``).
    """
    container = lists[i]
    with open('concat_Frames.tmp.scp', 'a') as scpfile:
        target = args.o + container.filename + '.' + fileending
        scpfile.write(container.getContent() + ' ' + target + '\n')
# ---- Process every container in its own thread ------------------------------
runningThreads = list()
# With -f only the .scp file is written; otherwise HCopy is run directly.
# The choice does not depend on the container, so it is made once.
tar = writeSCP if args.f else runHCopy
for i in range(len(lists)):
    t = threading.Thread(target=tar, args=(i,))
    runningThreads.append(t)
    t.start()
for thr in runningThreads:
    thr.join()
print('Finished assmbling all files')

# ---- Optional list of speaker names -----------------------------------------
if args.glst:
    print('Beginning to generate data.lst file')
    fn = args.glst
    # An existing list file is left untouched.
    if not os.path.exists(fn):
        # The default 'lst/data.lst' lives in a sub directory that may not
        # exist yet; open() would fail without it.
        lstdir = os.path.dirname(fn)
        if lstdir and not os.path.isdir(lstdir):
            os.makedirs(lstdir)
        with open(fn, 'w') as glp:
            glp.writelines([cont.filename + '\n' for cont in lists])
    print('Finished data.lst')
import argparse
import os
import re
import csv
import subprocess
#===============================================================================
# This script trains the World, trains the Total Variability matrix and extracts the i-vectors.
# Make sure to have a folder called "cfg" in your directory, which contains the config files
# for training the World, the Total Variability matrix and for the i-vector extraction.
# The script generates the .ndx files for TV training and IV extraction, but these can also be provided by the user.
#===============================================================================
# Command line: each switch restricts the run to a single stage.
parser = argparse.ArgumentParser()
parser.add_argument(
    '-tw',
    action='store_true',
    help='Runs only TrainWorld',
    required=False,
)
parser.add_argument(
    '-tv',
    action='store_true',
    help='runs only Training of Total Variability ',
    required=False,
)
parser.add_argument(
    '-iv',
    action='store_true',
    help='Runs only Ivector Extraction',
    required=False,
)
args = parser.parse_args()

# Config keys whose values are directories.
pathkeywords = [
    'featureFilesPath',
    'mixtureFilesPath',
    'matrixFilesPath',
    'saveVectorFilesPath',
]
# Config key naming the index file of the Total Variability training.
tvkeywords = ['ndxFilename']
# Config key naming the index file of the i-vector extraction.
ivkeywords = ['targetIdList']
# NOTE: a 'numThread' keyword and its regex are disabled in this script.

# One alternation pattern per keyword group.
pathregex, tvregex, ivregex = (
    re.compile('|'.join(keywords))
    for keywords in (pathkeywords, tvkeywords, ivkeywords)
)
def trainWorld():
    """Create the directories named in cfg/TrainWorld.cfg and run TrainWorld.

    Every config line that mentions one of the path keywords is split into
    key and value, and the directory given as value is created if missing.
    Afterwards ``bin/TrainWorld`` is executed; its stdout goes to
    'TrainWorld.log' and its stderr to 'TrainWorld.err'. The exit status of
    the program is not evaluated.
    """
    def splitSetting(line, keyregex):
        # Split a config line into (key, value). The tab separator is
        # preferred; a line without a tab is split at the first run of
        # whitespace, but only if its first field is one of the keywords.
        # Anything else yields an empty value.
        key, sep, value = line.partition("\t")
        if not sep:
            fields = line.split(None, 1)
            if len(fields) == 2 and keyregex.search(fields[0]):
                key, value = fields
            else:
                key, value = line, ''
        return key.strip(), value.strip()

    #--------------- Read in the config file given and create all necessary dirs
    with open('cfg/TrainWorld.cfg', 'r') as cfgp:
        for line in cfgp:
            if not pathregex.search(line):
                continue
            key, value = splitSetting(line, pathregex)
            # An empty value must be skipped: os.makedirs('') raises.
            if value and not os.path.exists(value):
                os.makedirs(value)
    #------------------------------------------------- run the TrainWorld Script
    with open('TrainWorld.log', 'w') as logp:
        with open('TrainWorld.err', 'w') as errp:
            p1 = subprocess.Popen('bin/TrainWorld --config cfg/TrainWorld.cfg', shell=True, stdout=logp, stderr=errp)
            p1.wait()
def trainTV():
    """Write the .ndx file for the Total Variability training and run it.

    Reads cfg/TotalVariability_fast.cfg, creates every directory given by a
    path keyword, and for each 'ndxFilename' entry writes an index file that
    lists the files found in 'featureFilesPath' with all extensions removed,
    one name per line. Afterwards ``bin/TotalVariability`` is executed; its
    stdout goes to 'TotalVariability.log' and its stderr to
    'TotalVariability.err'. The exit status of the program is not evaluated.

    Raises:
        KeyError: if an index file is requested but the config provides no
            usable 'featureFilesPath' entry.
    """
    def splitSetting(line, keyregex):
        # Split a config line into (key, value). The tab separator is
        # preferred; a line without a tab is split at the first run of
        # whitespace, but only if its first field is one of the keywords.
        # Anything else yields an empty value.
        key, sep, value = line.partition("\t")
        if not sep:
            fields = line.split(None, 1)
            if len(fields) == 2 and keyregex.search(fields[0]):
                key, value = fields
            else:
                key, value = line, ''
        return key.strip(), value.strip()

    #---------------------------------- Prepare data ( ndx file ) for processing
    with open('cfg/TotalVariability_fast.cfg', 'r') as cfgp:
        lines = [line for line in cfgp
                 if pathregex.search(line) or tvregex.search(line)]
    pathlines = [line for line in lines if pathregex.search(line)]
    tvlines = [line for line in lines if tvregex.search(line)]

    # Fill into paths all the paths, which are given by the pathkeyword argument.
    # This is done before the ndx entries are handled because they need the
    # feature directory, wherever it appears in the config file.
    paths = dict()
    for line in pathlines:
        key, value = splitSetting(line, pathregex)
        if not value:
            # An empty value must be skipped: os.makedirs('') raises.
            continue
        paths[key] = value
        if not os.path.exists(value):
            os.makedirs(value)

    for line in tvlines:
        key, value = splitSetting(line, tvregex)
        if not value:
            continue
        readir = paths['featureFilesPath']
        createIndexDirs(value, readir)
        # Sorted so that the index file does not depend on directory order.
        rawfiles = [removeEnding(rawfile) for rawfile in sorted(os.listdir(readir))]
        with open(value, 'w+') as ndxp:
            ndxp.writelines(name + '\n' for name in rawfiles)

    with open('TotalVariability.log', 'w') as logp:
        with open('TotalVariability.err', 'w') as errp:
            p1 = subprocess.Popen('bin/TotalVariability --config cfg/TotalVariability_fast.cfg', shell=True, stdout=logp, stderr=errp)
            p1.wait()
def extractIV():
    """Write the target list for the i-vector extraction and run it.

    Reads cfg/ivExtractor_fast.cfg, creates every directory given by a path
    keyword, and for each 'targetIdList' entry writes an index file with one
    line per file found in 'featureFilesPath': the file name without any
    extension, then a tab and a space, then the same name again. Afterwards
    ``bin/IvExtractor`` is executed; its stdout goes to 'IvExtract.log' and
    its stderr to 'IvExtract.err'. The exit status of the program is not
    evaluated.

    Raises:
        KeyError: if an index file is requested but the config provides no
            usable 'featureFilesPath' entry.
    """
    def splitSetting(line, keyregex):
        # Split a config line into (key, value). The tab separator is
        # preferred; a line without a tab is split at the first run of
        # whitespace, but only if its first field is one of the keywords.
        # Anything else yields an empty value.
        key, sep, value = line.partition("\t")
        if not sep:
            fields = line.split(None, 1)
            if len(fields) == 2 and keyregex.search(fields[0]):
                key, value = fields
            else:
                key, value = line, ''
        return key.strip(), value.strip()

    # The file is read only once; both line groups are taken from that result.
    with open('cfg/ivExtractor_fast.cfg', 'r') as cfgp:
        lines = [line for line in cfgp
                 if pathregex.search(line) or ivregex.search(line)]
    pathlines = [line for line in lines if pathregex.search(line)]
    ivlines = [line for line in lines if ivregex.search(line)]

    paths = dict()
    for line in pathlines:
        key, value = splitSetting(line, pathregex)
        if not value:
            # An empty value must be skipped: os.makedirs('') raises.
            continue
        paths[key] = value
        if not os.path.exists(value):
            os.makedirs(value)

    # lines are not necessarily ordered, so the index entries are handled
    # only after all paths are known
    for line in ivlines:
        key, value = splitSetting(line, ivregex)
        if not value:
            continue
        readir = paths['featureFilesPath']
        createIndexDirs(value, readir)
        # Sorted so that the index file does not depend on directory order.
        rawfiles = [removeEnding(rawfile) for rawfile in sorted(os.listdir(readir))]
        with open(value, 'w+') as ndxp:
            ndxp.writelines(name + '\t ' + name + '\n' for name in rawfiles)

    with open('IvExtract.log', 'w') as logp:
        with open('IvExtract.err', 'w') as errp:
            p1 = subprocess.Popen('bin/IvExtractor --config cfg/ivExtractor_fast.cfg ', shell=True, stdout=logp, stderr=errp)
            p1.wait()
def createIndexDirs(ndxfilename, readir):
    """Prepare the location of an index file before it is written.

    If *ndxfilename* is not an existing file, its parent directory is
    created when missing. If *ndxfilename* exists and has a size of zero
    bytes, it is removed. A non-empty file is left as it is.

    *readir* is not used; it is kept so that existing calls keep working.
    """
    if not os.path.isfile(ndxfilename):
        parent = os.path.dirname(ndxfilename)
        # dirname is '' for a bare file name, and os.makedirs('') raises.
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
    # Remove an existing file that is empty.
    if os.path.exists(ndxfilename) and os.path.getsize(ndxfilename) == 0:
        os.remove(ndxfilename)
# removes all extensions of the given filename
def removeEnding(text):
    """Return *text* with every trailing extension stripped.

    ``os.path.splitext`` is applied repeatedly until no extension is left,
    so 'a.tar.gz' becomes 'a'. A name without an extension, or a name that
    only starts with a dot, is returned unchanged.
    """
    stem, ending = os.path.splitext(text)
    # Test for emptiness by value; identity of string literals is an
    # implementation detail and must not be relied upon.
    while ending:
        stem, ending = os.path.splitext(stem)
    return stem
def main():
    """Run the stages selected on the command line, or all of them.

    Each selected stage announces itself and is then executed, in the order
    TrainWorld, Total Variability, i-vector extraction. If no option is set
    at all, the three stages run one after another without announcement.
    """
    stages = (
        (args.tw, 'Training World', trainWorld),
        (args.tv, 'Training TV Matrix', trainTV),
        (args.iv, 'Extracting I-Vector', extractIV),
    )
    for selected, banner, stage in stages:
        if selected:
            print(banner)
            stage()
    # No argument given: run the complete pipeline.
    if not any(vars(args).values()):
        for _, _, stage in stages:
            stage()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment