import math
from collections import Counter
from dataclasses import dataclass
from itertools import tee, zip_longest
from typing import List
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import regex as re
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer


def histogram(x):
    # x: array of 1-d scalars
    x = np.array(x)
    print(f"{np.mean(x):.03}, {np.std(x):.03}, {np.median(x)}")
    plt.hist(x, bins=100)
    plt.axvline(x.mean(), color="r")
    plt.axvline(np.median(x), color="y")
    plt.show()


@dataclass
class PMIPhrase:
    phrase: str
    ppmi: float  # word2phrase-style (positive) PMI score of the pair
    raw_pmi: float  # raw PMI: log2(p(a, b) / (p(a) * p(b)))
    pa: float  # unigram probability of the first word
    pb: float  # unigram probability of the second word
    pab: float  # bigram probability of the pair
    na: int  # unigram count of the first word
    nb: int  # unigram count of the second word
    nab: int  # bigram count of the pair
    nx: int  # unigram denominator (total unigram occurrences)
    nxy: int  # bigram denominator (total bigram occurrences)
    n_components: int  # number of words joined into the phrase


def ngrams(iterable, n=3, longest=False):
    """
    >>> list(ngrams(range(5), 3, True))
    [(0, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, None), (4, None, None)]
    >>> list(ngrams(range(5), 3, False))
    [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
    """
    ts = tee(iterable, n)
    for i, t in enumerate(ts[1:]):
        for _ in range(i + 1):
            next(t, None)
    if longest:
        return zip_longest(*ts)
    else:
        return zip(*ts)


def ngram_counts(tokens, n, longest=False):
    """
    Creates a frequency table mapping: ngram -> count
    """
    ngram2count = Counter(ngrams(tokens, n, longest))
    return ngram2count
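

# Example on a hypothetical toy input (not part of the original gist):
# >>> ngram_counts(["a", "b", "a", "b"], 2)
# Counter({('a', 'b'): 2, ('b', 'a'): 1})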


class PMIPhraseBuilder:
    """
    Recognize bigram phrases using bigram PMI.
    This is a Python port of Mikolov's original `word2phrase.c`.

    Arguments:
        documents : an iterable of tokenized documents, e.g.
            [['hello', 'world'], ['life', 'is', 'enjoyable']]
        min_unigram_count : minimum number of mentions required to keep a word
        min_bigram_count : minimum number of mentions required to keep a word pair
        threshold : PMI threshold for marking word pairs as phrases
            (stored as pmi_threshold; the hard-coded PPMI window below is
            what is actually applied)
        sep : character or string used to join the words of a phrase
        max_phrase_num : maximum number of words per phrase
    """

    def __init__(
        self,
        documents,
        min_unigram_count=50,
        min_bigram_count=10,
        threshold=100.0,
        sep="_",
        max_phrase_num=2,
    ):
        self.min_unigram_count = min_unigram_count
        self.min_bigram_count = min_bigram_count
        self.pmi_threshold = threshold
        self.pmi_lower_threshold = [6, 8, 12]
        self.pmi_upper_threshold = [14, 16, 16]
        self.pmi_phrases = {}
        self.saved_counts = []
        self.max_phrase_num = max_phrase_num
        self.phrase_separator = sep
        docs_updated = documents
        # Phrases of up to N words require ceil(log2(N)) merge passes.
        for _ in range(int(math.ceil(math.log(max_phrase_num, 2)))):
            docs_updated = self.calc_pmi_phrases(
                docs_updated,
                sep,
                is_splitted=True,
            )
        self.documents_with_phrases = docs_updated

    def calc_pmi_phrases(
        self,
        documents,
        sep="_",
        is_splitted=True,
    ):
        vocab_iter, document_iter = tee(
            (line if is_splitted else line.split() for line in documents)
        )
        self.collect_bigram_pmi_stats(vocab_iter)
        docs_with_phrases = self.calc_pmi_scores(document_iter, sep)
        return docs_with_phrases

    def collect_bigram_pmi_stats(self, sequences):
        """
        Creates a frequency table mapping of unigrams and bigrams
        """
        non_empty_seqs = list(filter(None, sequences))
        # NOTE: iterating a Counter yields each distinct n-gram once, so these
        # are per-sequence (document-level) occurrence counts.
        uni2count = Counter(
            [tok[0] for seq in non_empty_seqs for tok in ngram_counts(seq, 1)]
        )
        bi2count = Counter(
            [
                tok
                for seq in non_empty_seqs
                for tok in ngram_counts(seq, 2, longest=True)
                if tok[0] is not None and tok[1] is not None
            ]
        )
        self.unigram_count = uni2count
        self.unigram_denominator = sum(uni2count.values())
        self.unigram_probability = {
            k: v / self.unigram_denominator for k, v in uni2count.items()
        }
        self.bigram_count = bi2count
        self.bigram_denominator = sum(bi2count.values())
        self.bigram_probability = {
            k: v / self.bigram_denominator for k, v in bi2count.items()
        }
        self.bigram_pmi = {
            (a, b): math.log(
                v / (self.unigram_probability[a] * self.unigram_probability[b]), 2
            )
            for (a, b), v in self.bigram_probability.items()
        }
        univocab = {k: v for k, v in uni2count.items() if v >= self.min_unigram_count}
        bivocab = {k: v for k, v in bi2count.items() if v >= self.min_bigram_count}
        # univocab.update(bivocab)
        vocab = Counter(univocab) + Counter(bivocab)
        self.vocab = vocab
        self.total_n_words = sum(map(len, non_empty_seqs))
        bigram_ppmi_wolog = {
            k: (nab - self.min_bigram_count + 1)
            / univocab[k[0]]
            / univocab[k[1]]
            * self.total_n_words
            for k, nab in bivocab.items()
            if k[0] in univocab and k[1] in univocab
        }
        self.bigram_ppmi = {
            k: math.log(v, 2)
            for k, v in bigram_ppmi_wolog.items()
            if math.log(v, 2) > 0
        }
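        # For reference, the two bigram scores computed above are:
        #   raw PMI:            pmi(a, b)   = log2( p(a, b) / (p(a) * p(b)) )
        #   word2phrase score:  score(a, b) = log2( (n(a, b) - min_bigram_count + 1)
        #                                           * total_n_words / (n(a) * n(b)) )
        # where n(.) are the frequency-filtered counts; only pairs with a
        # positive word2phrase score are kept in self.bigram_ppmi.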
        self.saved_counts.append(
            {
                "unigram_count": self.unigram_count,
                "bigram_count": self.bigram_count,
                "bigram_pmi": self.bigram_pmi,
                "bigram_ppmi": self.bigram_ppmi,
                "total_n_words": self.total_n_words,
                "vocab": self.vocab,
            }
        )
        print(f"#of Original Tokens: {self.total_n_words}")
        print(f"#of Unigram Token Types: {len(uni2count)}")
        print(
            f"#of Unigram Token Types (freq >= {self.min_unigram_count}): {len(univocab)}"
        )
        print(f"#of Bigram Token Types: {len(bi2count)}")
        print(
            f"#of Bigram Token Types (freq >= {self.min_bigram_count}): {len(bivocab)}"
        )
        print(f"#of Bigram PPMI vocab: {len(self.bigram_ppmi)}")

    def calc_pmi_scores(self, document_iter, sep="_", ix=0):
        docs_with_phrases = []
        for line in document_iter:
            doc_with_phrases = []
            pairs = ngrams(line, n=2, longest=True)
            for pair in pairs:
                if (
                    pair in self.bigram_ppmi  # self.vocab
                    and pair[0] in self.vocab
                    and pair[1] in self.vocab
                ):
                    na = self.unigram_count[pair[0]]
                    pa = self.unigram_probability[pair[0]]
                    nb = self.unigram_count[pair[1]]
                    pb = self.unigram_probability[pair[1]]
                    nab = self.bigram_count[pair]
                    pab = self.bigram_probability[pair]
                    ppmi = self.bigram_ppmi[pair]
                    raw_pmi = self.bigram_pmi[pair]
                    if (
                        ppmi > self.pmi_lower_threshold[ix]
                        and ppmi < self.pmi_upper_threshold[ix]
                    ):
                        # don't permit overlapping phrases:
                        # consume the next pair so its first token is not re-emitted
                        next(pairs, None)
                        phrase = sep.join(pair)
                        doc_with_phrases.append(phrase)
                        self.pmi_phrases[phrase] = PMIPhrase(
                            phrase,
                            ppmi,
                            raw_pmi,
                            pa,
                            pb,
                            pab,
                            na,
                            nb,
                            nab,
                            self.unigram_denominator,
                            self.bigram_denominator,
                            len(phrase.split(sep)),
                        )
                    else:
                        # PPMI outside the accepted window: keep the first token
                        doc_with_phrases.append(pair[0])
                else:
                    # not a phrase candidate: keep the first token
                    doc_with_phrases.append(pair[0])
            docs_with_phrases.append(doc_with_phrases)
        return docs_with_phrases
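

# A minimal usage sketch (the toy corpus and lowered count thresholds below are
# assumptions for illustration, not part of the original gist). Documents must
# already be tokenized, and the hard-coded PPMI window
# (pmi_lower_threshold / pmi_upper_threshold) is tuned for larger corpora, so a
# tiny toy corpus may yield no merged phrases.
#
#   toy_docs = [["new", "york", "is", "big"], ["new", "york", "city"]] * 100
#   builder = PMIPhraseBuilder(toy_docs, min_unigram_count=1, min_bigram_count=1)
#   print(builder.pmi_phrases)              # phrase -> PMIPhrase statistics
#   print(builder.documents_with_phrases)   # token lists, detected pairs joined by "_"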


def prepro(s):
    # lowercase, collapse whitespace runs, then strip digits and underscores
    return re.sub(r"_+", "", re.sub(r"\d+", "", re.sub(r"(\s)+", r"\g<1>", s.lower())))


def get_ngram_tfidf_vector(corpus, n=2, max_df=1.0, min_df=2):
    # corpus = list({doc for _, doc in docid2texts.items() if not re.search('\p{Han}+', doc)})
    vectorizer = TfidfVectorizer(
        preprocessor=prepro,
        ngram_range=(n, n),
        max_df=max_df,
        min_df=min_df,
        sublinear_tf=False,
    )
    c_vectorizer = CountVectorizer(
        preprocessor=prepro,
        ngram_range=(n, n),
        max_df=max_df,
        min_df=min_df,
    )
    X = vectorizer.fit_transform(corpus)
    C = c_vectorizer.fit_transform(corpus)
    df_tfidf = pd.DataFrame(X.toarray())
    df_tfidf.columns = vectorizer.get_feature_names()
    df_count = pd.DataFrame(C.toarray())
    df_count.columns = c_vectorizer.get_feature_names()
    # .get_feature_names(): [feature_name]
    # .vocabulary_: feature_name -> column index
    ngram_idf = dict(zip(vectorizer.get_feature_names(), vectorizer.idf_))
    tfidf_sum_map = dict(df_tfidf.sum(axis=0).items())
    tfidf_mean_map = dict(df_tfidf.mean(axis=0).items())
    count_sum_map = dict(df_count.sum(axis=0).items())
    ngrams_tfidf_scored = [
        {
            "word": kw,
            "n_words": n,
            "count": count_sum_map[kw],
            "idf": idf,
            "tfidf_sum": tfidf_sum_map[kw],
            "tfidf_mean": tfidf_mean_map[kw],
            "score": np.log(count_sum_map[kw]) * idf,
        }
        for kw, idf in ngram_idf.items()
    ]
    ngrams_tfidf_scored = sorted(ngrams_tfidf_scored, key=lambda x: -x["score"])
    return ngrams_tfidf_scored
kzinmr commented Jun 4, 2021

from phrase_analysis import get_ngram_tfidf_vector
# sws
# all_corpus = [txt for ct in major_cts for txt in ct2texts[ct]]

bigrams_tfidf_scored = get_ngram_tfidf_vector(all_corpus, n=2, max_df=1.0, min_df=2)
print(len(bigrams_tfidf_scored))
bigrams_tfidf_scored_f = [d for d in bigrams_tfidf_scored if all(w not in sws for w in d['word'].split(' '))]
print(len(bigrams_tfidf_scored_f))
common300 = [d['word'] for d in sorted(bigrams_tfidf_scored_f, key=lambda x: -x['count'])[:300]]

def show_significants(ct):
    # [d['word'] for d in sorted(show_significants(ct), key=lambda x:-x['count'])][:10]
    corpus = ct2texts[ct]
    bigrams_tfidf_scored = get_ngram_tfidf_vector(corpus, n=2, max_df=1.0, min_df=2)
    bi_sws = set(common300)
    print(len(bigrams_tfidf_scored))
    bigrams_tfidf_scored_f = [d for d in bigrams_tfidf_scored if all(w not in sws for w in d['word'].split(' ')) and d['word'] not in bi_sws]
    print(len(bigrams_tfidf_scored_f))
    return sorted(bigrams_tfidf_scored_f, key=lambda x: -x['count'])
