Skip to content

Instantly share code, notes, and snippets.

@y3nr1ng
Created May 19, 2016 16:15
Show Gist options
  • Save y3nr1ng/7b432240ce230ee6982a3d890df0d3b8 to your computer and use it in GitHub Desktop.
Save y3nr1ng/7b432240ce230ee6982a3d890df0d3b8 to your computer and use it in GitHub Desktop.
Task 1
from multiprocessing import Pool, Manager
import yaml
import re
import math
from count import Count
from statistic import MI, LogLikelihood
import jieba
n_workers = 16
def print_top10(statistic):
    """Dump each emotion's ten highest-scoring (word, score) pairs to stdout."""
    per_emotion = statistic.get_top_10()
    for emotion_id in range(40):
        print("emotion", emotion_id)
        for pair in per_emotion[emotion_id]:
            print(pair)
def count_scores_of_sentence(statistic, sentence):
    """Sum per-emotion word scores over the space-separated tokens of *sentence*.

    Tokens absent from ``statistic.scores_of_word`` contribute nothing.
    Returns a list of 40 accumulated scores.
    """
    totals = [0] * 40
    word_scores = statistic.scores_of_word
    for token in sentence.split(' '):
        row = word_scores.get(token)
        if row is None:
            continue
        for emotion_id in range(40):
            totals[emotion_id] += row[emotion_id]
    return totals
def count_accurate(count, statistic):
    """Report how often the gold emoticon lands at prediction rank 1, 2 or 3.

    Each row of ``count.chunked_rows`` is a tab-separated record whose second
    field is the 1-based gold emoticon id and whose third field is the
    pre-segmented sentence.  Malformed rows are reported and skipped.
    """
    hits = [0, 0, 0]
    for row in count.chunked_rows:
        try:
            fields = row.split('\t')
            gold = int(fields[1]) - 1
            scores = count_scores_of_sentence(statistic, fields[2])
            # Indices of the three highest scores, best first.
            ranked = sorted(range(40), key=lambda e: scores[e])[:-4:-1]
            for rank, prediction in enumerate(ranked):
                if prediction == gold:
                    hits[rank] += 1
        except Exception as err:
            print("err:", err, row)
    print("Number of hits: %d %d %d" % (hits[0], hits[1], hits[2]))
def predict(statistic):
    """Predict the top-3 emoticons for every row of ``test.tsv``.

    Writes a CSV ``output.txt`` with header ``Id,Emoticon`` and one line per
    row: the row id followed by the three best emotion ids (space-separated),
    ranked by the summed per-word scores from *statistic*.  Malformed rows
    are reported on stdout and skipped.
    """
    # Context managers guarantee both handles are closed even if a row blows
    # up (the original leaked test_file entirely and could leak output_file
    # if an error escaped the loop).
    with open('test.tsv', 'r') as test_file, open('output.txt', 'w') as output_file:
        output_file.write("Id,Emoticon\n")
        for row in test_file.read().split('\n'):
            try:
                fields = row.split('\t')
                number = fields[0]
                sentence = fields[2]
                # Accumulate each segmented word's per-emotion score.
                scores = [0] * 40
                for seg in jieba.cut(sentence, cut_all=False):
                    word_scores = statistic.scores_of_word.get(seg)
                    if word_scores is None:
                        continue
                    for i in range(40):
                        scores[i] += word_scores[i]
                # Indices of the three highest scores, best first.
                top3 = sorted(range(40), key=lambda i: scores[i])[:-4:-1]
                output_file.write(str(number) + "," + str(top3[0]) + " " + str(top3[1]) + " " + str(top3[2]) + "\n")
            except Exception as err:
                print(row)
                print("err:", err)
def main():
    """Build MI statistics over the training corpus, then predict and score."""
    jieba.load_userdict("dict.expand")
    corpus_counts = Count()
    mi_scores = MI(corpus_counts)
    print("Total %d rows." % len(corpus_counts.chunked_rows))
    predict(mi_scores)
    count_accurate(corpus_counts, mi_scores)
    # Alternative statistic, kept for experimentation:
    # log_likelihood = LogLikelihood(corpus_counts)
    # count_accurate(corpus_counts, log_likelihood)
    # print_top10(log_likelihood)


if __name__ == "__main__":
    main()
import math
from multiprocessing import Pool, Manager
class Statistic:
    """Base class for task1 word-scoring statistics.

    Subclasses implement ``_calc`` to fill a per-word list of 40 emotion
    scores.  The constructor fans the vocabulary out over a process pool and
    merges each worker's partial result into ``self.scores_of_word``.
    """

    def __init__(self, count, n_workers=16):
        """Compute a score list for every word in *count*.

        count: exposes count_of_word, count_of_emotion,
            count_of_word_of_emotion and total_word_count.
        n_workers: number of pool processes to fan the vocabulary over.
        """
        self.count = count
        # Instance attributes.  The original used class-level mutable dicts,
        # which are shared by every Statistic subclass instance, so one
        # statistic's scores leaked into every other's via update() below.
        self.scores_of_word = {}
        self._word_list = list(count.count_of_word.keys())

        pool = Pool(processes=n_workers)
        results = []
        n_words = len(self._word_list)
        chunk = n_words // n_workers
        for i in range(n_workers):
            start = i * chunk
            # The last worker also takes the remainder; the original dropped
            # the final n_words % n_workers words entirely.
            end = n_words if i == n_workers - 1 else start + chunk
            results.append(pool.apply_async(self._worker, (start, end)))
        pool.close()
        pool.join()

        # Merge the partial dicts returned by the workers.
        for res in results:
            self.scores_of_word.update(res.get())
        print("finish calculating", self.__class__.__name__)

    def _worker(self, start, end):
        """Score the words in self._word_list[start:end]; return a partial dict."""
        scores_of_word = {}
        for i in range(start, end):
            self._calc(self._word_list[i], scores_of_word)
        return scores_of_word

    def _calc(self, w, scores_of_word):
        """Subclass hook: fill scores_of_word[w] with 40 per-emotion scores."""
        raise NotImplementedError("task1._calc not implemented!")

    def get_top_10(self, reverse=False):
        """Return, per emotion, the ten (word, score) pairs with highest score.

        With reverse=True the sort order flips, yielding the lowest scorers.
        """
        top = [[] for _ in range(40)]
        for i in range(40):
            ranked = sorted(self.scores_of_word.items(),
                            key=lambda item: item[1][i], reverse=reverse)
            for j in range(10):
                word, scores = ranked[-1 - j]
                top[i].append((word, scores[i]))
        return top
class MI(Statistic):
    """Mutual-information score, damped by the log of the joint count."""

    def _calc(self, w, scores_of_word):
        """Store log(p(w,e)/(p(w)p(e))) * log(c(w,e)) for every emotion e."""
        joint = self.count.count_of_word_of_emotion[w]
        total = self.count.total_word_count
        p_word = self.count.count_of_word[w] / total
        if w not in scores_of_word:
            scores_of_word[w] = [0] * 40
        for emotion in range(40):
            c_xy = joint[emotion]
            if c_xy == 0:
                # No co-occurrence: leave the default 0 score.
                continue
            p_joint = c_xy / total
            p_emotion = self.count.count_of_emotion[emotion] / total
            score = 0
            try:
                score = (math.log(p_joint) - math.log(p_word) - math.log(p_emotion)) * math.log(c_xy)
            except Exception as err:
                print("p_xy = ", p_joint, " p_x = ", p_word, " p_y = ", p_emotion, "mi = ", score)
                print(err)
            scores_of_word[w][emotion] = score
class MIRaw(Statistic):
    """Mutual-information score weighted by the raw joint count."""

    def _calc(self, w, scores_of_word):
        """Store log(p(w,e)/(p(w)p(e))) * c(w,e) for every emotion e."""
        joint = self.count.count_of_word_of_emotion[w]
        total = self.count.total_word_count
        p_word = self.count.count_of_word[w] / total
        if w not in scores_of_word:
            scores_of_word[w] = [0] * 40
        for emotion in range(40):
            c_xy = joint[emotion]
            if c_xy == 0:
                # No co-occurrence: leave the default 0 score.
                continue
            p_joint = c_xy / total
            p_emotion = self.count.count_of_emotion[emotion] / total
            score = 0
            try:
                score = (math.log(p_joint) - math.log(p_word) - math.log(p_emotion)) * c_xy
            except Exception as err:
                print("p_xy = ", p_joint, " p_x = ", p_word, " p_y = ", p_emotion, "mi = ", score)
                print(err)
            scores_of_word[w][emotion] = score
class LogLikelihood(Statistic):
    """Log-likelihood-ratio score for word/emotion association."""

    def _log_l(self, k, n, x):
        """Binomial log-likelihood: k*log(x) + (n-k)*log(1-x).

        Raises ValueError (from math.log) when x is 0 or 1, after printing
        the offending arguments.  The original swallowed the error with a
        bare except and then crashed with UnboundLocalError on the return;
        re-raising keeps the caller's except-and-report behavior intact.
        """
        try:
            return k * math.log(x) + (n - k) * math.log(1 - x)
        except ValueError:
            print("k = ", k, "n = ", n, "x = ", x)
            raise

    def _calc(self, w, score_of_word):
        """Store the (negated) log-likelihood ratio of w for each emotion."""
        if w not in score_of_word:
            score_of_word[w] = [0] * 40
        # Invariants across emotions, hoisted out of the loop.
        c_1 = self.count.count_of_word[w]
        N = self.count.total_word_count
        for i in range(40):
            c_12 = self.count.count_of_word_of_emotion[w][i]
            c_2 = self.count.count_of_emotion[i]
            p_1 = c_12 / c_1
            p_2 = (c_2 - c_12) / (N - c_1)
            p = c_2 / N
            # Degenerate probabilities would put log(0) in the ratio; skip.
            if (p_1 == 0) or (p_1 == 1) or (p_2 == 0) or (p_2 == 1):
                continue
            try:
                l = (self._log_l(c_12, c_1, p)
                     + self._log_l(c_2 - c_12, N - c_1, p)
                     - self._log_l(c_12, c_1, p_1)
                     - self._log_l(c_2 - c_12, N - c_1, p_2))
                score_of_word[w][i] = -l
            except Exception as err:
                print(err)
                print("i =", i, "c_1 = ", c_1, "c_2 = ", c_2, "c_12 = ", c_12, "N = ", N, "p = ", p, "p_1 = ", p_1, "p_2 = ", p_2)
class ChiSquare(Statistic):
    """Chi-square statistic from the 2x2 word/emotion contingency table."""

    def _calc(self, w, score_of_word):
        """Store the chi-square score of w for each of the 40 emotions."""
        if w not in score_of_word:
            score_of_word[w] = [0] * 40
        n_word = self.count.count_of_word[w]
        per_emotion = self.count.count_of_emotion
        joint = self.count.count_of_word_of_emotion[w]
        total = self.count.total_word_count
        for i in range(40):
            # Cells of the 2x2 contingency table.
            both = joint[i]                               # word & emotion
            word_only = n_word - both                     # word & not-emotion
            emotion_only = per_emotion[i] - both          # not-word & emotion
            neither = total - n_word - per_emotion[i] + both
            numerator = total * (both * neither - emotion_only * word_only) ** 2
            denominator = ((both + word_only) * (neither + word_only)
                           * (both + emotion_only) * (neither + emotion_only))
            score_of_word[w][i] = numerator / denominator
from multiprocessing import Pool, Manager
import yaml
import re
import math
from count import Count
from statistic import MI, LogLikelihood, MIRaw, ChiSquare
import jieba
n_workers = 16
def write_top10(statistic):
    """Write each emotion's ten best words to '<StatisticClass>-result.txt'.

    One line per emotion; words are tab-separated (with a trailing tab,
    matching the original output format).
    """
    name = statistic.__class__.__name__
    print("start writing result of", name)
    top10 = statistic.get_top_10()
    # 'with' guarantees the handle is flushed and closed; the original never
    # closed it at all.
    with open(name + "-result.txt", 'w') as f:
        for i in range(40):
            for w_mi in top10[i]:
                f.write(str(w_mi[0]))
                f.write("\t")
            f.write("\n")
    print("finish writing result of", name)
def main():
    """Build every statistic over the corpus and dump each one's top-10 lists."""
    jieba.load_userdict("dict.expand")
    corpus_counts = Count()
    print("Total %d rows." % len(corpus_counts.chunked_rows))
    # Compute and persist each statistic independently, in the same order
    # as before: MI, MIRaw, LogLikelihood, ChiSquare.
    for statistic_cls in (MI, MIRaw, LogLikelihood, ChiSquare):
        write_top10(statistic_cls(corpus_counts))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment