""" | |
Using the words as features removing stopwords | |
""" | |
from sklearn.utils import check_random_state
from sklearn.datasets import load_files
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import accuracy_score, average_precision_score, f1_score, precision_score, recall_score
from sklearn.externals import joblib
from sklearn.feature_extraction.text import FeatureHasher
from nltk.corpus import stopwords
from nltk.stem.lancaster import LancasterStemmer
from nltk.tokenize import word_tokenize
import nltk
import numpy as np
from time import time
import pprint
import gearman
import json
import re
import os
import sys
i = 0
""" | |
GetAnswerType class for getting the answer type | |
""" | |
class GetAnswerType(object): | |
""" | |
Init for GetAnswerType | |
""" | |
def __init__(self): | |
self.gm_worker = gearman.GearmanWorker(['localhost:4730']) | |
self.gm_worker.register_task('classify_less_class', self.testClassifier) | |
self.root_dir = os.getcwd() | |
self.trainClassifier() | |
""" | |
Function to fetch the data from cache | |
@cache <dict> consist of training data | |
""" | |
    def fetch_data(self, cache, data_home=None, subset='train', categories=None,
                   shuffle=True, random_state=42):
        if subset in ('train', 'test'):
            data = cache[subset]
        elif subset == 'all':
            data_lst = list()
            target = list()
            filenames = list()
            for subset in ('train', 'test'):
                data = cache[subset]
                data_lst.extend(data.data)
                target.extend(data.target)
                filenames.extend(data.filenames)
            data.data = data_lst
            data.target = np.array(target)
            data.filenames = np.array(filenames)
            data.description = 'the TREC question classification dataset'
        else:
            raise ValueError(
                "subset can only be 'train', 'test' or 'all', got '%s'" % subset)
        if categories is not None:
            labels = [(data.target_names.index(cat), cat) for cat in categories]
            # Sort the categories to have the ordering of the labels
            labels.sort()
            labels, categories = zip(*labels)
            mask = np.in1d(data.target, labels)
            data.filenames = data.filenames[mask]
            data.target = data.target[mask]
            # searchsorted to have continuous labels
            data.target = np.searchsorted(labels, data.target)
            data.target_names = list(categories)
            # Use an object array to shuffle: avoids memory copy
            data_lst = np.array(data.data, dtype=object)
            data_lst = data_lst[mask]
            data.data = data_lst.tolist()
        if shuffle:
            random_state = check_random_state(random_state)
            indices = np.arange(data.target.shape[0])
            random_state.shuffle(indices)
            data.filenames = data.filenames[indices]
            data.target = data.target[indices]
            # Use an object array to shuffle: avoids memory copy
            data_lst = np.array(data.data, dtype=object)
            data_lst = data_lst[indices]
            data.data = data_lst.tolist()
        return data
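    # Note (descriptive, not from the original gist): fetch_data expects the sklearn
    # Bunch objects returned by load_files (with .data, .target, .filenames,
    # .target_names) stored under 'train'/'test' keys in the cache dict; the body
    # mirrors the structure of sklearn's 20-newsgroups loader.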
""" | |
For custom tokenizing the text | |
@text <type 'str'> text which needs to get tokenized | |
@return <type 'str'> tokens | |
""" | |
    def token_ques(self, text):
        things_to_replace = ['?']
        #wh_tags = ['WP','WRB','MD','WDT']
        things_to_replace += stopwords.words('english')
        #wh_word = None
        for tok in text.split('\n'):
            original_query = tok
            query_pos_tags = nltk.pos_tag(word_tokenize(tok))
            #for tag in query_pos_tags:
            #    if tag[1] in wh_tags:
            #        try:
            #            wh_word = tag[0]
            #            things_to_replace.remove(tag[0].lower())
            #        except Exception:
            #            pass
            #print things_to_replace
            for word in things_to_replace:
                tok = tok.lower()
                tok = re.sub(r"\s" + word + r"\s|\s?\?$", ' ', tok)
                tok = tok.strip(" ")
            #print tok
            #if wh_word:
            #    print wh_word
            #    yield wh_word.lower()
            #    wh_word = None
            #yield original_query.lower()
            #yield tok.lower()
            for word in word_tokenize(tok):
                yield word.lower()
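    # Illustrative example (assumed input, not from the original gist):
    # list(self.token_ques("What is the capital city of France?")) yields roughly
    # ['what', 'capital', 'city', 'france']; the leading "what" survives because
    # the stopword pattern requires whitespace on both sides of the word.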
""" | |
Train classifier | |
""" | |
def trainClassifier(self): | |
try: | |
t1 = time() | |
start_time = time() | |
#self.vectorizer = HashingVectorizer(non_negative=True) | |
#self.vectorizer = TfidfVectorizer(sublinear_tf=True, max_df=0.5, stop_words='english') | |
self.hasher = FeatureHasher(input_type='string',non_negative=True) | |
#self.clf = MultinomialNB(alpha=0.001) | |
from sklearn.svm import SVC | |
self.clf = SVC(probability=True,C=5., gamma=0.001) | |
data_folder = self.root_dir + "/trec_data" | |
train_dataset = load_files(data_folder) | |
print("Time taken to load the data=>", time()-start_time) | |
print("data loaded") | |
cache = dict(train=train_dataset) | |
self.data_train = self.fetch_data(cache, subset='train') | |
print "Updating the classifier" | |
training_data = [] | |
for text in self.data_train.data: | |
text = self.modifyQuery(text.decode('utf-8','ignore')) | |
training_data.append(text) | |
raw_X = (self.token_ques(text) for text in training_data) | |
#X_train = self.vectorizer.transform(training_data) | |
X_train = self.hasher.fit_transform(raw_X) | |
y_train = self.data_train.target | |
self.clf.fit(raw_X, y_train) | |
print("Classifier updated") | |
print("time taken=>", time()-t1) | |
except Exception: | |
import traceback | |
print traceback.format_exc() | |
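    # Note (descriptive, not from the original gist): FeatureHasher applies the
    # hashing trick to the token streams produced by token_ques, giving a
    # fixed-width sparse matrix that the SVC is fit on; no vocabulary is stored,
    # so the same hasher can transform unseen queries at prediction time.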
""" | |
Function to extract NERs from the sentence | |
@query <type 'str'> sentence which needs to be processed | |
@return <type 'list'> list of noun entities | |
""" | |
def extractNEs(self, query): | |
query_tokens = word_tokenize(query) | |
query_pos_tags = nltk.pos_tag(query_tokens) | |
sentt = nltk.ne_chunk(query_pos_tags, binary = True) | |
Ne_list = [] | |
for subtree in sentt.subtrees(filter=lambda t: t.label() == 'NE'): | |
myNE = [] | |
for leave in subtree.leaves(): | |
myNE.append(leave[0]) | |
Ne_list.append(' '.join(myNE)) | |
return list(set(Ne_list)) | |
""" | |
Function to replace the noun entity with 'noun entity' keyword | |
@query <type 'str'> query | |
@return <type 'str'> modified query | |
""" | |
def modifyQuery(self, query): | |
Nes = self.extractNEs(query) | |
for ne in Nes: | |
query = query.replace(ne, 'noun entity') | |
return query | |
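    # Illustrative example (assumed input, not from the original gist): if NLTK's
    # ne_chunk tags "France" as a named entity, then
    # modifyQuery("What is the capital of France?") returns
    # "What is the capital of noun entity?".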
""" | |
Function to test classifier | |
""" | |
def testClassifier(self, gearman_worker, gearman_job): | |
try: | |
print("\n\nTesting the query ==> "+gearman_job.data) | |
query = gearman_job.data | |
result = {} | |
query = self.modifyQuery(query) | |
test_data = [query] | |
raw_X = (self.token_ques(text) for text in test_data) | |
X_test = self.hasher.fit_transform(raw_X) | |
#X_test = self.vectorizer.transform(test_data) | |
pred = self.clf.predict(raw_X) | |
print("pred=>", pred) | |
self.categories = self.data_train.target_names | |
for doc, category in zip(test_data, pred): | |
print('%r => %s' % (doc, self.categories[category])) | |
index = 1 | |
predict_prob = self.clf.predict_proba(X_test) | |
for doc, category_list in zip(test_data, predict_prob): | |
print('\n\n') | |
category_list = sorted(enumerate(category_list), key=lambda x:x[1], reverse=True) | |
i = 0 | |
for val in category_list: | |
print('%r => %s => %s' % (doc, self.categories[val[0]], str(val[1]))) | |
if float(val[1]) > float(0.05): | |
result[index] = {} | |
result[index][self.categories[val[0]]] = "%0.2f"%(float(val[1]) * 100)+"%" | |
index += 1 | |
elif float(val[1]) <= float(0.05): | |
result[index] = {} | |
result[index][self.categories[val[0]]] = "%0.2f"%(float(val[1]) * 100)+"%" | |
index += 1 | |
i +=1 | |
if i == 10: | |
break | |
except Exception: | |
import traceback | |
print traceback.format_exc() | |
import json | |
return json.dumps(result) | |
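    # The returned JSON maps a 1-based rank to {category: probability}, e.g.
    # '{"1": {"LOC": "62.31%"}, "2": {"HUM": "21.04%"}}' (illustrative values only;
    # the category names come from the sub-folder names under trec_data).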
if __name__ == '__main__':
    GetAnswerType().gm_worker.work()
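# A minimal sketch of how a client might submit a query to this worker, assuming
# the standard python-gearman client API and a gearmand running on localhost:4730:
#
#   import gearman
#   gm_client = gearman.GearmanClient(['localhost:4730'])
#   job = gm_client.submit_job('classify_less_class', 'What is the capital of France?')
#   print(job.result)  # JSON string produced by testClassifier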