Skip to content

Instantly share code, notes, and snippets.

@jaganadhg
Created October 23, 2013 12:13
Show Gist options
  • Select an option

  • Save jaganadhg/7117483 to your computer and use it in GitHub Desktop.

Select an option

Save jaganadhg/7117483 to your computer and use it in GitHub Desktop.
A forked and cleaned-up version of entropy-calculator (http://code.google.com/p/entropy-calculator/)
#!/usr/bin/env python
"""
Module to find keywords using the Montemurro and Zanette algorithm.
Created by Dr Peter J. Bleackley
"""
from math import log, exp, lgamma
import re
class EntropyCalculator(object):
    """
    Holds a text and scores its words by entropy.

    The text is cut into ``parts`` consecutive blocks of ``wordsperpart``
    words.  For every distinct word the entropy of its observed
    distribution over the blocks is compared with the entropy expected if
    its occurrences were placed at random (see ``analytic_entropy``).

    Typical use: ``set_text`` -> ``analyse_text`` -> ``filter_words`` /
    ``output_words`` / ``analyse_sentences``.

    The code is written to run unchanged on Python 2.7 and Python 3.
    """

    # Raw strings so the backslash escapes reach the regex engine verbatim.
    strip_xml = re.compile(r'<[^>]*>')
    split_words = re.compile(r"[^A-Za-z0-9']+")
    split_sentences = re.compile(r'[.!?]')
    split_xml = re.compile(r'</p>')
    strip_times = re.compile(r'\d\d:\d\d:\d\d\.\d\d\d')

    def __init__(self, p):
        """
        Creates an EntropyCalculator that uses p blocks to analyse the text
        """
        self.parts = p
        self.nwords = 0
        self.text = []
        self.wordsperpart = 0
        self.unique = None
        self.rawtext = u''
        self.cleantext = u''
        self.totalentropy = 0.0
        self.rankedwords = []
        self.sentences = []
        self.rankedsentences = []
        self.xml = []
        self.search = []

    @staticmethod
    def _ceil_div(numerator, denominator):
        """Integer division rounded up (same result on Python 2 and 3)."""
        return -(-numerator // denominator)

    def log_combinations(self, x, y):
        """
        Calculates the natural logarithm of the binomial coefficient C(x, y).

        Working in log space avoids overflow; lgamma is used instead of
        factorials for efficiency.  math.lgamma raises ValueError at
        non-positive integers, so integer arguments must satisfy
        0 <= y <= x.
        """
        return lgamma(x + 1) - lgamma(y + 1) - lgamma(x - y + 1)

    def marginal_prob(self, m, n):
        """
        Calculates the marginal probability used by the analytic entropy:
        the hypergeometric probability that a block holds exactly m of the
        n occurrences of a word.

        Returns 0.0 for combinations that cannot occur instead of letting
        lgamma fail on a non-positive integer argument.
        """
        total = self.nwords
        # A block can never hold more words than the text contains (this
        # happens when set_words_per_part is given a size above nwords).
        block = min(self.wordsperpart, total)
        if m < 0 or m > n or m > block or block - m > total - n:
            return 0.0
        log_prob = self.log_combinations(n, m)
        log_prob += self.log_combinations(total - n, block - m)
        log_prob -= self.log_combinations(total, block)
        return exp(log_prob)

    def analytic_entropy(self, n):
        """
        Calculates the analytic entropy (in bits) of a word that occurs
        randomly n times in the text
        """
        result = 0.0
        upperbound = min(n, self.wordsperpart)
        for m in range(1, upperbound + 1):
            x = float(m) / n
            result -= self.marginal_prob(m, n) * x * log(x, 2)
        result *= self.parts
        return result

    def entropy(self, word):
        """
        Calculates the entropy of a word.

        Returns a tuple (score, contribution): score is the analytic
        entropy minus the observed entropy of the word over the blocks,
        contribution is score weighted by the word's relative frequency
        (n / nwords).  A word that does not occur in the text scores
        (0.0, 0.0).
        """
        n = self.text.count(word)
        if n == 0:
            # Nothing to distribute; also avoids dividing by zero below.
            return (0.0, 0.0)
        size = self.wordsperpart
        observed = 0.0
        for i in range(self.parts):
            in_block = self.text[i * size:(i + 1) * size].count(word)
            if in_block > 0:
                x = float(in_block) / n
                observed -= x * log(x, 2)
        result = self.analytic_entropy(n) - observed
        return (result, n * result / self.nwords)

    def set_text(self, data):
        """
        Loads the text to be analysed.

        data is either a string or an iterable of strings (for example an
        open text file).  Fills rawtext, cleantext, text, sentences, xml,
        search, nwords, wordsperpart and unique.
        """
        pound = u'\xa3'
        if isinstance(data, (str, type(u''))):
            self.rawtext = re.sub(pound, '&pound;', data)
        else:
            self.rawtext = u''.join(re.sub(pound, '&pound;', line)
                                    for line in data)
        self.cleantext = EntropyCalculator.strip_times.sub(
            '', EntropyCalculator.strip_xml.sub(u'', self.rawtext))

        # Splitting leaves an empty token wherever the text starts or ends
        # with a separator; those are not words.
        self.text = [token for token in
                     EntropyCalculator.split_words.split(self.cleantext)
                     if token]

        # Re-attach to every sentence the one-character delimiter that
        # ended it.  The running offset is tracked so that repeated or
        # empty fragments pick up their own delimiter.
        sentences = []
        position = 0
        text_length = len(self.cleantext)
        for piece in EntropyCalculator.split_sentences.split(self.cleantext):
            end = position + len(piece)
            if piece == ' ':
                piece += '.'
            elif end < text_length:
                piece += self.cleantext[end] + ' '
            sentences.append(piece)
            position = end + 1
        self.sentences = sentences

        self.xml = [fragment + '</p>' for fragment in
                    EntropyCalculator.split_xml.split(self.rawtext)]
        self.search = [EntropyCalculator.strip_times.sub(
            '', EntropyCalculator.strip_xml.sub('', line))
            for line in self.xml]

        self.nwords = len(self.text)
        self.wordsperpart = self._ceil_div(self.nwords, self.parts)
        self.unique = set(self.text)

    def set_words_per_part(self, block_size):
        """
        Changes the block size and recomputes the number of blocks
        """
        self.wordsperpart = block_size
        self.parts = self._ceil_div(self.nwords, self.wordsperpart)

    def set_parts(self, p):
        """
        Changes the number of blocks and recomputes the block size
        """
        self.parts = p
        self.wordsperpart = self._ceil_div(self.nwords, self.parts)

    def analyse_text(self):
        """
        Calculates the entropy for each unique word in the text.

        Fills rankedwords with ((score, contribution), word) tuples sorted
        from highest to lowest, and totalentropy with the sum of the
        contributions.
        """
        # unique is None until set_text has been called.
        self.rankedwords = sorted(
            ((self.entropy(word), word) for word in (self.unique or ())),
            reverse=True)
        self.totalentropy = sum(item[0][1] for item in self.rankedwords)

    def analyse_sentences(self):
        """
        Ranks the sentences by the summed entropy score of their words.

        Uses the scores in rankedwords (so analyse_text must have run) and
        fills rankedsentences with (total, sentence) tuples, highest first.
        """
        entropybyword = {item[1]: item[0][0] for item in self.rankedwords}
        ranked = []
        for sentence in self.sentences:
            total = 0.0
            for word in EntropyCalculator.split_words.split(sentence):
                total += entropybyword.get(word, 0.0)
            ranked.append((total, sentence))
        ranked.sort(reverse=True)
        self.rankedsentences = ranked

    def filter_words(self, filter_type=None, threshold=0.0):
        """
        Selects a subset of the ranked words according to a criterion.

        filter_type:
          'Entropy'             words whose score is >= threshold
          'Number'              the first int(threshold) words
          'Percentile'          the first threshold percent, counted
                                against the total word count (nwords)
          'Cumulative'          leading words while the running sum of
                                contributions stays below threshold
          'ProportionOfEntropy' 'Cumulative' with threshold * totalentropy
          anything else         all ranked words
        """
        result = []
        if filter_type == u'Entropy':
            result = [item for item in self.rankedwords
                      if item[0][0] >= threshold]
        elif filter_type == u'Number':
            # The default threshold is a float; slice bounds must be ints.
            result = self.rankedwords[:int(threshold)]
        elif filter_type == u'Percentile':
            N = int((self.nwords * threshold / 100) + 0.5)
            result = self.rankedwords[:N]
        elif filter_type == u'Cumulative':
            runningtotal = 0.0
            # Iterating the list (rather than indexing) stops cleanly when
            # the threshold is never reached.
            for item in self.rankedwords:
                if runningtotal >= threshold:
                    break
                runningtotal += item[0][1]
                if runningtotal < threshold:
                    result.append(item)
        elif filter_type == u'ProportionOfEntropy':
            result = self.filter_words(u'Cumulative',
                                       threshold * self.totalentropy)
        else:
            result = self.rankedwords
        return result

    def output_words(self, ofile, filter_type=None, threshold=0.0):
        """
        Outputs a set of selected keywords and their scores to a file.

        ofile is any object with a write method; one CSV header line is
        written, followed by one line per keyword chosen by filter_words.
        """
        keywords = self.filter_words(filter_type, threshold)
        ofile.write(u'Word,Entropy,Proportion of document entropy\n')
        for ((entropy, contrib), word) in keywords:
            # str.format instead of unicode(), which Python 3 lacks.
            ofile.write(u'{0},{1},{2}\n'.format(word, entropy, contrib))

    def keyword_dictionary(self):
        """
        Runs analyse_text and returns a dictionary mapping each word with
        a positive entropy score to that score
        """
        self.analyse_text()
        return {word: entropy[0] for (entropy, word) in self.rankedwords
                if entropy[0] > 0.0}
if __name__ == "__main__":
    # Command-line use: analyse INPUT_FILE with blocks of 1000 words and
    # write the per-word scores as CSV to OUTPUT_FILE.
    import sys

    if len(sys.argv) < 3:
        sys.exit("usage: %s INPUT_FILE OUTPUT_FILE" % sys.argv[0])

    H = EntropyCalculator(10)
    # Context managers close the files even if the analysis fails.
    with open(sys.argv[1], 'r') as data:
        H.set_text(data)
    H.set_words_per_part(1000)
    H.analyse_text()
    with open(sys.argv[2], 'w') as output:
        H.output_words(output)

    # Each message is a single pre-formatted string, so print(...) emits
    # the same text on Python 2 and Python 3.
    print("%s contains %s" % (sys.argv[1], H.nwords))
    print("Analysed with %s blocks" % H.parts)
    print("Total entropy %s bits" % H.totalentropy)
    print("Words with entropy > %s are likely to be particulary significant"
          % (float(H.parts) / (H.parts + 1)))
    print("Detailed analysis written to  %s" % sys.argv[2])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment