#!/usr/bin/env python3
'''docclass.py: Language classifier'''
__author__ = "Rob Skillington"

from math import sqrt, log
import getopt
import os
import re
import sys

DEFAULTS = {
    'CLASSES': 'class_names',
    'TRAIN_PATH': 'train/',
    'TRAIN_CLASS': 'train-class',
    'TRAIN_LANG': 'train-lang',
    'CLASSIFY_PATH': 'test/',
    'RESULT_CLASSES': 'docclass-class-results',
}

EXIT_CODES = {
    'USAGE': 1,
}


def tokenise_by_whitespace(string):
    '''Converts a string into a list of word tokens, stripping symbols.
    Files are opened in text mode with an explicit encoding, so no
    decoding is needed here.'''
    return re.findall(r'\w+', string, re.UNICODE)
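
# A quick illustration of the tokeniser (hypothetical input): punctuation is
# dropped and only \w+ runs survive, e.g.
#   tokenise_by_whitespace('Hello, world! 42') -> ['Hello', 'world', '42']
# Note the name is historical: this is regex word-tokenisation, not a plain
# whitespace split.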


class TrainingSet:
    def __init__(self, directory, classfile, langfile, tokeniser, classes, miselect=False):
        self.fileclasses = {}
        self.filelangs = {}
        self.tokendocfreq = {}
        self.classes = classes
        self._readclassifiers(self.fileclasses, classfile)
        self._readclassifiers(self.filelangs, langfile)
        self._parse(directory, tokeniser, miselect)

    def getclassid(self, docid):
        '''Gets the classid for a training document'''
        return self.fileclasses[docid]

    def getlang(self, docid):
        '''Gets the lang for a training document'''
        return self.filelangs[docid]

    def _readclassifiers(self, dic, fname):
        '''Reads classifications of the training set into a dict'''
        with open(fname, 'r', encoding='utf-8') as fh:
            for line in fh:
                (docname, classifier) = line.split()
                dic[docname] = classifier
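
    # The class and language files are assumed (from the line.split() above)
    # to hold one whitespace-separated "docname label" pair per line, e.g.
    # (hypothetical): doc0001.txt english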

    def _parse(self, directory, tokeniser, miselect):
        '''Parses the directory and builds the token to class frequency dict'''
        if miselect:
            tokenclassfreq = {}
            tokentotalfreq = {}
            classtotalfreq = {}
            totalfreq = 0
            for setid in self.classes.collection:
                classtotalfreq[setid] = 0
        for fname in os.listdir(directory):
            fpath = os.path.join(directory, fname)
            if not os.path.isfile(fpath):
                continue
            if fname in self.fileclasses and fname in self.filelangs:
                classid = self.fileclasses[fname]
                with open(fpath, 'r', encoding='utf-8') as fh:
                    for line in fh:
                        for token in tokeniser(line):
                            # For distance selection: per-document token counts
                            docfreq = self.tokendocfreq.setdefault(token, {})
                            docfreq[fname] = docfreq.get(fname, 0) + 1
                            # For MI selection: per-class and corpus-wide counts
                            if miselect:
                                if token not in tokenclassfreq:
                                    tokenclassfreq[token] = \
                                        dict.fromkeys(self.classes.collection, 0)
                                tokenclassfreq[token][classid] += 1
                                tokentotalfreq[token] = tokentotalfreq.get(token, 0) + 1
                                classtotalfreq[classid] = classtotalfreq.get(classid, 0) + 1
                                totalfreq += 1
        if miselect:
            self._mutualinformationcrop(miselect, tokenclassfreq, tokentotalfreq,
                                        classtotalfreq, totalfreq)

    def _mutualinformationcrop(self, includepercent, tokenclassfreq,
                               tokentotalfreq, classtotalfreq, totalfreq):
        '''Crops the tokens used by analysing each token's mutual information
        and selecting the N best tokens where N = includepercent * totaltokens'''
        if includepercent < 0.01 or includepercent > 0.99:
            raise ValueError('MI selection must be a float in the inclusive range 0.01 to 0.99')
        totaltoks = len(self.tokendocfreq)
        amount = int(round(includepercent * totaltoks))
        mitable = []
        print('## Performing MI to select', amount, 'of', totaltoks)
        for token, classesdic in tokenclassfreq.items():
            mi = 0.0
            for classid, freq in classesdic.items():
                # t=1: token present in the class; t=0: token absent
                for t in range(2):
                    ptc = freq / float(totalfreq)
                    pt = tokentotalfreq[token] / float(totalfreq)
                    pc = classtotalfreq[classid] / float(totalfreq)
                    if not t:
                        ptc = (classtotalfreq[classid] - freq) / float(totalfreq)
                        pt = 1 - pt
                    if ptc:
                        mi += ptc * log(ptc / (pt * pc), 2)
            mitable.append((token, mi))
        mitable.sort(key=lambda x: x[1], reverse=True)
        # Cull every token outside the top-N by mutual information
        for token, mi in mitable[amount:]:
            del self.tokendocfreq[token]
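
# The MI score computed above is, per token t (logs base 2):
#   MI(t) = sum over classes c and e in {present, absent} of
#           p(e, c) * log2(p(e, c) / (p(e) * p(c)))
# with probabilities estimated from raw token-occurrence counts; the
# "absent" joint term is derived as (class total - token's class count).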


class Classes:
    def __init__(self, classfile):
        self.collection = []
        with open(classfile, 'r', encoding='utf-8') as fh:
            for line in fh:
                line = line.strip()
                if line:
                    self.collection.append(line.split()[0])


class AbstractClassifier:
    def __init__(self, directory, classes, training, tokeniser):
        self.directory = directory
        self.classes = classes
        self.training = training
        self.tokeniser = tokeniser

    def dotproduct(self, a, b):
        return sum(x * y for x, y in zip(a, b))

    def magnitude(self, a):
        return sqrt(sum(x * x for x in a))

    def classify(self):
        pass
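
# Sanity examples for the vector helpers above (hypothetical values):
#   dotproduct([1, 2, 3], [4, 5, 6]) -> 32
#   magnitude([3, 4])                -> 5.0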


class CosineSimilarityModelClassifier(AbstractClassifier):
    def classify(self):
        classification = []
        traindoctokenvectors = {}
        tokens = []
        basevec = [0] * len(self.training.tokendocfreq)
        print('## Creating training vectors')
        # Create training document vectors
        for i, (token, docsdic) in enumerate(self.training.tokendocfreq.items()):
            tokens.append(token)
            for docid, freq in docsdic.items():
                if docid not in traindoctokenvectors:
                    traindoctokenvectors[docid] = basevec[:]
                traindoctokenvectors[docid][i] = freq
        # Cache
        traindoctokenvectorsiterable = list(traindoctokenvectors.items())
        traindoctokenvectorlengths = {}
        print('## Classifying documents')
        # Classify documents
        for fname in os.listdir(self.directory):
            fpath = os.path.join(self.directory, fname)
            if not os.path.isfile(fpath):
                continue
            # Gather token frequencies for the document
            tokenfreq = {}
            with open(fpath, 'r', encoding='utf-8') as fh:
                for line in fh:
                    for token in self.tokeniser(line):
                        tokenfreq[token] = tokenfreq.get(token, 0) + 1
            # Create vector
            docvec = basevec[:]
            for i, token in enumerate(tokens):
                if token in tokenfreq:
                    docvec[i] = tokenfreq[token]
            # Compute vector length
            docveclength = self.magnitude(docvec)
            # Compare against all training vectors
            nearest = (self.classes.collection[0], 0.0)
            for docid, vec in traindoctokenvectorsiterable:
                dividend = self.dotproduct(vec, docvec)
                if dividend:
                    if docid in traindoctokenvectorlengths:
                        veclength = traindoctokenvectorlengths[docid]
                    else:
                        veclength = traindoctokenvectorlengths[docid] = \
                            self.magnitude(vec)
                    cosinevalue = dividend / (veclength * docveclength)
                    if cosinevalue > nearest[1]:
                        nearest = (self.training.getclassid(docid), cosinevalue)
            docclass = (fname, nearest[0])
            print('### Classified "%s" class: %s' % docclass)
            classification.append(docclass)
        return classification
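
# The classifier above is effectively 1-nearest-neighbour in raw
# term-frequency space, ranking training documents by cosine similarity
#   cos(a, b) = (a . b) / (|a| * |b|)
# and assigning the class of the highest-scoring training document.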


class SkewDivergenceClassifier(AbstractClassifier):
    def classify(self):
        classification = []
        traindoctokenvectors = {}
        mlevectors = {}
        tokens = []
        baseveclength = len(self.training.tokendocfreq)
        basevec = [0] * baseveclength
        print('## Creating training MLE probability vectors')
        # Create training document MLE probability estimates:
        # first create count vectors, then divide each value by the vector's sum
        for i, (token, docsdic) in enumerate(self.training.tokendocfreq.items()):
            tokens.append(token)
            for docid, freq in docsdic.items():
                if docid not in traindoctokenvectors:
                    traindoctokenvectors[docid] = basevec[:]
                traindoctokenvectors[docid][i] = freq
        # Create MLE vectors
        for docid, vec in traindoctokenvectors.items():
            freqsum = float(sum(vec))
            mlevectors[docid] = [x / freqsum for x in vec]
        del traindoctokenvectors
        # Cache
        mlevectorsiterable = list(mlevectors.items())
        print('## Classifying documents')
        alpha = 0.99
        # Classify documents
        for fname in os.listdir(self.directory):
            fpath = os.path.join(self.directory, fname)
            if not os.path.isfile(fpath):
                continue
            # Gather token frequencies for the document
            tokenfreqtotal = 0
            tokenfreq = {}
            with open(fpath, 'r', encoding='utf-8') as fh:
                for line in fh:
                    for token in self.tokeniser(line):
                        tokenfreqtotal += 1
                        tokenfreq[token] = tokenfreq.get(token, 0) + 1
            # Create MLE vector
            tokenfreqtotal = float(tokenfreqtotal)
            x = basevec[:]
            for i, token in enumerate(tokens):
                if token in tokenfreq:
                    x[i] = tokenfreq[token] / tokenfreqtotal
            # Compare against all training vectors
            nearest = (self.classes.collection[0], None)
            for traindocid, y in mlevectorsiterable:
                skewdiv = 0.0
                for i in range(baseveclength):
                    if x[i] > 0.0:
                        skewdiv += x[i] * (log(x[i], 2) -
                                           log((alpha * y[i]) + ((1 - alpha) * x[i]), 2))
                if nearest[1] is None or skewdiv < nearest[1]:
                    nearest = (self.training.getclassid(traindocid), skewdiv)
            docclass = (fname, nearest[0])
            print('### Classified "%s" class: %s' % docclass)
            classification.append(docclass)
        return classification
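
# The inner loop above computes the skew divergence (Lee, 1999) with
# alpha = 0.99:
#   s_alpha(x, y) = KL(x || alpha*y + (1 - alpha)*x)
# where x is the test document's MLE distribution and y a training
# document's; the lowest divergence wins, again 1-nearest-neighbour.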


def usage():
    print('Usage: docclass.py [-c|-s] [-f PERCENT] [-o FILE] [-h]')
    print()
    print('Arguments:')
    print()
    print('-c --cos                Apply cosine similarity model classification (the default)')
    print('-s --skew               Apply skew divergence classification')
    print('-f --feature-selection  Apply feature selection over tokens in training documents')
    print('                        using Mutual Information. Value must be between 0.01 and')
    print('                        0.99 inclusive - the fraction of tokens to keep')
    print('-o --out                Specify the file to write results to')
    print('-h --help               Prints this usage message')
    sys.exit(EXIT_CODES['USAGE'])
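
# A hypothetical invocation, using the default file locations above:
#   python docclass.py -s -f 0.2 -o my-results
# runs skew-divergence classification, keeping the top 20% of training
# tokens by mutual information and writing results to 'my-results'.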


def main():
    try:
        # Note the trailing '=' on long options that take a value
        opts, args = getopt.getopt(sys.argv[1:], 'csf:o:h',
                                   ['cos', 'skew', 'feature-selection=', 'out=', 'help'])
    except getopt.GetoptError as err:
        print(str(err))
        usage()
    method = 'cos'
    includepercent = False
    resultsfile = DEFAULTS['RESULT_CLASSES']
    for o, a in opts:
        if o in ('-c', '--cos'):
            continue
        elif o in ('-s', '--skew'):
            method = 'skew'
        elif o in ('-f', '--feature-selection'):
            try:
                includepercent = float(a)
                if includepercent < 0.01 or includepercent > 0.99:
                    raise ValueError()
            except ValueError:
                usage()
        elif o in ('-o', '--out'):
            resultsfile = a
        elif o in ('-h', '--help'):
            usage()
        else:
            assert False, 'invalid option'
    # Read classes
    classes = Classes(classfile=DEFAULTS['CLASSES'])
    print('# Parsing training documents')
    # Parse training set
    training = TrainingSet(
        directory=DEFAULTS['TRAIN_PATH'],
        classfile=DEFAULTS['TRAIN_CLASS'],
        langfile=DEFAULTS['TRAIN_LANG'],
        tokeniser=tokenise_by_whitespace,
        classes=classes,
        miselect=includepercent
    )
    # Classify docs using the chosen rule
    if method == 'cos':
        classifier = CosineSimilarityModelClassifier(
            directory=DEFAULTS['CLASSIFY_PATH'],
            classes=classes,
            training=training,
            tokeniser=tokenise_by_whitespace
        )
        print('# Cosine Similarity Model classification begun')
    elif method == 'skew':
        classifier = SkewDivergenceClassifier(
            directory=DEFAULTS['CLASSIFY_PATH'],
            classes=classes,
            training=training,
            tokeniser=tokenise_by_whitespace
        )
        print('# Skew Divergence classification begun')
    classification = classifier.classify()
    print('Results:')
    print('--')
    with open(resultsfile, 'w') as fh:
        for entry in classification:
            fh.write('%s\t%s\n' % entry)
            print('%s\t%s' % entry)


if __name__ == '__main__':
    main()
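
# Expected working-directory layout, inferred from DEFAULTS (the corpus
# files themselves are not part of this gist):
#   class_names   one class label per line (first whitespace-separated field)
#   train-class   "docname class" pairs for the training documents
#   train-lang    "docname language" pairs for the training documents
#   train/        the training documents
#   test/         the documents to classify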