# Gist by @halfak, created March 2, 2015 21:35
# https://gist.github.com/halfak/63f7e830448fd7392d22

import cProfile as profile
import pprint
import re
import time
from hashlib import sha1

from mw import api

class Token(str):
    __slots__ = ("type", "i")

    def __new__(cls, *args, **kwargs):
        # Given a single Token argument, act as a copy-constructor and
        # return it unchanged; otherwise construct a new typed string.
        if len(args) == 1 and len(kwargs) == 0:
            if isinstance(args[0], cls):
                return args[0]
            else:
                raise TypeError("Expected {0}, got {1}".format(cls,
                                                               type(args[0])))
        else:
            inst = super().__new__(cls, args[0])
            inst.initialize(*args, **kwargs)
            return inst

    def __init__(self, *args, **kwargs): pass

    def __str__(self):
        return super().__str__()

    def __repr__(self):
        return "{0}({1}, {2}, {3})".format(self.__class__.__name__,
                                           super().__repr__(),
                                           repr(self.type),
                                           self.i)

    @property
    def start(self):
        return self.i

    @property
    def end(self):
        return self.i

    def initialize(self, content, type, i):
        self.type = str(type)
        self.i = int(i)
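
# Quick check (added for illustration; not in the original gist): passing an
# existing Token back through the constructor returns the same object.
t = Token("sample", "word", 0)
assert Token(t) is t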

LEXICON = [
    ('comment_start', r'<!--'),
    ('comment_end', r'-->'),
    ('entity', r'&[a-z][a-z0-9]*;'),
    ('word', r'[^[\W\d]+'),  # word characters, excluding digits and '['
    ('tag', r'</?([a-z][a-z0-9]*)\b[^>]*>'),  # opening and closing tags
    ('number', r'[\d]+'),
    ('period', r'\.+'),
    ('qmark', r'\?+'),
    ('epoint', r'!+'),
    ('comma', r',+'),
    ('colon', r':+'),
    ('scolon', r';+'),
    ('break', r'(\n|\n\r|\r\n)\s*(\n|\n\r|\r\n)+'),
    ('whitespace', r'[\n\r\s]+'),
    ('dbrack_open', r'\[\['),
    ('dbrack_close', r'\]\]'),
    ('brack_open', r'\['),
    ('brack_close', r'\]'),
    ('tab_open', r'\{\|'),
    ('tab_close', r'\|\}'),
    ('dcurly_open', r'\{\{'),
    ('dcurly_close', r'\}\}'),
    ('curly_open', r'\{'),
    ('curly_close', r'\}'),
    ("bold", r"'''"),
    ("italic", r"''"),
    ("equals", r"=+"),
    ("etc", r".")  # catch-all for any other single character
]

def tokenize(text, lexicon=LEXICON):
    # Compile the lexicon into a single alternation of named groups; the
    # name of the group that matched (match.lastgroup) is the token type.
    regex = '|'.join('(?P<{0}>{1})'.format(name, pattern)
                     for name, pattern in lexicon)
    for i, match in enumerate(re.finditer(regex, text)):
        type = match.lastgroup
        value = match.group(0)
        yield Token(value, type=type, i=i)

text = "This is some == sample text == {{foobar}}, ''hats''.\n\t\n"
pprint.pprint([(t.type, str(t), t.i) for t in tokenize(text)])
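
# Another small check (added for illustration; not in the original gist):
# wiki link syntax lexes into bracket tokens rather than words.
pprint.pprint([(t.type, str(t)) for t in tokenize("[[Sample link]]")])
# Expect: dbrack_open, word, whitespace, word, dbrack_close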

class Segment(list):

    def __new__(cls, *args, **kwargs):
        # Given a single Segment argument, return it unchanged.
        if len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], cls):
            return args[0]
        else:
            inst = super().__new__(cls, *args, **kwargs)
            inst.initialize(*args, **kwargs)
            return inst

    def __init__(self, *args, **kwargs): pass

    def initialize(self, subsegments=None):
        subsegments = subsegments or []
        super().__init__(subsegments)

    @property
    def start(self):
        return self[0].start

    @property
    def end(self):
        return self[-1].end

    def __repr__(self):
        return "{0}({1})".format(self.__class__.__name__, super().__repr__())

    def __str__(self):
        return ''.join(str(ss) for ss in self)

    # Plain segments do not support matching; MatchableSegment opts in.
    def __eq__(self, other):
        raise NotImplementedError()

    def __ne__(self, other):
        raise NotImplementedError()

    def __hash__(self):
        raise NotImplementedError()

class MatchableSegment(Segment):
    __slots__ = ("sha1",)

    def initialize(self, subsegments=None):
        super().initialize(subsegments)
        # Hash the concatenated token content (str(self)) so the digest
        # stays consistent with the incremental updates in append().
        self.sha1 = sha1(bytes(str(self), 'utf-8'))

    def __eq__(self, other):
        try:
            return self.sha1.digest() == other.sha1.digest()
        except AttributeError:
            return False

    def __ne__(self, other):
        try:
            return self.sha1.digest() != other.sha1.digest()
        except AttributeError:
            return True

    def __hash__(self):
        return hash(self.sha1.digest())

    def append(self, subsegment):
        super().append(subsegment)
        self.sha1.update(bytes(str(subsegment), 'utf-8'))

    def extend(self, subsegments):
        for subsegment in subsegments:
            self.append(subsegment)

print(repr(Segment([Token("foo", "word", 1), Token("bar", "word", 2)])))
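
# Sanity check (added for illustration; not in the original gist):
# MatchableSegment equality is content-based, so token offsets don't
# affect the digest.
a = MatchableSegment([Token("foo", "word", 1)])
b = MatchableSegment([Token("foo", "word", 7)])
print(a == b)  # True -- both hash the content string "foo"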

class Segmenter:

    def __init__(self): pass

    def segment(self, text): raise NotImplementedError()
"""
Provides a segmenter for splitting text tokens into
:class:`~deltas.segmenters.paragraphs_sentences_and_whitespace.Paragraph`,
:class:`~deltas.segmenters.paragraphs_sentences_and_whitespace.Sentence`, and
:class:`~deltas.segmenters.paragraphs_sentences_and_whitespace.Whitespace`.
.. autoclass:: deltas.segmenters.ParagraphsSentencesAndWhitespace
:members:
.. autoclass:: deltas.segmenters.paragraphs_sentences_and_whitespace.Paragraph
.. autoclass:: deltas.segmenters.paragraphs_sentences_and_whitespace.Sentence
.. autoclass:: deltas.segmenters.paragraphs_sentences_and_whitespace.Whitespace
"""

class LookAhead:

    class DONE: pass

    def __new__(cls, it):
        if isinstance(it, cls):
            return it
        elif hasattr(it, "__next__") or hasattr(it, "__iter__"):
            return cls.from_iterable(it)
        else:
            raise TypeError("Expected iterable, got {0}".format(type(it)))

    @classmethod
    def from_iterable(cls, iterable):
        instance = super().__new__(cls)
        instance.initialize(iterable)
        return instance

    def __init__(self, *args, **kwargs): pass

    def initialize(self, iterable):
        self.iterable = iter(iterable)
        self.i = -1  # Will increment to zero when the first item loads
        self._load_next()

    def _load_next(self):
        try:
            self.next = next(self.iterable)
            self.i += 1
        except StopIteration:
            self.next = self.DONE

    def __iter__(self): return self

    def __next__(self):
        if self.empty():
            raise StopIteration()
        else:
            current = self.next
            self._load_next()
            return current

    def pop(self):
        return self.__next__()

    def peek(self):
        return self.next

    def empty(self):
        # Identity check: DONE is a sentinel, and values drawn from the
        # iterable may define their own __eq__.
        return self.next is self.DONE
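
# Minimal illustration (added; not in the original gist): peek() inspects
# the next item without consuming it, while pop() consumes it.
la = LookAhead("abc")
print(la.peek(), la.pop(), la.pop(), la.empty())  # a a b False
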
WHITESPACE = set(["whitespace", "break"])
PARAGRAPH_END = set(["break"])
SENTENCE_END = set(["period", "epoint", "qmark"])
MIN_SENTENCE = 5

class ParagraphsSentencesAndWhitespace(Segmenter):

    def __init__(self, *, whitespace=None,
                          paragraph_end=None,
                          sentence_end=None,
                          min_sentence=None):
        self.whitespace = set(whitespace or WHITESPACE)
        self.paragraph_end = set(paragraph_end or PARAGRAPH_END)
        self.sentence_end = set(sentence_end or SENTENCE_END)
        self.min_sentence = int(min_sentence or MIN_SENTENCE)

    def segment(self, tokens):
        """
        Clusters a sequence of tokens into segments.

        :Parameters:
            tokens : `iterable` of `str`
                A series of tokens to segment.
        :Returns:
            A generator of :class:`Segment`
        """
        look_ahead = LookAhead(tokens)
        while not look_ahead.empty():
            if look_ahead.peek().type in self.whitespace:
                segment = self._read_whitespace(look_ahead)
            else:
                segment = self._read_paragraph(look_ahead)
            yield segment

    def _read_whitespace(self, look_ahead):
        whitespace = Segment([look_ahead.pop()])
        while not look_ahead.empty() and \
              look_ahead.peek().type in self.whitespace:
            whitespace.append(look_ahead.pop())
        return whitespace

    def _read_sentence(self, look_ahead):
        sentence = MatchableSegment([look_ahead.pop()])
        while not look_ahead.empty() and \
              look_ahead.peek().type not in self.paragraph_end:
            sentence_bit = look_ahead.pop()
            sentence.append(sentence_bit)
            # Stop at terminal punctuation, but only once the sentence
            # has reached the minimum token count.
            if sentence_bit.type in self.sentence_end and \
               len(sentence) >= self.min_sentence:
                break
        return sentence

    def _read_paragraph(self, look_ahead):
        paragraph = MatchableSegment()
        # A paragraph runs until a paragraph-ending 'break' token;
        # whitespace *within* the paragraph becomes its own sub-segment.
        while not look_ahead.empty() and \
              look_ahead.peek().type not in self.paragraph_end:
            if look_ahead.peek().type in self.whitespace:
                segment = self._read_whitespace(look_ahead)
            else:
                segment = self._read_sentence(look_ahead)
            paragraph.append(segment)
        return paragraph

    @classmethod
    def from_config(cls, doc, name):
        return cls(
            whitespace=doc['segmenters'][name].get('whitespace'),
            paragraph_end=doc['segmenters'][name].get('paragraph_end'),
            sentence_end=doc['segmenters'][name].get('sentence_end'),
            min_sentence=doc['segmenters'][name].get('min_sentence')
        )
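
# Hypothetical configuration document (key names assumed for illustration;
# not part of the original gist) showing how from_config() would be called;
# unspecified keys fall back to the module-level defaults.
config_doc = {
    'segmenters': {
        'western_psw': {
            'sentence_end': ["period", "epoint", "qmark"],
            'min_sentence': 5
        }
    }
}
configured = ParagraphsSentencesAndWhitespace.from_config(config_doc,
                                                          'western_psw')
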
text = """
This is a paragraph and some text.
== Header 1 ==
Here is some other text.{{cite}}"""
segmenter = ParagraphsSentencesAndWhitespace()
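
# Since the sample text contains no blank lines (hence no 'break' tokens),
# this should print a leading whitespace Segment followed by one paragraph:
#   Segment '\n'
#   MatchableSegment 'This is a paragraph and some text.\n== Header 1 ==...'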
for segment in segmenter.segment(tokenize(text)):
    print(segment.__class__.__name__, repr(str(segment)))

session = api.Session("https://en.wikipedia.org/w/api.php")
common1 = session.revisions.get(649586755, properties={"content"})['*']
common1_tokens = list(tokenize(common1))
common2 = session.revisions.get(649591281, properties={"content"})['*']
common2_tokens = list(tokenize(common2))
print("Segmenting:")

def segment_common():
    start = time.time()
    for _ in range(50):
        segments = list(segmenter.segment(common1_tokens))
    print("\tcommon: {0}".format((time.time() - start)/50))

segment_common()
#profile.run('segment_common()', sort="cumulative")