"""A gensim ``Phrases`` subclass that can join bigrams across stop words,
e.g. "knights of the round" -> "knights_of_the_round" (Python 3, gensim ~1.x)."""

from collections import defaultdict

from six import string_types

from gensim.models import phrases
from gensim import utils


class Phrases(phrases.Phrases):

    def __init__(self, *args, **kwargs):
        assert "stop_words" in kwargs, "stop_words is mandatory"
        # stop words are stored as UTF-8 bytes, like the vocab itself; they must
        # be set before super().__init__, which may already collect the vocab
        self.stop_words = frozenset(utils.any2utf8(w) for w in kwargs.pop("stop_words"))
        super().__init__(*args, **kwargs)

    def lrsentence(self, sentence, delimiter, stop_words):
        # first part does not use stop words
        lsentence = [w for w in sentence if w not in stop_words]
        # second part joins each run of stop words onto the following word
        rsentence = []
        stop_word = []
        for w in sentence:
            if w in stop_words:
                stop_word.append(w)
            else:
                w = delimiter.join(stop_word + [w])
                rsentence.append(w)
                stop_word = []
        return lsentence, rsentence

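    # Worked example: with stop_words = {b"the", b"of"} and
    # sentence = [b"the", b"knights", b"of", b"the", b"round", b"table"],
    # lrsentence returns
    #     lsentence = [b"knights", b"round", b"table"]
    #     rsentence = [b"the_knights", b"of_the_round", b"table"]
    # so zipping lsentence with rsentence[1:] yields candidate bigrams that
    # span stop words, e.g. (b"knights", b"of_the_round").
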
    # no longer a static method: we want access to the stop words
    def learn_vocab(self, sentences, max_vocab_size, delimiter=b'_', progress_per=10000):
        """Collect unigram/bigram counts from the `sentences` iterable."""
        stop_words = self.stop_words
        sentence_no = -1
        total_words = 0
        vocab = defaultdict(int)
        min_reduce = 1
        for sentence_no, sentence in enumerate(sentences):
            if sentence_no % progress_per == 0:
                pass  # progress logging stripped in this gist
            sentence = [utils.any2utf8(w) for w in sentence]
            lsentence, rsentence = self.lrsentence(sentence, delimiter, stop_words)
            # count each left word and the bigram it forms with the
            # stop-word-prefixed right word that follows it
            for bigram in zip(lsentence, rsentence[1:]):
                vocab[bigram[0]] += 1
                vocab[delimiter.join(bigram)] += 1
                total_words += 1
            if lsentence:  # add last word, skipped by the previous loop
                word = lsentence[-1]
                vocab[word] += 1
            if len(vocab) > max_vocab_size:
                utils.prune_vocab(vocab, min_reduce)
                min_reduce += 1
        return min_reduce, vocab

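    # __getitem__ below scores a candidate pair with gensim's original
    # (Mikolov-style) formula, where pa and pb count the plain words and
    # pab counts the stop-word-spanning bigram:
    #
    #     score = (pab - min_count) / pa / pb * len(vocab)
    #
    # and the pair is joined whenever score > threshold.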
    def __getitem__(self, sentence):
        stop_words = self.stop_words
        try:
            is_single = not sentence or isinstance(sentence[0], string_types)
        except Exception:
            is_single = False
        if not is_single:
            # if the input is an entire corpus (rather than a single sentence),
            # return an iterable stream
            return self._apply(sentence)
        s, new_s = [utils.any2utf8(w) for w in sentence], []
        last_bigram = False
        vocab = self.vocab
        threshold = self.threshold
        delimiter = self.delimiter
        odelimiter = ord(delimiter)  # the delimiter's byte value (Python 3)
        min_count = self.min_count
        lsentence, rsentence = self.lrsentence(s, delimiter, stop_words)
        for w in s:  # re-emit the sentence's leading stop words as-is
            if w not in stop_words:
                break
            new_s.append(w)
        for word_a, word_b, orig_b in zip(lsentence, rsentence[1:], lsentence[1:]):
            if word_a in vocab and orig_b in vocab:
                bigram_word = delimiter.join((word_a, word_b))
                if bigram_word in vocab and not last_bigram:
                    pa = float(vocab[word_a])
                    pb = float(vocab[orig_b])
                    pab = float(vocab[bigram_word])
                    score = (pab - min_count) / pa / pb * len(vocab)
                    if score > threshold:
                        new_s.append(bigram_word)
                        last_bigram = True
                        continue
            if not last_bigram:
                new_s.append(word_a)
            # re-emit the stop words glued into word_b: every segment
            # before a delimiter byte is a stop word
            last_stop_index = 0
            for i, w in enumerate(word_b):
                if w == odelimiter:
                    new_s.append(word_b[last_stop_index:i])
                    last_stop_index = i + 1
            last_bigram = False
        if lsentence:
            if not last_bigram:  # add last non-stop word, skipped by previous loop
                new_s.append(lsentence[-1])
            tail = []  # re-emit trailing stop words, dropped by lrsentence
            for w in reversed(s):
                if w not in stop_words:
                    break
                tail.append(w)
            new_s.extend(reversed(tail))
        return [utils.to_unicode(w) for w in new_s]
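

# ---------------------------------------------------------------------------
# Minimal usage sketch. This is an illustration, not part of the original
# class: the corpus, stop word list, and threshold below are made up, and it
# assumes a gensim version contemporary with this gist (~0.13/1.x), where
# learn_vocab returns (min_reduce, vocab) and the delimiter is bytes.
if __name__ == "__main__":
    sentences = [
        ["the", "knights", "of", "the", "round", "table"],
        ["knights", "of", "the", "round", "table", "fight"],
        ["a", "round", "shield"],
    ]
    bigram = Phrases(
        sentences,
        min_count=1,
        threshold=0.01,  # tiny toy corpus, so keep the threshold permissive
        stop_words=["the", "of", "a"],
    )
    # stop words survive inside the joined phrase and around it, e.g. this
    # should print something like: ['the', 'knights_of_the_round', 'table']
    print(bigram[["the", "knights", "of", "the", "round", "table"]])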