import cProfile as profile
import pprint
import re
import time
from hashlib import sha1

from mw import api
class Token(str):
    """A string that also carries its lexical type and position index."""
    __slots__ = ("type", "i")

    def __new__(cls, *args, **kwargs):
        # Passing an existing Token straight through avoids re-wrapping.
        if len(args) == 1 and len(kwargs) == 0:
            if isinstance(args[0], cls):
                return args[0]
            else:
                raise TypeError("Expected {0}, got {1}".format(cls,
                                                               type(args[0])))
        else:
            inst = super().__new__(cls, args[0])
            inst.initialize(*args, **kwargs)
            return inst

    def __init__(self, *args, **kwargs): pass

    def __str__(self):
        return super().__str__()

    def __repr__(self):
        return "{0}({1}, {2}, {3})".format(self.__class__.__name__,
                                           super().__repr__(),
                                           repr(self.type),
                                           self.i)

    @property
    def start(self):
        return self.i

    @property
    def end(self):
        return self.i

    def initialize(self, content, type, i):
        self.type = str(type)
        self.i = int(i)
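
# Illustrative sanity check (not from the original gist): a Token keeps its
# type and index, and re-wrapping an existing Token returns the same object.
_t = Token("foo", "word", 0)
assert (str(_t), _t.type, _t.i) == ("foo", "word", 0)
assert Token(_t) is _t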
# Ordered lexicon: the first alternative that matches wins, so multi-character
# tokens (e.g. '[[') must appear before their single-character prefixes.
# Inner groups are non-capturing so that match.lastgroup always names the
# token's own named group.
LEXICON = [
    ('comment_start', r'<!--'),
    ('comment_end', r'-->'),
    ('entity', r'&[a-z][a-z0-9]*;'),
    ('word', r'[^\W\d]+'),
    ('number', r'[\d]+'),
    ('tag', r'</?(?:[a-z][a-z0-9]*)\b[^>]*>'),
    ('period', r'\.+'),
    ('qmark', r'\?+'),
    ('epoint', r'!+'),
    ('comma', r',+'),
    ('colon', r':+'),
    ('scolon', r';+'),
    ('break', r'(?:\n|\n\r|\r\n)\s*(?:\n|\n\r|\r\n)+'),
    ('whitespace', r'[\n\r\s]+'),
    ('dbrack_open', r'\[\['),
    ('dbrack_close', r'\]\]'),
    ('brack_open', r'\['),
    ('brack_close', r'\]'),
    ('tab_open', r'\{\|'),
    ('tab_close', r'\|\}'),
    ('dcurly_open', r'\{\{'),
    ('dcurly_close', r'\}\}'),
    ('curly_open', r'\{'),
    ('curly_close', r'\}'),
    ("bold", r"'''"),
    ("italic", r"''"),
    ("equals", r"=+"),
    ("etc", r".")
]
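
# Note on ordering (illustrative): the combined pattern tries alternatives
# left to right, so 'dbrack_open' must precede 'brack_open' or '[[' would
# lex as two single brackets.
assert re.match(r'\[\[|\[', '[[').group(0) == '[['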
def tokenize(text, lexicon=LEXICON):
    # Compile the lexicon into one alternation of named groups; the name of
    # the group that matched tells us the token type.
    regex = '|'.join('(?P<{0}>{1})'.format(name, pattern)
                     for name, pattern in lexicon)
    for i, match in enumerate(re.finditer(regex, text)):
        type = match.lastgroup
        value = match.group(0)
        yield Token(value, type=type, i=i)

text = "This is some == sample text == {{foobar}}, ''hats''.\n\t\n"
pprint.pprint([(t.type, str(t), t.i) for t in tokenize(text)])
class Segment(list):

    def __new__(cls, *args, **kwargs):
        # Pass an existing Segment straight through rather than re-wrapping.
        if len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], cls):
            return args[0]
        else:
            inst = super().__new__(cls, *args, **kwargs)
            inst.initialize(*args, **kwargs)
            return inst

    def __init__(self, *args, **kwargs): pass

    def initialize(self, subsegments=None):
        subsegments = subsegments or []
        super().__init__(subsegments)

    @property
    def start(self):
        return self[0].start

    @property
    def end(self):
        return self[-1].end

    def __repr__(self):
        return "{0}({1})".format(self.__class__.__name__, super().__repr__())

    def __str__(self):
        return ''.join(str(ss) for ss in self)

    def __eq__(self, other):
        raise NotImplementedError()

    def __ne__(self, other):
        raise NotImplementedError()

    def __hash__(self):
        raise NotImplementedError()
class MatchableSegment(Segment):
    __slots__ = ("sha1",)

    def initialize(self, subsegments=None):
        super().initialize(subsegments)
        # Hash the concatenated text content (str(self)) so that equality is
        # content-based and stays consistent with append()'s updates.
        self.sha1 = sha1(bytes(str(self), 'utf-8'))

    def __eq__(self, other):
        try:
            return self.sha1.digest() == other.sha1.digest()
        except AttributeError:
            return False

    def __ne__(self, other):
        try:
            return self.sha1.digest() != other.sha1.digest()
        except AttributeError:
            return True

    def __hash__(self):
        return hash(self.sha1.digest())

    def append(self, subsegment):
        super().append(subsegment)
        self.sha1.update(bytes(str(subsegment), 'utf-8'))

    def extend(self, subsegments):
        for subsegment in subsegments:
            self.append(subsegment)
print(repr(Segment([Token("foo", "word", 1), Token("bar", "word", 2)])))
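
# Illustrative check (not from the original gist): MatchableSegment equality
# and hashing depend only on text content, not on token positions.
_a = MatchableSegment([Token("foo", "word", 0)])
_b = MatchableSegment([Token("foo", "word", 7)])
assert _a == _b and hash(_a) == hash(_b)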
class Segmenter:

    def __init__(self): pass

    def segment(self, text): raise NotImplementedError()

"""
Provides a segmenter for splitting text tokens into
:class:`~deltas.segmenters.paragraphs_sentences_and_whitespace.Paragraph`,
:class:`~deltas.segmenters.paragraphs_sentences_and_whitespace.Sentence`, and
:class:`~deltas.segmenters.paragraphs_sentences_and_whitespace.Whitespace`.

.. autoclass:: deltas.segmenters.ParagraphsSentencesAndWhitespace
    :members:

.. autoclass:: deltas.segmenters.paragraphs_sentences_and_whitespace.Paragraph

.. autoclass:: deltas.segmenters.paragraphs_sentences_and_whitespace.Sentence

.. autoclass:: deltas.segmenters.paragraphs_sentences_and_whitespace.Whitespace
"""
class LookAhead:

    class DONE: pass

    def __new__(cls, it):
        if isinstance(it, cls):
            return it
        elif hasattr(it, "__next__") or hasattr(it, "__iter__"):
            return cls.from_iterable(it)
        else:
            raise TypeError("Expected iterable, got {0}".format(type(it)))

    @classmethod
    def from_iterable(cls, iterable):
        instance = super().__new__(cls)
        instance.initialize(iterable)
        return instance

    def __init__(self, *args, **kwargs): pass

    def initialize(self, iterable):
        self.iterable = iter(iterable)
        self.i = -1  # Will increment to zero in a moment
        self._load_next()

    def _load_next(self):
        try:
            self.next = next(self.iterable)
            self.i += 1
        except StopIteration:
            self.next = self.DONE

    def __iter__(self): return self

    def __next__(self):
        if self.empty():
            raise StopIteration()
        else:
            current = self.next
            self._load_next()
            return current

    def pop(self):
        return self.__next__()

    def peek(self):
        return self.next

    def empty(self):
        return self.next == self.DONE
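
# Illustrative usage (not from the original gist): peek() inspects without
# consuming, pop() consumes, and empty() reports exhaustion.
_la = LookAhead(iter("abc"))
assert _la.peek() == "a"
assert _la.pop() == "a" and _la.i == 1
assert list(_la) == ["b", "c"] and _la.empty()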
WHITESPACE = set(["whitespace", "break"])
PARAGRAPH_END = set(["break"])
SENTENCE_END = set(["period", "epoint", "qmark"])
MIN_SENTENCE = 5

class ParagraphsSentencesAndWhitespace(Segmenter):

    def __init__(self, *, whitespace=None,
                          paragraph_end=None,
                          sentence_end=None,
                          min_sentence=None):
        self.whitespace = set(whitespace or WHITESPACE)
        self.paragraph_end = set(paragraph_end or PARAGRAPH_END)
        self.sentence_end = set(sentence_end or SENTENCE_END)
        self.min_sentence = int(min_sentence or MIN_SENTENCE)
    def segment(self, tokens):
        """
        Clusters a sequence of tokens into a list of segments.

        :Parameters:
            tokens : `iterable` of `str`
                A series of tokens to segment.

        :Returns:
            A `list` of :class:`Segment`
        """
        look_ahead = LookAhead(tokens)
        while not look_ahead.empty():
            if look_ahead.peek().type in self.whitespace:
                segment = self._read_whitespace(look_ahead)
            else:
                segment = self._read_paragraph(look_ahead)
            yield segment

    def _read_whitespace(self, look_ahead):
        whitespace = Segment([look_ahead.pop()])
        while not look_ahead.empty() and \
              look_ahead.peek().type in self.whitespace:
            whitespace.append(look_ahead.pop())
        return whitespace

    def _read_sentence(self, look_ahead):
        sentence = MatchableSegment([look_ahead.pop()])
        while not look_ahead.empty() and \
              look_ahead.peek().type not in self.paragraph_end:
            sentence_bit = look_ahead.pop()
            sentence.append(sentence_bit)
            # Only end the sentence on a terminal token once the minimum
            # sentence length has been reached.
            if sentence_bit.type in self.sentence_end and \
               len(sentence) >= self.min_sentence:
                break
        return sentence

    def _read_paragraph(self, look_ahead):
        # A paragraph runs until a paragraph-ending token; within it,
        # whitespace and sentences are read as alternating sub-segments.
        paragraph = MatchableSegment()
        while not look_ahead.empty() and \
              look_ahead.peek().type not in self.paragraph_end:
            if look_ahead.peek().type in self.whitespace:
                segment = self._read_whitespace(look_ahead)
            else:
                segment = self._read_sentence(look_ahead)
            paragraph.append(segment)
        return paragraph
    @classmethod
    def from_config(cls, doc, name):
        return cls(
            whitespace=doc['segmenters'][name].get('whitespace'),
            paragraph_end=doc['segmenters'][name].get('paragraph_end'),
            sentence_end=doc['segmenters'][name].get('sentence_end'),
            min_sentence=doc['segmenters'][name].get('min_sentence')
        )
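
# Hypothetical config layout for from_config (assumed from the method body
# above; key names are illustrative, not from a real deltas config file):
_doc = {
    'segmenters': {
        'western_psw': {
            'whitespace': ['whitespace', 'break'],
            'paragraph_end': ['break'],
            'sentence_end': ['period', 'epoint', 'qmark'],
            'min_sentence': 5
        }
    }
}
_psw = ParagraphsSentencesAndWhitespace.from_config(_doc, 'western_psw')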
text = """
This is a paragraph and some text.

== Header 1 ==
Here is some other text.{{cite}}"""

segmenter = ParagraphsSentencesAndWhitespace()
for segment in segmenter.segment(tokenize(text)):
    print(segment.__class__.__name__, repr(str(segment)))
session = api.Session("https://en.wikipedia.org/w/api.php")

common1 = session.revisions.get(649586755, properties={"content"})['*']
common1_tokens = list(tokenize(common1))
common2 = session.revisions.get(649591281, properties={"content"})['*']
common2_tokens = list(tokenize(common2))

print("Segmenting:")
def segment_common():
    # Report the average wall-clock seconds per segmentation pass over 50 runs.
    start = time.time()
    for _ in range(50):
        segments = list(segmenter.segment(common1_tokens))
    print("\tcommon: {0}".format((time.time() - start)/50))

segment_common()
#profile.run('segment_common()', sort="cumulative")