NLP helpers
#%tensorflow_version 2.x
import re

import numpy as np
import pandas as pd
from nltk import word_tokenize
from nltk.stem import WordNetLemmatizer
from scipy.sparse import hstack
from scipy.stats import uniform
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.linear_model import BayesianRidge, LinearRegression, SGDRegressor
from sklearn.metrics import make_scorer, mean_squared_error
from sklearn.model_selection import RandomizedSearchCV, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.svm import SVR
def check_gpu():
    # Fail fast if Colab/TensorFlow can't see a GPU.
    import tensorflow as tf
    device_name = tf.test.gpu_device_name()
    if device_name != '/device:GPU:0':
        raise SystemError('GPU device not found')
    print('Found GPU at: {}'.format(device_name))
# Hossein's dumb baseline scorer: RMSE between ground-truth and predicted mean grades.
def score_task_1(truth_loc, prediction_loc):
    truth = pd.read_csv(truth_loc, usecols=['id', 'meanGrade'])
    pred = pd.read_csv(prediction_loc, usecols=['id', 'pred'])
    assert sorted(truth.id) == sorted(pred.id), "ID mismatch between ground truth and prediction!"
    data = pd.merge(truth, pred)
    rmse = np.sqrt(np.mean((data['meanGrade'] - data['pred']) ** 2))
    return rmse
def rmse(labels, preds):
    return np.sqrt(np.mean((labels - preds) ** 2))

def get_ngram(n):
    # n-gram range for the vectorisers: unigrams up to n-grams.
    return (1, n)
def swop_tags(X):
    # Substitute the edit word in for the tagged original word, e.g. "<word/>".
    plugin = lambda x: re.sub(r"<.+/>", x["edit"], x["original"])
    return X.apply(plugin, axis=1)

def clean_tags(X):
    # Strip the tag markup, keeping the original word.
    capture = r"<(.+)/>"
    return X.str.replace(capture, r"\g<1>", regex=True)
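# Illustrative sketch, not from the original gist: the tag helpers above assume a
# DataFrame with an "original" headline containing one tagged word like "<word/>"
# and an "edit" column holding the replacement word. The toy row below is made up.
def _demo_tag_helpers():
    toy = pd.DataFrame({"original": ["Senate passes <budget/> bill"],
                        "edit": ["banana"]})
    edited = swop_tags(toy)                 # "Senate passes banana bill"
    cleaned = clean_tags(toy["original"])   # "Senate passes budget bill"
    return edited, cleaned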
def stack_both_headlines(X, n, stop_words, vocab=None, edit_vocab=None):
    # Bag-of-words the original and edited headlines separately, then stack the matrices.
    # (Fixes the original version, which overwrote X with the first sparse matrix
    # before vectorising the edited column and never forwarded the vocab arguments.)
    X_orig, vocabulary = bag_o_words(X, n=n, stops=stop_words, vocab=vocab)
    X_edit, edit_vocabulary = bag_o_words(X, n=n,
                                          stops=stop_words,
                                          colName="edited",
                                          vocab=edit_vocab)
    if not vocab and not edit_vocab:
        return hstack((X_orig, X_edit)), vocabulary, edit_vocabulary
    else:
        return hstack((X_orig, X_edit))
def bag_o_words(X,
                n=2,
                stops=None,
                colName="original",
                vocab=None):
    args = {'strip_accents': "ascii",
            'lowercase': True,
            'stop_words': stops,
            'ngram_range': get_ngram(n),
            'min_df': 2}
    if vocab:
        args['vocabulary'] = vocab
    vect = CountVectorizer(**args)
    print(vect)
    return vect.fit_transform(X[colName]), \
        vect.get_feature_names()
def tf_idf(X, n=2, stops=None, colName="original", vocab=None):
    tfer = TfidfVectorizer(
        tokenizer=LemmaTokenizer(),
        strip_accents="ascii",
        lowercase=True,
        stop_words=stops,
        ngram_range=get_ngram(n),
        min_df=2,
        vocabulary=vocab
    )
    return tfer.fit_transform(X[colName]), \
        tfer.get_feature_names()
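# Illustrative sketch, not from the original gist: building stacked bag-of-words
# features for both headline versions. Assumes "original" and "edited" string columns;
# the toy rows are made up, and stop_words=None is just for simplicity.
def _demo_bow_features():
    toy = pd.DataFrame({
        "original": ["Senate passes budget bill", "Senate passes trade bill"],
        "edited":   ["Senate passes banana bill", "Senate passes pizza bill"],
    })
    features, vocab, edit_vocab = stack_both_headlines(toy, n=2, stop_words=None)
    return features.shape, vocab, edit_vocab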
# Copied from
# https://scikit-learn.org/stable/modules/feature_extraction.html
class LemmaTokenizer:
    def __init__(self):
        self.wnl = WordNetLemmatizer()

    def __call__(self, doc):
        return [self.wnl.lemmatize(t)
                for t in word_tokenize(doc)]
def test_some_model(X_train, y_train, X_test, y_test):
    # Quick sanity check: fit a plain SGDRegressor and report test RMSE.
    s = SGDRegressor()
    s.fit(X_train, y_train)
    return rmse(y_test, np.array(s.predict(X_test)))
def fit_all_models(models, X, y):
    return [model.fit(X, y)
            for model in models]

def get_best(results):
    # results: {model: RMSE}. Return the model with the lowest RMSE and its score.
    bestRmse = min(results.values())
    indexOfBest = list(results.values()).index(bestRmse)
    model = list(results.keys())[indexOfBest]
    return model, bestRmse
# Define the hyperparameter ranges for each model
def lookup_model_hypers(model):
    d = {
        SGDRegressor().__class__: dict(alpha=np.arange(0, 0.5, 0.01),
                                       penalty=['l2', 'l1']),
        LinearRegression().__class__: dict(),
        BayesianRidge().__class__: dict(alpha_1=np.arange(0, 10, 0.1),
                                        alpha_2=np.arange(0, 10, 0.1),
                                        lambda_1=np.arange(0, 10, 0.1),
                                        lambda_2=np.arange(0, 10, 0.1)),
        RandomForestRegressor().__class__: dict(n_estimators=np.arange(20, 300, 40),
                                                max_depth=np.arange(1, 7, 1),
                                                min_samples_leaf=np.arange(2, 20, 4)),
        GradientBoostingRegressor().__class__: dict(n_estimators=np.arange(20, 300, 40),
                                                    max_depth=np.arange(1, 7, 1),
                                                    min_samples_leaf=np.arange(2, 20, 4)),
        SVR().__class__: dict(kernel=['rbf', 'sigmoid'],
                              gamma=['scale', 'auto'],
                              # C must be strictly positive, so the grid starts at 0.1
                              C=np.arange(0.1, 2, 0.1))
    }
    return d[model.__class__]
# Randomised search beats grid search here
def hyperparam_search(model, X, y):
    distributions = lookup_model_hypers(model)
    search = RandomizedSearchCV(model,
                                param_distributions=distributions,
                                n_jobs=-1)
    return search.fit(X, y) \
                 .best_params_
def apply_best_params(model, param_dict):
    # Build a fresh, unfitted instance of the model's class with the tuned parameters.
    cls = model.__class__
    return cls(**param_dict)

def find_best_params(models, X, y):
    best_params = {model: hyperparam_search(model, X, y)
                   for model in models}
    return [apply_best_params(m, best_params[m])
            for m in models]

def test_models(models, X, y):
    preds_by_model = {m: m.predict(X)
                      for m in models}
    return {m: rmse(y, p)
            for m, p
            in preds_by_model.items()}
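# Illustrative sketch, not from the original gist: one way to chain the helpers above.
# X_train/X_test stand in for feature matrices and y_train/y_test for meanGrade
# arrays; the candidate list is an arbitrary choice for the example.
def _demo_model_selection(X_train, y_train, X_test, y_test):
    candidates = [SGDRegressor(), BayesianRidge(), RandomForestRegressor()]
    tuned = find_best_params(candidates, X_train, y_train)   # randomised search per model
    fitted = fit_all_models(tuned, X_train, y_train)
    results = test_models(fitted, X_test, y_test)             # {model: test RMSE}
    return get_best(results)                                   # (best model, best RMSE)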
def add_edit_index(X, editDelimiter="<"):
    # Token position of the edited word in the raw headline, plus its length-normalised position.
    X["list"] = X.original.str.split()
    idx_finder = lambda l: [idx for idx, el in enumerate(l) if editDelimiter in el][0]
    X['EditIndex'] = X["list"].apply(idx_finder)
    X['Length'] = X["list"].apply(len)
    X["EditProportion"] = X['EditIndex'] / X['Length']
    return X
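# Illustrative sketch, not from the original gist: add_edit_index expects the raw
# "original" column, i.e. before clean_tags/swop_tags, so the "<" marker is still there.
def _demo_edit_position():
    toy = pd.DataFrame({"original": ["Senate passes <budget/> bill"]})
    toy = add_edit_index(toy)
    # EditIndex == 2 (third token), Length == 4, EditProportion == 0.5
    return toy[["EditIndex", "Length", "EditProportion"]]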
def pair_up_vectors(X, model, stops):
    # Concatenate the mean embedding of the original headline with that of the edited one.
    X_vector = vectorise(X, model, stops, "original")
    X_edit = vectorise(X, model, stops, "edited")
    return [np.concatenate((X_vector[i], X_edit[i]), axis=None)
            for i in range(len(X_vector))]
# Silliest method of embedding a headline: represent the sentence as the mean of its word embeddings
def sentence_embedding(model, sentence):
    sentence = [word for word in sentence
                if word in model.vocab]
    return np.mean(model[sentence], axis=0)

def tokenise(headline, stops):
    return [word for word in word_tokenize(headline.lower())
            if word not in stops]

def get_corpus(X, stops, col="original"):
    l = lambda x: tokenise(x, stops)
    return X[col].apply(l)

def vectorise(X, model, stops, col="original"):
    # Fixes the original version, which ignored `col` and always vectorised "original".
    corpus = get_corpus(X, stops, col)
    return np.array([sentence_embedding(model, doc)
                     for doc in corpus])
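# Illustrative sketch, not from the original gist: the embedding helpers above assume a
# gensim-style keyed-vector model that exposes `.vocab` and list indexing (i.e. a
# gensim < 4.0 KeyedVectors object). The pretrained-vector name below is an arbitrary
# choice for the example, not something the gist prescribes.
def _demo_mean_embeddings(X, stops):
    import gensim.downloader as api
    word_vectors = api.load("glove-wiki-gigaword-100")
    return pair_up_vectors(X, word_vectors, stops)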