"""Gensim Phrases subclass that takes stop words into account when
detecting bigrams.

Gist by @alexgarel, created April 4, 2017.
"""
import logging

from collections import defaultdict

from six import string_types

from gensim.models import phrases
from gensim import utils

logger = logging.getLogger(__name__)


class Phrases(phrases.Phrases):

    def __init__(self, *args, **kwargs):
        assert "stop_words" in kwargs, "stop_words is mandatory"
        # normalize stop words to utf-8 bytes, matching the tokens handled below
        self.stop_words = frozenset(utils.any2utf8(w) for w in kwargs.pop("stop_words"))
        super().__init__(*args, **kwargs)
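
    # Note: self.stop_words is set before super().__init__ on purpose; in the
    # gensim versions this gist targets, passing `sentences` to the parent
    # constructor triggers add_vocab(), which calls the learn_vocab()
    # override below and reads self.stop_words.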

    def lrsentence(self, sentence, delimiter, stop_words):
        # left view: the sentence with stop words removed
        lsentence = [w for w in sentence if w not in stop_words]
        # right view: each run of stop words is joined onto the following word
        rsentence = []
        stop_word = []
        for w in sentence:
            if w in self.stop_words:
                stop_word.append(w)
            else:
                w = delimiter.join(stop_word + [w])
                rsentence.append(w)
                stop_word = []
        return lsentence, rsentence
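
    # Example (hypothetical input): with stop_words == {b'of', b'the'},
    # lrsentence([b'state', b'of', b'the', b'art'], b'_', stop_words) returns
    #     lsentence == [b'state', b'art']
    #     rsentence == [b'state', b'of_the_art']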

    # no longer a staticmethod: we need access to the instance's stop words
    def learn_vocab(self, sentences, max_vocab_size, delimiter=b'_', progress_per=10000):
        """Collect unigram/bigram counts from the `sentences` iterable."""
        stop_words = self.stop_words
        sentence_no = -1
        total_words = 0
        logger.info("collecting all words and their counts")
        vocab = defaultdict(int)
        min_reduce = 1
        for sentence_no, sentence in enumerate(sentences):
            if sentence_no % progress_per == 0:
                logger.info("PROGRESS: at sentence #%i, processed %i words and %i word types",
                            sentence_no, total_words, len(vocab))
            sentence = [utils.any2utf8(w) for w in sentence]
            lsentence, rsentence = self.lrsentence(sentence, delimiter, stop_words)
            # pair each content word with the next token (stop words merged in)
            for bigram in zip(lsentence, rsentence[1:]):
                vocab[bigram[0]] += 1
                vocab[delimiter.join(bigram)] += 1
                total_words += 1
            if lsentence:  # add the last word, skipped by the previous loop
                word = lsentence[-1]
                vocab[word] += 1
            if len(vocab) > max_vocab_size:
                utils.prune_vocab(vocab, min_reduce)
                min_reduce += 1
        logger.info("collected %i word types from a corpus of %i words (unigram + bigrams) and %i sentences",
                    len(vocab), total_words, sentence_no + 1)
        return min_reduce, vocab
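
    # With the lrsentence example above, the only bigram counted is
    # b'state' + b'_' + b'of_the_art' == b'state_of_the_art': stop words are
    # folded into the right-hand token and never get unigram counts of
    # their own.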

    def __getitem__(self, sentence):
        stop_words = self.stop_words
        try:
            is_single = not sentence or isinstance(sentence[0], string_types)
        except Exception:
            is_single = False
        if not is_single:
            # if the input is an entire corpus (rather than a single
            # sentence), return an iterable stream.
            return self._apply(sentence)
        s, new_s = [utils.any2utf8(w) for w in sentence], []
        last_bigram = False
        vocab = self.vocab
        threshold = self.threshold
        delimiter = self.delimiter
        odelimiter = ord(delimiter)  # byte value of the delimiter character
        min_count = self.min_count
        lsentence, rsentence = self.lrsentence(s, delimiter, stop_words)
        for w in s:  # emit leading stop words up to the first content word
            if w not in stop_words:
                break
            new_s.append(w)
        # word_a: current content word; word_b: next token with its leading
        # stop words merged in; orig_b: bare next content word, used for the
        # unigram count pb
        for word_a, word_b, orig_b in zip(lsentence, rsentence[1:], lsentence[1:]):
            if word_a in vocab and orig_b in vocab:
                bigram_word = delimiter.join((word_a, word_b))
                if bigram_word in vocab and not last_bigram:
                    pa = float(vocab[word_a])
                    pb = float(vocab[orig_b])
                    pab = float(vocab[bigram_word])
                    score = (pab - min_count) / pa / pb * len(vocab)
                    logger.debug("score for %s: (pab=%s - min_count=%s) / pa=%s / pb=%s * vocab_size=%s = %s",
                                 bigram_word, pab, min_count, pa, pb, len(vocab), score)
                    if score > threshold:
                        new_s.append(bigram_word)
                        last_bigram = True
                        continue
            if not last_bigram:
                new_s.append(word_a)
                # re-emit the stop words embedded in word_b
                last_stop_index = 0
                for i, w in enumerate(word_b):
                    if w == odelimiter:
                        new_s.append(word_b[last_stop_index:i])
                        last_stop_index = i + 1
            last_bigram = False
        if lsentence:
            if not last_bigram:
                # add the last content word, skipped by the previous loop
                new_s.append(lsentence[-1])
            # re-emit trailing stop words, which lrsentence drops
            tail = []
            for w in reversed(s):
                if w not in stop_words:
                    break
                tail.append(w)
            new_s.extend(reversed(tail))
        return [utils.to_unicode(w) for w in new_s]
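

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original gist): a minimal, hypothetical
# example assuming a gensim version contemporary with this code (circa 2017),
# where phrases.Phrases accepts `sentences`, `min_count` and `threshold` and
# stores tokens as utf-8 bytes. The tiny corpus and stop word list are made
# up for illustration.
if __name__ == "__main__":
    corpus = [
        ["the", "new", "york", "times", "is", "a", "newspaper"],
        ["he", "reads", "the", "new", "york", "times", "daily"],
    ]
    bigram = Phrases(corpus, min_count=1, threshold=0.1,
                     stop_words=["the", "is", "a"])
    # stop words never become part of a detected bigram, but they are kept
    # in the transformed sentence; expected output is along the lines of
    #     ['the', 'new_york', 'times', 'is', 'a', 'newspaper']
    print(bigram[["the", "new", "york", "times", "is", "a", "newspaper"]])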