Document language classifier
#!/usr/bin/env python
'''docclass.py: Language classifier'''
__author__ = "Rob Skillington"
from math import sqrt, log
import re, os, sys, getopt
DEFAULTS = {
    'CLASSES' : 'class_names',
    'TRAIN_PATH' : 'train/',
    'TRAIN_CLASS' : 'train-class',
    'TRAIN_LANG' : 'train-lang',
    'CLASSIFY_PATH' : 'test/',
    'RESULT_CLASSES' : 'docclass-class-results'
}
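# Assumed layout of the working directory (inferred from the defaults above,
# not documented in the original): 'train/' holds the training documents,
# 'test/' the documents to classify, 'class_names' lists one class id per
# line, and 'train-class'/'train-lang' map each training filename to its
# class and language respectively.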
EXIT_CODES = {
    'USAGE' : 1
}
def tokenise_by_whitespace(string):
    '''Converts a string into a list of word tokens, dropping punctuation'''
    return re.findall(r'\w+', string.decode('utf8'), re.UNICODE)
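# For example, tokenise_by_whitespace('Hello, world! 42') returns
# [u'Hello', u'world', u'42']: \w+ keeps runs of letters, digits and
# underscores and drops the punctuation between them.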
class TrainingSet():
    def __init__(self, directory, classfile, langfile, tokeniser, classes, miselect=False):
        self.fileclasses = {}
        self.filelangs = {}
        self.tokendocfreq = {}
        self.classes = classes
        self._readclassifiers(self.fileclasses, classfile)
        self._readclassifiers(self.filelangs, langfile)
        self._parse(directory, tokeniser, miselect)
    def getclassid(self, docid):
        '''Gets the classid for a training document'''
        return self.fileclasses[docid]
    def getlang(self, docid):
        '''Gets the lang for a training document'''
        return self.filelangs[docid]
    def _readclassifiers(self, dic, fname):
        '''Reads classifications of the training set into a dict'''
        fh = open(fname, 'r')
        for line in fh:
            (docname, classifier) = line.split()
            dic[docname] = classifier
        fh.close()
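    # Each line of the mapping files ('train-class', 'train-lang') is assumed
    # to hold two whitespace-separated fields, filename then label, e.g.
    # 'doc0001 sport' (the example values here are illustrative, not from
    # the original).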
    def _parse(self, directory, tokeniser, miselect):
        '''Parses the directory and builds the token to class frequency dict'''
        if (miselect):
            tokenclassfreq = {}
            tokentotalfreq = {}
            classtotalfreq = {}
            totalfreq = 0
            for setid in self.classes.collection:
                classtotalfreq[setid] = 0
        for fname in os.listdir(directory):
            fpath = directory.rstrip(os.sep) + os.sep + fname
            if os.path.isfile(fpath):
                fh = open(fpath, 'r')
                if self.fileclasses.has_key(fname) and self.filelangs.has_key(fname):
                    classid = self.fileclasses[fname]
                    lang = self.filelangs[fname]
                    for line in fh:
                        tokenised = tokeniser(line)
                        for token in tokenised:
                            # For distance selection
                            if self.tokendocfreq.has_key(token):
                                if self.tokendocfreq[token].has_key(fname):
                                    self.tokendocfreq[token][fname] = \
                                        self.tokendocfreq[token][fname] + 1
                                else:
                                    self.tokendocfreq[token][fname] = 1
                            else:
                                self.tokendocfreq[token] = {fname:1}
                            # For MI selection
                            if (miselect):
                                if tokenclassfreq.has_key(token):
                                    if tokenclassfreq[token].has_key(classid):
                                        tokenclassfreq[token][classid] = \
                                            tokenclassfreq[token][classid] + 1
                                    else:
                                        tokenclassfreq[token][classid] = 1
                                else:
                                    tokenclassfreq[token] = {}
                                    for setid in self.classes.collection:
                                        tokenclassfreq[token][setid] = 0
                                    tokenclassfreq[token][classid] = 1
                                if tokentotalfreq.has_key(token):
                                    tokentotalfreq[token] = \
                                        tokentotalfreq[token] + 1
                                else:
                                    tokentotalfreq[token] = 1
                                if classtotalfreq.has_key(classid):
                                    classtotalfreq[classid] = \
                                        classtotalfreq[classid] + 1
                                else:
                                    classtotalfreq[classid] = 1
                                totalfreq = totalfreq + 1
                fh.close()
        if (miselect):
            self._mutualinformationcrop(miselect, tokenclassfreq, tokentotalfreq, classtotalfreq, totalfreq)
    def _mutualinformationcrop(self, includepercent, tokenclassfreq, tokentotalfreq, classtotalfreq, totalfreq):
        '''Crops the tokens used by analysing each token's mutual information
        and selecting the N best tokens where N = includepercent * totaltokens'''
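        # For each token T the loop below computes the mutual information
        #   I(T;C) = sum over t in {absent, present} and classes c of
        #            P(t,c) * log2(P(t,c) / (P(t) * P(c)))
        # with probabilities estimated from the counts gathered in _parse;
        # the tokens with the lowest I(T;C) are then discarded.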
        if (includepercent < 0.01 or includepercent > 0.99):
            raise ValueError('MI selection must be a float in the inclusive range 0.01 to 0.99')
        totaltoks = len(self.tokendocfreq)
        amount = int(round(includepercent*totaltoks))
        mitable = []
        print '## Performing MI to select', amount, 'of', totaltoks
        for token, classesdic in tokenclassfreq.items():
            mi = 0.0
            for classid, freq in classesdic.items():
                for t in range(2):
                    ptc = freq / float(totalfreq)
                    pt = tokentotalfreq[token] / float(totalfreq)
                    pc = classtotalfreq[classid] / float(totalfreq)
                    if not t:
                        ptc = (classtotalfreq[classid] - freq) / float(totalfreq)
                        pt = 1 - pt
                    if ptc:
                        mi += ptc*log(ptc/(pt*pc), 2)
            mitable.append((token, mi))
        mitable = sorted(mitable, key=lambda x: x[1], reverse=True)
        culltokens = mitable[amount:]
        for token, mi in culltokens:
            del self.tokendocfreq[token]
class Classes():
    def __init__(self, classfile):
        self.collection = []
        fh = open(classfile, 'r')
        for line in fh:
            line = line.strip()
            if line:
                self.collection.append(line.split()[0])
        fh.close()
class AbstractClassifier():
    def __init__(self, directory, classes, training, tokeniser):
        self.directory = directory
        self.classes = classes
        self.training = training
        self.tokeniser = tokeniser
    def dotproduct(self, a, b):
        return sum([a[i]*b[i] for i in range(len(a))])
    def magnitude(self, a):
        return sqrt(sum([x*x for x in a]))
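    # e.g. dotproduct([1, 2], [3, 4]) == 11 and magnitude([3, 4]) == 5.0;
    # the cosine classifier below combines the two as a.b / (|a| * |b|).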
    def classify(self):
        pass
class CosineSimilarityModelClassifier(AbstractClassifier):
    def classify(self):
        classification = []
        traindoctokenvectors = {}
        tokens = []
        basevec = [0] * len(self.training.tokendocfreq)
        print '## Creating training vectors'
        # Create training document vectors
        i = 0
        for token, docsdic in self.training.tokendocfreq.items():
            tokens.append(token)
            for docid, freq in docsdic.items():
                if not traindoctokenvectors.has_key(docid):
                    traindoctokenvectors[docid] = basevec[:]
                traindoctokenvectors[docid][i] = freq
            i = i + 1
        # Cache
        traindoctokenvectorsiterable = traindoctokenvectors.items()
        traindoctokenvectorlengths = {}
        print '## Classifying documents'
        # Classify documents
        for fname in os.listdir(self.directory):
            fpath = self.directory.rstrip(os.sep) + os.sep + fname
            if os.path.isfile(fpath):
                # Gather token freq for document
                fh = open(fpath, 'r')
                tokenfreq = {}
                for line in fh:
                    tokenised = self.tokeniser(line)
                    for token in tokenised:
                        if tokenfreq.has_key(token):
                            tokenfreq[token] = tokenfreq[token] + 1
                        else:
                            tokenfreq[token] = 1
                fh.close()
                # Create vector
                docvec = basevec[:]
                i = 0
                for token in tokens:
                    if tokenfreq.has_key(token):
                        docvec[i] = tokenfreq[token]
                    i = i + 1
                # Compute vector length
                docveclength = self.magnitude(docvec)
                # Compare against all training vectors
                nearest = (self.classes.collection[0], 0.0)
                for docid, vec in traindoctokenvectorsiterable:
                    dividend = self.dotproduct(vec, docvec)
                    if dividend:
                        if traindoctokenvectorlengths.has_key(docid):
                            veclength = traindoctokenvectorlengths[docid]
                        else:
                            veclength = traindoctokenvectorlengths[docid] = \
                                self.magnitude(vec)
                        divisor = veclength * docveclength
                        cosinevalue = dividend / divisor
                        if cosinevalue > nearest[1]:
                            nearest = (self.training.getclassid(docid), cosinevalue)
                docclass = (fname, nearest[0])
                print '### Classified "%s" class: %s' % docclass
                classification.append(docclass)
        return classification
class SkewDivergenceClassifier(AbstractClassifier):
    def classify(self):
        classification = []
        traindoctokenvectors = {}
        mlevectors = {}
        tokens = []
        baseveclength = len(self.training.tokendocfreq)
        basevec = [0] * baseveclength
        print '## Creating training MLE probability vectors'
        # Create training document MLE probability estimates
        # First create vectors then divide each value by sum of vector
        i = 0
        for token, docsdic in self.training.tokendocfreq.items():
            tokens.append(token)
            for docid, freq in docsdic.items():
                if not traindoctokenvectors.has_key(docid):
                    traindoctokenvectors[docid] = basevec[:]
                traindoctokenvectors[docid][i] = freq
            i = i + 1
        # Create MLE vectors
        for docid, vec in traindoctokenvectors.items():
            freqsum = sum(vec)
            mlevectors[docid] = [x / float(freqsum) for x in vec]
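        # e.g. a raw count vector [2, 1, 1] becomes the MLE distribution
        # [0.5, 0.25, 0.25]: each training document is modelled as a unigram
        # probability distribution over the training vocabulary.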
        del traindoctokenvectors
        # Cache
        mlevectorsiterable = mlevectors.items()
        print '## Classifying documents'
        alpha = 0.99
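        # The inner loop below computes the skew divergence
        #   s_alpha(x, y) = KL(x || alpha*y + (1 - alpha)*x)
        # a KL divergence (in bits) with the training distribution y smoothed
        # towards the test distribution x, so terms where y[i] == 0 stay
        # finite; smaller values mean more similar documents.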
        # Classify documents
        for fname in os.listdir(self.directory):
            fpath = self.directory.rstrip(os.sep) + os.sep + fname
            if os.path.isfile(fpath):
                # Gather token freq for document
                fh = open(fpath, 'r')
                tokenfreqtotal = 0
                tokenfreq = {}
                for line in fh:
                    tokenised = self.tokeniser(line)
                    for token in tokenised:
                        tokenfreqtotal = tokenfreqtotal + 1
                        if tokenfreq.has_key(token):
                            tokenfreq[token] = tokenfreq[token] + 1
                        else:
                            tokenfreq[token] = 1
                fh.close()
                # Create MLE vector
                tokenfreqtotal = float(tokenfreqtotal)
                x = basevec[:]
                i = 0
                for token in tokens:
                    if tokenfreq.has_key(token):
                        x[i] = tokenfreq[token] / tokenfreqtotal
                    i = i + 1
                # Compare against all training vectors
                nearest = (self.classes.collection[0], None)
                for traindocid, y in mlevectorsiterable:
                    skewdiv = 0.0
                    for i in range(baseveclength):
                        if x[i] > 0.0:
                            skewdiv += x[i]*(log(x[i], 2) - log((alpha*y[i]) + ((1-alpha)*x[i]), 2))
                    # compare against None explicitly: a stored divergence of
                    # exactly 0.0 is a perfect match and must be kept
                    if nearest[1] is None or skewdiv < nearest[1]:
                        nearest = (self.training.getclassid(traindocid), skewdiv)
                docclass = (fname, nearest[0])
                print '### Classified "%s" class: %s' % docclass
                classification.append(docclass)
        return classification
def usage():
    print 'Usage: docclass.py [-c|-s] [-f PERCENT] [-o FILE] [-h]'
    print ''
    print 'Arguments:'
    print ''
    print '-c --cos                Apply cosine similarity model classification'
    print '-s --skew               Apply skew divergence classification'
    print '-f --feature-selection  Apply feature selection over tokens in training documents using Mutual Information'
    print '                        Value must be between 0.01 and 0.99 inclusive - this represents the percentage of tokens to keep'
    print '-o --out                Specify file to write results to'
    print '-h --help               Prints this usage message'
    sys.exit(EXIT_CODES['USAGE'])
def main():
    try:
        # long options that take a value need a trailing '=' for getopt
        opts, args = getopt.getopt(sys.argv[1:], 'csf:o:h',
                                   ['cos', 'skew', 'feature-selection=', 'out=', 'help'])
    except getopt.GetoptError, err:
        print(str(err))
        usage()
    methods = ['cos', 'skew']
    method = methods[0]
    includepercent = False
    resultsfile = DEFAULTS['RESULT_CLASSES']
    for o, a in opts:
        if o in ('-c', '--cos'):
            continue
        elif o in ('-s', '--skew'):
            method = 'skew'
        elif o in ('-f', '--feature-selection'):
            try:
                includepercent = float(a)
                if (includepercent < 0.01 or includepercent > 0.99):
                    raise ValueError()
            except ValueError:
                usage()
        elif o in ('-o', '--out'):
            resultsfile = a
        elif o in ('-h', '--help'):
            usage()
        else:
            assert False, 'invalid option'
    # Read classes
    classes = Classes(classfile=DEFAULTS['CLASSES'])
    print '# Parsing training documents'
    # Parse training set
    training = TrainingSet(
        directory=DEFAULTS['TRAIN_PATH'],
        classfile=DEFAULTS['TRAIN_CLASS'],
        langfile=DEFAULTS['TRAIN_LANG'],
        tokeniser=tokenise_by_whitespace,
        classes=classes,
        miselect=includepercent
    )
    # Build the classifier for the chosen method
    if method == 'cos':
        classifier = CosineSimilarityModelClassifier(
            directory=DEFAULTS['CLASSIFY_PATH'],
            classes=classes,
            training=training,
            tokeniser=tokenise_by_whitespace
        )
        print '# Cosine Similarity Model classification begun'
    elif method == 'skew':
        classifier = SkewDivergenceClassifier(
            directory=DEFAULTS['CLASSIFY_PATH'],
            classes=classes,
            training=training,
            tokeniser=tokenise_by_whitespace
        )
        print '# Skew Divergence classification begun'
    classification = classifier.classify()
    print 'Results:'
    print '--'
    fh = open(resultsfile, 'w')
    for entry in classification:
        fh.write('%s\t%s\n' % entry)
        print '%s\t%s' % entry
    fh.close()
if __name__ == '__main__':
    main()
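
# Example run (hypothetical file names, assuming the directory layout
# described near the top of the file):
#   python docclass.py -s -f 0.5 -o my-results
# This keeps the 50% of training tokens with the highest mutual information,
# classifies every file in test/ by skew divergence and writes one
# '<filename>\t<classid>' line per document to my-results.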