|
import json |
|
import time |
|
import math |
|
import numpy as np |
|
|
|
|
|
def Feature_Select(frname, mode='chi', num=1000):
    """Select feature words from a labelled corpus file.

    Each line of ``frname`` is ``<label> <word> <word> ...`` (whitespace
    separated, UTF-8).  The function builds the classic per-(word, category)
    A/B/C/D contingency counts and ranks words by the chosen criterion.

    Args:
        frname: path to the corpus file.
        mode: ranking criterion — one of 'chi_label', 'chi_mode',
            'mi_label', 'mi_mode', 'ig', 'df', 'ts'.
        num: how many top words to keep (per category for the ``*_label``
            modes, globally for the others).

    Returns:
        list[str]: the selected feature words.

    Raises:
        ValueError: if ``mode`` is not one of the supported values.
    """
    all_text = []
    label_set = set()
    with open(frname, 'r', encoding='utf-8') as fr:
        for line in fr:
            data = line.strip().split()
            all_text.append(data)
            label_set.add(data[0])
    categories = sorted(label_set)  # sort so the category->id map is deterministic
    cat_to_id = dict(zip(categories, range(len(categories))))
    print("类别列表id:", cat_to_id)

    # word -> per-category document frequency (the "A" of the contingency table)
    word_text_map = {}
    label_num = [0] * len(categories)  # number of documents per category
    for text in all_text:
        label_i = cat_to_id[text[0]]
        label_num[label_i] += 1
        # Deduplicate the document's words while keeping first-occurrence
        # order (insertion order determines word_list, hence tie-breaking).
        for w in dict.fromkeys(text[1:]):
            word_text_map.setdefault(w, [0] * len(categories))[label_i] += 1

    N = len(all_text)
    print('文本总数量:', N)

    # dict preserves insertion order, so these two lists stay aligned
    word_list = list(word_text_map.keys())
    word_text_list = list(word_text_map.values())

    # Contingency counts per (word, category):
    #   A: docs of this category containing the word
    #   B: docs of other categories containing the word
    #   C: docs of this category not containing the word
    #   D: docs of other categories not containing the word
    A_array = np.array(word_text_list, dtype=float)  # np.float was removed in NumPy 1.20
    sign_array = np.sign(A_array)  # mask: force score to 0 wherever A == 0
    word_num_each_label = np.sum(sign_array, 0)
    print('每个类别的词数量:', word_num_each_label)
    label_num = np.array(label_num, dtype=float)
    B_array = np.reshape(np.sum(word_text_list, 1), (-1, 1)) - A_array
    C_array = label_num - A_array
    D_array = N - A_array - B_array - C_array
    word_num = len(word_list)
    print('总词数:', word_num)

    def chi_label():
        """Union of the top-``num`` words of every category by chi-square."""
        # Within one category (A+C) and (B+D) are constant, so the full
        # N*(AD-BC)^2 / ((A+C)(B+D)(A+B)(C+D)) ranking reduces to
        # (AD-BC)^2 / ((A+B)(C+D)).
        # NOTE(review): a word present in every document gives C+D == 0 and
        # a divide-by-zero NaN — assumed not to occur in real corpora.
        chi_square_value = (A_array * D_array - B_array * C_array) ** 2 / (
            (A_array + B_array) * (C_array + D_array))
        chi_square_value = sign_array * chi_square_value  # chi = 0 where A == 0

        feature_word = {}  # ordered set of selected word indices
        for label_i in range(len(categories)):
            order = np.argsort(chi_square_value[:, label_i])  # ascending
            # don't ask for more words than the category actually contains
            min_n = min(num, int(word_num_each_label[label_i]))
            for idx in order[-min_n:]:
                feature_word[idx] = 1
        chose_w = [word_list[idx] for idx in feature_word]
        print('提取的特征词的数量:', len(chose_w))
        return chose_w

    def chi_mode(sub_mode):
        """Top-``num`` words by chi-square pooled across categories."""
        # Full formula: N * (AD - BC)^2 / ((A+C)(B+D)(A+B)(C+D))
        chi_square_value = (A_array * D_array - B_array * C_array) ** 2 * N / (
            (A_array + C_array) * (B_array + D_array) * (A_array + B_array) * (C_array + D_array))
        chi_square_value = sign_array * chi_square_value

        if sub_mode == 'max':
            chi_square_value = np.max(chi_square_value, axis=1).reshape((-1, 1))
        elif sub_mode == 'avg':
            chi_square_value = np.mean(chi_square_value, axis=1).reshape((-1, 1))
        else:
            raise ValueError("参数有误")

        order = np.argsort(chi_square_value[:, 0])  # ascending
        return [word_list[idx] for idx in order[-num:]]

    def mi_label():
        """Union of the top-``num`` words of every category by mutual information."""
        # log is monotonic, so ranking by N*A / ((A+C)(A+B)) is equivalent
        # to ranking by log2 of the same quantity.
        mi_value = A_array * N / ((A_array + C_array) * (A_array + B_array))
        mi_value = sign_array * mi_value  # MI = 0 where A == 0

        feature_word = {}  # ordered set of selected word indices
        for label_i in range(len(categories)):
            order = np.argsort(mi_value[:, label_i])  # ascending
            for idx in order[-num:]:
                feature_word[idx] = 1
        return [word_list[idx] for idx in feature_word]

    def mi_mode(sub_mode):
        """Top-``num`` words by mutual information pooled across categories."""
        # MI(t, c) = log2( N*A / ((A+C)(A+B)) ); 1e-15 guards log2(0)
        mi_value = np.log2(A_array * N / ((A_array + C_array) * (A_array + B_array)) + 1e-15)
        mi_value = sign_array * mi_value

        if sub_mode == 'max':
            mi_value = np.max(mi_value, axis=1).reshape((-1, 1))
        elif sub_mode == 'avg':
            mi_value = np.mean(mi_value, axis=1).reshape((-1, 1))
        else:
            raise ValueError("参数有误")

        order = np.argsort(mi_value[:, 0])  # ascending
        return [word_list[idx] for idx in order[-num:]]

    def ig():
        """Top-``num`` words by information gain IG(t) = H(c) - H(c|t)."""
        P_c = label_num / N  # prior probability of each category
        H_c = - np.sum(P_c * np.log2(P_c)).reshape((-1, 1))  # class entropy

        # A+B (and C+D) are identical across a row, so any column works.
        P_t1 = ((A_array + B_array) / N)[:, 0].reshape((-1, 1))  # P(t present)
        P_t0 = ((C_array + D_array) / N)[:, 0].reshape((-1, 1))  # P(t absent)

        P_c_t1 = A_array / (A_array + B_array)  # P(c | t present)
        P_c_t0 = C_array / (C_array + D_array)  # P(c | t absent)

        # 1e-15 guards log2(0); zero-probability terms contribute ~0
        H_c_t = (- P_t1 * np.sum(P_c_t1 * np.log2(P_c_t1 + 1e-15), 1).reshape((-1, 1))
                 - P_t0 * np.sum(P_c_t0 * np.log2(P_c_t0 + 1e-15), 1).reshape((-1, 1)))
        IG_t = H_c - H_c_t  # shape (word_num, 1)

        order = np.argsort(IG_t[:, 0])  # ascending
        return [word_list[idx] for idx in order[-num:]]

    def df():
        """Top-``num`` words by document frequency (A + B)."""
        DF_t = A_array + B_array  # identical across each row; take column 0
        order = np.argsort(DF_t[:, 0])  # ascending
        return [word_list[idx] for idx in order[-num:]]

    def ts():
        """Top-``num`` words by (approximate) term strength sum(A^2)."""
        # NOTE(review): textbook term strength is
        # sum(A(A-1)) / sum((A+C)(A+C-1)); the original deliberately used
        # A*A instead — behavior kept as-is.
        ts_value = np.sum(A_array * A_array, axis=1)
        order = np.argsort(ts_value)  # ascending
        return [word_list[idx] for idx in order[-num:]]

    if mode == 'chi_label':
        chose_w = chi_label()
    elif mode == 'chi_mode':
        chose_w = chi_mode('max')
    elif mode == 'mi_label':
        chose_w = mi_label()
    elif mode == 'mi_mode':
        chose_w = mi_mode('max')
    elif mode == 'ig':
        chose_w = ig()
    elif mode == 'df':
        chose_w = df()
    elif mode == 'ts':
        chose_w = ts()
    else:
        raise ValueError("参数有误")
    return chose_w
|
|
|
|
|
def remove_w(feature_word, frname, fwname):
    """Filter each document of ``frname`` down to the selected feature words.

    Reads ``frname`` (one ``<label> <word> ...`` document per line), keeps
    only the words present in ``feature_word``, and writes surviving
    documents to ``fwname`` as ``<label>\\t<word> <word> ...``.  Documents
    left with no feature words are dropped entirely.

    Args:
        feature_word: iterable of feature words to keep.
        frname: input corpus path (UTF-8).
        fwname: output corpus path (UTF-8, overwritten).
    """
    feature_set = set(feature_word)  # O(1) membership instead of O(n) list scans
    # Open the input first so a missing input doesn't create/truncate the output.
    with open(frname, 'r', encoding='utf-8') as fr, \
            open(fwname, 'w', encoding='utf-8') as fw:
        for line in fr:
            data = line.strip().split()
            kept = [w for w in data[1:] if w in feature_set]
            if kept:
                fw.write(data[0] + '\t' + ' '.join(kept) + '\n')
|
|
|
|
|
if __name__ == '__main__':
    # time.clock() was removed in Python 3.8; perf_counter() is the
    # documented replacement for wall-clock timing of short runs.
    start = time.perf_counter()

    # NOTE(review): hard-coded local Windows path; raw string avoids the
    # invalid escape sequence '\d' (a warning today, slated to become a
    # syntax error).  Earlier batch-experiment driver code (TREC / HIT
    # datasets, k sweeps, json dumps of the feature-word lists) was
    # removed as dead commented-out history.
    data_path = r'D:\data\中移在线-CCL-意图分类任务-初赛训练数据\call_reason_train_txt_seg'

    k = 1000           # features to keep per category
    mode = 'chi_label'  # per-category chi-square selection
    w = Feature_Select(data_path, mode, k)
    print(k, len(w))

    # rewrite the training corpus keeping only the selected feature words
    remove_w(w, data_path, data_path + '_clean')

    end = time.perf_counter()
    print('运行时间为:%.3f 秒' % (end-start))
|
|