Created
May 19, 2016 16:15
-
-
Save y3nr1ng/7b432240ce230ee6982a3d890df0d3b8 to your computer and use it in GitHub Desktop.
Task 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool, Manager | |
import yaml | |
import re | |
import math | |
from count import Count | |
from statistic import MI, LogLikelihood | |
import jieba | |
n_workers = 16 | |
def print_top10(statistic):
    """Print the top-ranked entries of every emotion to stdout.

    Args:
        statistic: Object exposing ``get_top_10()``; the result is indexed by
            emotion id (0..39) and each entry is printed as-is.
    """
    ranking = statistic.get_top_10()
    for emotion_id in range(40):
        print("emotion", emotion_id)
        for entry in ranking[emotion_id]:
            print(entry)
def count_scores_of_sentence(statistic, sentence):
    """Sum the per-emotion scores of every known word in a sentence.

    Args:
        statistic: Object whose ``scores_of_word`` maps a word to a sequence
            of 40 per-emotion scores.
        sentence: Space-separated, already segmented sentence.

    Returns:
        A list of 40 totals, one per emotion. Words missing from
        ``scores_of_word`` contribute nothing.
    """
    totals = [0] * 40
    known = statistic.scores_of_word
    for token in sentence.split(' '):
        if token in known:
            word_scores = known[token]
            for emotion_id in range(40):
                totals[emotion_id] += word_scores[emotion_id]
    return totals
def count_accurate(count, statistic):
    """Report how often the true emoticon lands at rank 1, 2 or 3.

    Every row of ``count.chunked_rows`` is tab-separated: column 1 holds the
    1-based emoticon label and column 2 the space-segmented sentence. Rows
    that fail to parse or score are reported on stdout and skipped.

    Args:
        count: Object exposing ``chunked_rows``.
        statistic: Scoring object passed on to ``count_scores_of_sentence``.
    """
    hits = [0, 0, 0]
    for row in count.chunked_rows:
        try:
            columns = row.split('\t')
            # Labels are 1-based in the data; scores are indexed from 0.
            expected = int(columns[1]) - 1
            sentence_scores = count_scores_of_sentence(statistic, row.split('\t')[2])
            ascending = sorted(range(40), key=lambda idx: sentence_scores[idx])
            # Last three of the ascending order, best first. Slicing from the
            # end keeps the original tie-breaking (higher index wins a tie).
            best_three = ascending[:-4:-1]
            for rank, predicted in enumerate(best_three):
                if predicted == expected:
                    hits[rank] += 1
        except Exception as err:
            print("err:", err, row)
    print("Number of hits: %d %d %d" % (hits[0], hits[1], hits[2]))
def predict(statistic):
    """Score every row of ``test.tsv`` and write the top-3 emotions to ``output.txt``.

    Each input row is tab-separated: column 0 is the row id and column 2 the
    raw sentence, which is segmented with ``jieba`` before scoring. The output
    starts with the header ``Id,Emoticon`` followed by one line per row of the
    form ``<id>,<e1> <e2> <e3>`` (best emotion first). Rows that cannot be
    parsed or scored are printed to stdout and skipped.

    Args:
        statistic: Object whose ``scores_of_word`` maps a word to a sequence
            of 40 per-emotion scores.
    """
    # Both files are handled through context managers so they are closed even
    # when something unexpected escapes the per-row handler.
    with open('test.tsv', 'r') as test_file:
        rows = test_file.read().split('\n')

    with open('output.txt', 'w') as output_file:
        output_file.write("Id,Emoticon\n")
        for row in rows:
            try:
                fields = row.split('\t')
                number = fields[0]
                sentence = fields[2]
                scores = [0] * 40
                for seg in jieba.cut(sentence, cut_all=False):
                    if seg not in statistic.scores_of_word:
                        continue
                    word_scores = statistic.scores_of_word[seg]
                    for i in range(40):
                        scores[i] += word_scores[i]
                # Three highest-scoring emotion indices, best first.
                top3 = sorted(range(40), key=lambda i: scores[i])[:-4:-1]
                # NOTE(review): these are 0-based indices, while the training
                # labels appear to be 1-based (count_accurate subtracts 1).
                # Confirm the expected submission format before changing.
                output_file.write(
                    str(number) + "," + str(top3[0]) + " " + str(top3[1])
                    + " " + str(top3[2]) + "\n"
                )
            except Exception as err:
                print(row)
                print("err:", err)
def main():
    """Build the MI statistic, write predictions, then report training hits.

    Loads the jieba user dictionary ``dict.expand``, constructs ``Count`` and
    ``MI`` from it, runs ``predict`` and finally ``count_accurate``.
    """
    jieba.load_userdict("dict.expand")
    corpus = Count()
    mi_scores = MI(corpus)
    print("Total %d rows." % len(corpus.chunked_rows))
    predict(mi_scores)
    count_accurate(corpus, mi_scores)
    # Alternative statistic, currently disabled:
    # log_likelihood = LogLikelihood(corpus)
    # count_accurate(corpus, log_likelihood)
    # print_top10(log_likelihood)


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
from multiprocessing import Pool, Manager | |
class Statistic:
    """Base class for the per-word, per-emotion statistics of task 1.

    Subclasses implement ``_calc(w, scores_of_word)``, which must store a list
    of 40 per-emotion scores for word ``w`` in the given dict. Construction
    splits the vocabulary into contiguous slices, scores each slice in a
    worker process and merges the partial results into ``scores_of_word``.
    """

    # Class-level defaults kept for backward compatibility. ``__init__``
    # rebinds ``scores_of_word`` and ``_word_list`` on the instance, so
    # different statistics no longer share (and overwrite) one dict.
    statistic = {}
    scores_of_word = dict()
    _word_list = list()

    def __init__(self, count, n_workers=16):
        """Compute the scores of every word in ``count.count_of_word``.

        Args:
            count: Corpus counts; must expose ``count_of_word`` (a mapping
                keyed by word) plus whatever the subclass's ``_calc`` reads.
            n_workers: Number of worker processes.
        """
        self.count = count
        self._word_list = list(count.count_of_word.keys())

        merged = {}
        # Leaving the ``with`` block terminates the pool, so every result is
        # collected inside it.
        with Pool(processes=n_workers) as pool:
            pending = [
                pool.apply_async(self._worker, bounds)
                for bounds in self._chunk_bounds(len(self._word_list), n_workers)
            ]
            for result in pending:
                # ``get()`` blocks until the slice is done and re-raises any
                # exception raised inside the worker.
                merged.update(result.get())
        self.scores_of_word = merged
        print("finish calculating", self.__class__.__name__)

    @staticmethod
    def _chunk_bounds(n_words, n_workers):
        """Split ``range(n_words)`` into at most ``n_workers`` contiguous slices.

        Returns:
            A list of ``(start, end)`` half-open ranges covering every index
            exactly once; empty when ``n_words`` is 0.
        """
        if n_words <= 0:
            return []
        # Ceiling division: rounding down would leave the trailing
        # ``n_words % n_workers`` words unassigned.
        chunk = -(-n_words // n_workers)
        return [
            (start, min(start + chunk, n_words))
            for start in range(0, n_words, chunk)
        ]

    def _worker(self, start, end):
        """Score the words ``_word_list[start:end]`` and return them as a dict."""
        scores_of_word = dict()
        for i in range(start, end):
            self._calc(self._word_list[i], scores_of_word)
        return scores_of_word

    def _calc(self, w, scores_of_word):
        """Store the 40 per-emotion scores of ``w``; implemented by subclasses."""
        raise NotImplementedError("task1._calc not implemented!")

    def get_top_10(self, reverse=False):
        """Return up to ten ``(word, score)`` pairs for each of the 40 emotions.

        Args:
            reverse: Forwarded to ``sorted``. With the default ``False`` the
                highest-scoring words come first; with ``True`` the
                lowest-scoring words come first.

        Returns:
            A list of 40 lists. Each inner list holds at most ten pairs and
            is shorter when fewer than ten words were scored.
        """
        sorted_words_of_emotion = []
        for emotion in range(40):
            ranked = sorted(
                self.scores_of_word.items(),
                key=lambda item: item[1][emotion],
                reverse=reverse,
            )
            # Walk the sorted list from its end; the slice simply stops early
            # when fewer than ten entries exist.
            sorted_words_of_emotion.append(
                [(word, scores[emotion]) for word, scores in ranked[:-11:-1]]
            )
        return sorted_words_of_emotion
class MI(Statistic):
    """Pointwise mutual information weighted by the log of the joint count."""

    def _calc(self, w, scores_of_word):
        """Fill ``scores_of_word[w]`` with one score per emotion.

        For every emotion that co-occurs with ``w`` the score is
        ``(log p(w, e) - log p(w) - log p(e)) * log c(w, e)``; emotions with a
        zero joint count keep the score 0.

        Args:
            w: Word to score; must be a key of the count tables.
            scores_of_word: Dict that receives the 40-element score list.
        """
        counts = self.count
        joint_counts = counts.count_of_word_of_emotion[w]
        word_total = counts.count_of_word[w]
        row = scores_of_word.setdefault(w, [0] * 40)

        for emotion in range(40):
            joint = joint_counts[emotion]
            if joint == 0:
                continue
            total = counts.total_word_count
            p_xy = joint / total
            p_x = word_total / total
            p_y = counts.count_of_emotion[emotion] / total
            mi = 0
            try:
                pmi = math.log(p_xy) - math.log(p_x) - math.log(p_y)
                mi = pmi * math.log(joint)
            except Exception as err:
                # Report the inputs and fall back to a score of 0.
                print("p_xy = ", p_xy, " p_x = ", p_x, " p_y = ", p_y, "mi = ", mi)
                print(err)
            row[emotion] = mi
class MIRaw(Statistic):
    """Pointwise mutual information weighted by the raw joint count."""

    def _calc(self, w, scores_of_word):
        """Fill ``scores_of_word[w]`` with one score per emotion.

        For every emotion that co-occurs with ``w`` the score is
        ``(log p(w, e) - log p(w) - log p(e)) * c(w, e)``; emotions with a
        zero joint count keep the score 0.

        Args:
            w: Word to score; must be a key of the count tables.
            scores_of_word: Dict that receives the 40-element score list.
        """
        counts = self.count
        joint_counts = counts.count_of_word_of_emotion[w]
        word_total = counts.count_of_word[w]
        row = scores_of_word.setdefault(w, [0] * 40)

        for emotion in range(40):
            joint = joint_counts[emotion]
            if joint == 0:
                continue
            total = counts.total_word_count
            p_xy = joint / total
            p_x = word_total / total
            p_y = counts.count_of_emotion[emotion] / total
            mi = 0
            try:
                pmi = math.log(p_xy) - math.log(p_x) - math.log(p_y)
                mi = pmi * joint
            except Exception as err:
                # Report the inputs and fall back to a score of 0.
                print("p_xy = ", p_xy, " p_x = ", p_x, " p_y = ", p_y, "mi = ", mi)
                print(err)
            row[emotion] = mi
class LogLikelihood(Statistic):
    """Negative log-likelihood-ratio score of a word/emotion association."""

    def _log_l(self, k, n, x):
        """Return the binomial log-likelihood ``k*log(x) + (n-k)*log(1-x)``.

        Raises:
            ValueError: If ``x`` is outside the open interval (0, 1), so a
                logarithm is undefined. The inputs are printed first.
        """
        try:
            return k * math.log(x) + (n - k) * math.log(1 - x)
        except ValueError:
            # Re-raise the real error. The previous bare ``except`` fell
            # through to ``return r`` with ``r`` unbound, masking the cause
            # behind an UnboundLocalError.
            print("k = ", k, "n = ", n, "x = ", x)
            raise

    def _calc(self, w, score_of_word):
        """Fill ``score_of_word[w]`` with one score per emotion.

        With ``c_12`` the joint count, ``c_1`` the word count, ``c_2`` the
        emotion count and ``N`` the total, the score is ``-log(lambda)`` where
        ``lambda`` compares the single-rate hypothesis ``p = c_2 / N`` against
        separate rates ``p_1 = c_12 / c_1`` and
        ``p_2 = (c_2 - c_12) / (N - c_1)``. Emotions where ``p_1`` or ``p_2``
        is exactly 0 or 1 keep the score 0.

        Args:
            w: Word to score; must be a key of the count tables.
            score_of_word: Dict that receives the 40-element score list.
        """
        if w not in score_of_word:
            score_of_word[w] = [0] * 40
        for i in range(0, 40):
            c_12 = self.count.count_of_word_of_emotion[w][i]
            c_1 = self.count.count_of_word[w]
            c_2 = self.count.count_of_emotion[i]
            N = self.count.total_word_count
            p_1 = c_12 / c_1
            p_2 = (c_2 - c_12) / (N - c_1)
            p = c_2 / N
            # Degenerate rates would need log(0); leave the default score.
            if p_1 in (0, 1) or p_2 in (0, 1):
                continue
            try:
                l = (
                    self._log_l(c_12, c_1, p)
                    + self._log_l(c_2 - c_12, N - c_1, p)
                    - self._log_l(c_12, c_1, p_1)
                    - self._log_l(c_2 - c_12, N - c_1, p_2)
                )
                score_of_word[w][i] = - l
            except Exception as err:
                print(err)
                print("i =", i, "c_1 = ", c_1, "c_2 = ", c_2, "c_12 = ", c_12, "N = ", N, "p = ", p, "p_1 = ", p_1, "p_2 = ", p_2)
class ChiSquare(Statistic):
    """Chi-square score of the 2x2 word/emotion contingency table."""

    def _calc(self, w, score_of_word):
        """Fill ``score_of_word[w]`` with one chi-square value per emotion.

        The four cells of the table are derived from the word count, the
        emotion count, their joint count and the total word count.

        Args:
            w: Word to score; must be a key of the count tables.
            score_of_word: Dict that receives the 40-element score list.
        """
        row = score_of_word.setdefault(w, [0] * 40)
        word_total = self.count.count_of_word[w]
        emotion_totals = self.count.count_of_emotion
        joint_counts = self.count.count_of_word_of_emotion[w]

        for emotion in range(40):
            total = self.count.total_word_count
            both = joint_counts[emotion]               # word and emotion
            word_only = word_total - both              # word, other emotion
            emotion_only = emotion_totals[emotion] - both  # emotion, other word
            neither = total - word_total - emotion_totals[emotion] + both

            numerator = total * (both * neither - emotion_only * word_only) ** 2
            denominator = (
                (both + word_only)
                * (neither + word_only)
                * (both + emotion_only)
                * (neither + emotion_only)
            )
            row[emotion] = numerator / denominator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool, Manager | |
import yaml | |
import re | |
import math | |
from count import Count | |
from statistic import MI, LogLikelihood, MIRaw, ChiSquare | |
import jieba | |
n_workers = 16 | |
def write_top10(statistic):
    """Write the top words of every emotion to ``<ClassName>-result.txt``.

    The file gets 40 lines, one per emotion. Each line lists the words
    returned by ``statistic.get_top_10()`` for that emotion, every word
    followed by a tab.

    Args:
        statistic: Object exposing ``get_top_10()``; its class name
            determines the output file name.
    """
    name = statistic.__class__.__name__
    print("start writing result of", name)
    top10 = statistic.get_top_10()
    # The context manager guarantees the file is flushed and closed; it was
    # previously left open.
    with open(name + "-result.txt", 'w') as f:
        for i in range(40):
            for w_mi in top10[i]:
                f.write(str(w_mi[0]))
                f.write("\t")
            f.write("\n")
    print("finish writing result of", name)
def main():
    """Compute each statistic in turn and write its top-10 word lists.

    Loads the jieba user dictionary ``dict.expand``, builds ``Count`` once,
    then constructs MI, MIRaw, LogLikelihood and ChiSquare in that order,
    writing each result file right after the statistic is computed.
    """
    jieba.load_userdict("dict.expand")
    corpus = Count()
    print("Total %d rows." % len(corpus.chunked_rows))
    for statistic_cls in (MI, MIRaw, LogLikelihood, ChiSquare):
        write_top10(statistic_cls(corpus))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment