import math
from collections import Counter
from dataclasses import dataclass
from itertools import tee, zip_longest

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import regex as re
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer


def histogram(x):
    """Print mean/std/median of a 1-d array of scalars and plot its histogram."""
    x = np.array(x)
    print(f"{np.mean(x):.3g}, {np.std(x):.3g}, {np.median(x)}")
    plt.hist(x, bins=100)
    plt.axvline(x.mean(), color="r")
    plt.axvline(np.median(x), color="y")
    plt.show()
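
# Usage sketch (hypothetical input, not in the original gist):
# histogram(np.random.randn(1000)) prints "mean, std, median" and draws a
# 100-bin histogram with the mean (red) and median (yellow) marked.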


@dataclass
class PMIPhrase:
    phrase: str        # joined surface form, e.g. "new_york"
    ppmi: float        # discounted positive PMI score (see collect_bigram_pmi_stats)
    raw_pmi: float     # plain PMI: log2(p(a,b) / (p(a) * p(b)))
    pa: float          # unigram probability of the first word
    pb: float          # unigram probability of the second word
    pab: float         # bigram probability of the pair
    na: int            # unigram count of the first word
    nb: int            # unigram count of the second word
    nab: int           # bigram count of the pair
    nx: int            # total number of unigram tokens (denominator of pa/pb)
    nxy: int           # total number of bigram tokens (denominator of pab)
    n_components: int  # number of words joined into the phrase


def ngrams(iterable, n=3, longest=False):
    """
    Yield sliding n-gram tuples from an iterable.

    >>> list(ngrams(range(5), 3, True))
    [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, None), (4, None, None)]
    >>> list(ngrams(range(5), 3, False))
    [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
    """
    ts = tee(iterable, n)
    # Advance the i-th copy by i positions so zipping the copies yields
    # sliding windows.
    for i, t in enumerate(ts[1:]):
        for _ in range(i + 1):
            next(t, None)
    if longest:
        return zip_longest(*ts)
    return zip(*ts)
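

# Quick self-check (a minimal sketch, not in the original gist): with
# longest=True the trailing windows are padded with None. Note that ngrams()
# returns a lazy iterator, so a caller can advance it with next() mid-loop;
# calc_pmi_scores below relies on this to skip overlapping pairs.
assert list(ngrams("a b c".split(), 2)) == [("a", "b"), ("b", "c")]
assert list(ngrams("a b c".split(), 2, longest=True))[-1] == ("c", None)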


def ngram_counts(tokens, n, longest=False):
    """
    Create a frequency table mapping ngram tuples to their counts.
    """
    return Counter(ngrams(tokens, n, longest))
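

# Example (a minimal sketch, not in the original gist): per-sequence bigram
# frequencies keyed by tuple.
assert ngram_counts("to be or not to be".split(), 2) == Counter(
    {("to", "be"): 2, ("be", "or"): 1, ("or", "not"): 1, ("not", "to"): 1}
)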


class PMIPhraseBuilder:
    """
    Recognize multiword phrases using bigram PMI.
    A Python port of the phrase-building pass of Mikolov's `word2phrase.c`.

    Arguments:
        documents : an iterable of tokenized documents, e.g.
            [['hello', 'world'], ['life', 'is', 'enjoyable']]
        min_unigram_count : minimum frequency for a unigram to be kept
        min_bigram_count : minimum frequency for a bigram to be kept
        threshold : PMI threshold with which word pairs are marked as phrases
        sep : character or string used to join the words of a phrase
        max_phrase_num : maximum number of words per phrase
    """

    def __init__(
        self,
        documents,
        min_unigram_count=50,
        min_bigram_count=10,
        threshold=100.0,
        sep="_",
        max_phrase_num=2,
    ):
        self.min_unigram_count = min_unigram_count
        self.min_bigram_count = min_bigram_count
        # Kept for API compatibility; the per-pass bands below are what is
        # actually applied when marking phrases.
        self.pmi_threshold = threshold
        self.pmi_lower_threshold = [6, 8, 12]
        self.pmi_upper_threshold = [14, 16, 16]
        self.pmi_phrases = {}
        self.saved_counts = []
        self.max_phrase_num = max_phrase_num
        self.phrase_separator = sep
        docs_updated = documents
        # Each pass can at most double the phrase length, so phrases of up to
        # max_phrase_num words need ceil(log2(max_phrase_num)) passes.
        for it in range(int(math.ceil(math.log(max_phrase_num, 2)))):
            docs_updated = self.calc_pmi_phrases(
                docs_updated,
                sep,
                is_splitted=True,
                # Pick this pass's threshold band, clamped to the last band.
                ix=min(it, len(self.pmi_lower_threshold) - 1),
            )
        self.documents_with_phrases = docs_updated

    def calc_pmi_phrases(
        self,
        documents,
        sep="_",
        is_splitted=True,
        ix=0,
    ):
        # tee() lets us pass over the corpus twice: once to collect counts,
        # once to rewrite documents with detected phrases.
        vocab_iter, document_iter = tee(
            (line if is_splitted else line.split() for line in documents)
        )
        self.collect_bigram_pmi_stats(vocab_iter)
        docs_with_phrases = self.calc_pmi_scores(document_iter, sep, ix)
        return docs_with_phrases

    def collect_bigram_pmi_stats(self, sequences):
        """
        Build frequency tables of unigrams and bigrams and derive
        (positive) PMI statistics from them.
        """
        non_empty_seqs = list(filter(None, sequences))
        # Count every occurrence; iterating the per-sequence Counter from
        # ngram_counts() would only count each type once per sequence
        # (document frequency), which is not what the PMI formulas expect.
        uni2count = Counter(
            tok[0] for seq in non_empty_seqs for tok in ngrams(seq, 1)
        )
        bi2count = Counter(
            tok for seq in non_empty_seqs for tok in ngrams(seq, 2)
        )
        self.unigram_count = uni2count
        self.unigram_denominator = sum(uni2count.values())
        self.unigram_probability = {
            k: v / self.unigram_denominator for k, v in uni2count.items()
        }
        self.bigram_count = bi2count
        self.bigram_denominator = sum(bi2count.values())
        self.bigram_probability = {
            k: v / self.bigram_denominator for k, v in bi2count.items()
        }
        # Plain PMI: log2( p(a, b) / (p(a) * p(b)) )
        self.bigram_pmi = {
            (a, b): math.log(
                v / (self.unigram_probability[a] * self.unigram_probability[b]), 2
            )
            for (a, b), v in self.bigram_probability.items()
        }
        univocab = {k: v for k, v in uni2count.items() if v >= self.min_unigram_count}
        bivocab = {k: v for k, v in bi2count.items() if v >= self.min_bigram_count}
        vocab = Counter(univocab) + Counter(bivocab)
        self.vocab = vocab
        self.total_n_words = sum(map(len, non_empty_seqs))
        # word2phrase-style discounted score (before the log):
        #   (n_ab - min_bigram_count + 1) / (n_a * n_b) * N
        # The discount suppresses phrases built from rare bigrams.
        bigram_ppmi_wolog = {
            k: (nab - self.min_bigram_count + 1)
            / univocab[k[0]]
            / univocab[k[1]]
            * self.total_n_words
            for k, nab in bivocab.items()
            if k[0] in univocab and k[1] in univocab
        }
        # Keep only positive scores (PPMI), computing each log once.
        bigram_ppmi_all = {k: math.log(v, 2) for k, v in bigram_ppmi_wolog.items()}
        self.bigram_ppmi = {k: v for k, v in bigram_ppmi_all.items() if v > 0}
        self.saved_counts.append(
            {
                "unigram_count": self.unigram_count,
                "bigram_count": self.bigram_count,
                "bigram_pmi": self.bigram_pmi,
                "bigram_ppmi": self.bigram_ppmi,
                "total_n_words": self.total_n_words,
                "vocab": self.vocab,
            }
        )
        print(f"# of original tokens: {self.total_n_words}")
        print(f"# of unigram token types: {len(uni2count)}")
        print(
            f"# of unigram token types (freq >= {self.min_unigram_count}): {len(univocab)}"
        )
        print(f"# of bigram token types: {len(bi2count)}")
        print(
            f"# of bigram token types (freq >= {self.min_bigram_count}): {len(bivocab)}"
        )
        print(f"# of bigram PPMI vocab entries: {len(self.bigram_ppmi)}")

    def calc_pmi_scores(self, document_iter, sep="_", ix=0):
        """Rewrite documents, merging pairs whose PPMI falls in this pass's band."""
        docs_with_phrases = []
        for line in document_iter:
            doc_with_phrases = []
            pairs = ngrams(line, n=2, longest=True)
            for pair in pairs:
                if (
                    pair in self.bigram_ppmi
                    and pair[0] in self.vocab
                    and pair[1] in self.vocab
                ):
                    na = self.unigram_count[pair[0]]
                    pa = self.unigram_probability[pair[0]]
                    nb = self.unigram_count[pair[1]]
                    pb = self.unigram_probability[pair[1]]
                    nab = self.bigram_count[pair]
                    pab = self.bigram_probability[pair]
                    ppmi = self.bigram_ppmi[pair]
                    raw_pmi = self.bigram_pmi[pair]
                    if self.pmi_lower_threshold[ix] < ppmi < self.pmi_upper_threshold[ix]:
                        # Don't permit overlapping phrases: consume the next
                        # pair, which shares this pair's second token.
                        next(pairs, None)
                        phrase = sep.join(pair)
                        doc_with_phrases.append(phrase)
                        self.pmi_phrases[phrase] = PMIPhrase(
                            phrase,
                            ppmi,
                            raw_pmi,
                            pa,
                            pb,
                            pab,
                            na,
                            nb,
                            nab,
                            self.unigram_denominator,
                            self.bigram_denominator,
                            len(phrase.split(sep)),
                        )
                    else:
                        # Band check failed: keep the first token as-is.
                        doc_with_phrases.append(pair[0])
                else:
                    # Unknown pair (or trailing None pad): keep the first
                    # token so no token is silently dropped.
                    doc_with_phrases.append(pair[0])
            docs_with_phrases.append(doc_with_phrases)
        return docs_with_phrases
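

# Usage sketch (hypothetical toy corpus, not from the original gist). The
# corpus is padded with filler documents so that "new york" clears the
# default count thresholds and its PPMI lands inside the first band.
if __name__ == "__main__":
    toy_docs = [["new", "york"]] * 60 + [
        ["the", "cat", "sat", "on", "the", "mat"]
    ] * 1650
    builder = PMIPhraseBuilder(toy_docs)
    print(sorted(builder.pmi_phrases))        # expected: ['new_york']
    print(builder.documents_with_phrases[0])  # expected: ['new_york']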


def prepro(s):
    # Lowercase, collapse whitespace runs, then strip digits and underscores
    # (raw strings avoid invalid-escape warnings on \d and \s).
    s = re.sub(r"(\s)+", r"\g<1>", s.lower())
    return re.sub(r"_+", "", re.sub(r"\d+", "", s))
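

# Quick check (a minimal sketch, not in the original gist):
assert prepro("Foo  Bar_123") == "foo bar"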


def get_ngram_tfidf_vector(corpus, n=2, max_df=1.0, min_df=2):
    # Example corpus filter (kept from the original; `regex` supports \p{Han}):
    # corpus = list({doc for _, doc in docid2texts.items() if not re.search('\p{Han}+', doc)})
    vectorizer = TfidfVectorizer(
        preprocessor=prepro,
        ngram_range=(n, n),
        max_df=max_df,
        min_df=min_df,
        sublinear_tf=False,
    )
    c_vectorizer = CountVectorizer(
        preprocessor=prepro,
        ngram_range=(n, n),
        max_df=max_df,
        min_df=min_df,
    )
    X = vectorizer.fit_transform(corpus)
    C = c_vectorizer.fit_transform(corpus)
    # .get_feature_names_out(): array of feature names
    # (renamed from .get_feature_names(), removed in scikit-learn 1.2)
    # .vocabulary_: feature_name -> column index
    feature_names = vectorizer.get_feature_names_out()
    df_tfidf = pd.DataFrame(X.toarray(), columns=feature_names)
    df_count = pd.DataFrame(C.toarray(), columns=c_vectorizer.get_feature_names_out())
    ngram_idf = dict(zip(feature_names, vectorizer.idf_))
    tfidf_sum_map = dict(df_tfidf.sum(axis=0).items())
    tfidf_mean_map = dict(df_tfidf.mean(axis=0).items())
    count_sum_map = dict(df_count.sum(axis=0).items())
    ngrams_tfidf_scored = [
        {
            "word": kw,
            "n_words": n,
            "count": count_sum_map[kw],
            "idf": idf,
            "tfidf_sum": tfidf_sum_map[kw],
            "tfidf_mean": tfidf_mean_map[kw],
            "score": np.log(count_sum_map[kw]) * idf,
        }
        for kw, idf in ngram_idf.items()
    ]
    ngrams_tfidf_scored = sorted(ngrams_tfidf_scored, key=lambda x: -x["score"])
    return ngrams_tfidf_scored
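

# Usage sketch (hypothetical mini-corpus, not from the original gist);
# min_df=1 so a handful of documents is enough to produce features.
# "language processing" recurs, so it should surface as the top-scored bigram.
if __name__ == "__main__":
    mini_corpus = [
        "natural language processing is fun",
        "language processing needs data",
        "data makes language processing work",
    ]
    for row in get_ngram_tfidf_vector(mini_corpus, n=2, min_df=1)[:3]:
        print(row["word"], round(row["score"], 3))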