#!/usr/bin/env python3.4
import argparse
import math
import sys
from collections import defaultdict, namedtuple
from itertools import chain

from pcfg import read_pcfg

# Data type for constituents
# Implemented as a named tuple (=immutable!) so we can use it as a key
# in dictionaries
Constituent = namedtuple("Constituent", ["start", "end", "rule", "complete"])
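# A minimal illustration (hypothetical values): a fully recognized determiner
# "the" covering sentence positions 0-1 would be represented as
#   Constituent(start=0, end=1, rule=("DT", "the"), complete=1)
# where `complete` counts how many of the rule's right-hand-side symbols have
# been recognized so far.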
def parse(lexicon, grammar, sentence):
    """ Parse the tokenized sentence into a syntax tree.

    :param lexicon: PCFG lexicon with word -> {tag -> probability} mappings
    :type lexicon: dict
    :param grammar: PCFG grammar with head -> {tail -> probability}
                    mappings
    :type grammar: dict
    :return: The syntax tree for the sentence
    :rtype: S-Expressions via nested lists
    """
    # NOTE: All the state is kept in the lexical scope of this outer function,
    # while the actual left-corner algorithm is implemented in the inner
    # closures that have access to this state.
    chart = []
    completed_endpos = defaultdict(list)
    unknown_toks = [t for t in sentence if t not in lexicon]
    if unknown_toks:
        raise ValueError("Can't parse sentence because the following words "
                         "are unknown: {}".format(unknown_toks))

    def prune_chart():
        """ Remove non-complete constituents from the chart and the mapping
        of completion end positions. """
        nonlocal chart
        nonlocal completed_endpos
        pruned_chart = []
        for ch in chart:
            # Only keep completed constituents
            pruned = {const: prob for const, prob in ch.items()
                      if const.complete == len(const.rule[1:])}
            pruned_chart.append(pruned)
        chart = pruned_chart
        all_consts = set(chain(*chart))
        completed_endpos = {
            c: endpos for c, endpos in completed_endpos.items()
            if c in all_consts}

    def parse_subtree(start, end, sym=None):
        """ Get the best parse for a given range of the sentence.

        To obtain the best parse for the whole sentence, set start=0,
        end=len(sentence) and sym=None.
        This will determine the best starting symbol automatically.
        """
        candidates = ((c, prob) for c, prob in chart[end].items()
                      if ((sym is None or c.rule[0] == sym) and
                          c.start == start and
                          c.end == end))
        const = max(candidates, key=lambda x: x[1])[0]
        if const.rule[1] in lexicon:
            return const.rule
        else:
            end_positions = completed_endpos[const]
            parse = [const.rule[0]]
            cur_start = start
            for child, end_pos in zip(const.rule[1:], end_positions):
                child_parse = parse_subtree(cur_start, end_pos, child)
                parse.append(child_parse)
                cur_start = end_pos
            return parse

    def scan(word, position):
        """ Look up a word in the lexicon, add matching nodes to chart. """
        for tag, prob in lexicon[word].items():
            const = Constituent(position, position+1, (tag, word), 1)
            add(const, math.log(prob))

    def predict(const):
        """ Called when `const` is fully parsed, adds all rules from
        grammar to chart where the first ('left-corner') constituent
        of the rule is identical with `const`.
        """
        for head, child_probs in grammar.items():
            for tail, prob in child_probs.items():
                if tail[0] == const.rule[0]:
                    child_rule = (head,) + tail
                    child_const = Constituent(const.start, const.start,
                                              child_rule, 0)
                    add(child_const, math.log(prob))

    def complete(const, const_prob):
        """ Called when `const` is fully parsed, complete all partial
        constituents from the chart that expect `const` as the next
        constituent.
        """
        # Since we can't mutate the chart during iteration, we remember which
        # new constituents result from completion, so we can add them after
        # we're done iterating.
        completed = []
        for other_const, prob in chart[const.start].items():
            if other_const.complete == len(other_const.rule[1:]):
                # Skip already completed constituents
                continue
            if other_const.rule[other_const.complete+1] != const.rule[0]:
                # Skip constituents that don't expect our constituent
                # as the next to be completed
                continue
            const_len = const.end - const.start
            # Build the new, (partially) completed constituent
            new_const = Constituent(
                other_const.start, other_const.end + const_len,
                other_const.rule, other_const.complete + 1)
            # Probability for the new constituent is the product (sum, since
            # we're using log-probabilities) of the completing and the
            # completed constituent's probabilities.
            completed.append((new_const, const_prob + prob))
            completed_endpos[new_const] = (
                completed_endpos[other_const] + [const.end])
        for compl, prob in completed:
            add(compl, prob)

    def add(const, prob):
        """ Add `const` to chart. If an equivalent constituent is present
        in the chart, pick the one with the higher probability.
        """
        cur_prob = chart[const.end].get(const, None)
        if cur_prob is None or cur_prob < prob:
            chart[const.end][const] = prob
            # Was the constituent completely recognized?
            if const.complete == len(const.rule[1:]):
                predict(const)
                complete(const, prob)

    chart.append(dict())
    for idx, tok in enumerate(sentence):
        chart.append(dict())
        scan(tok, idx)
    prune_chart()
    return parse_subtree(0, len(sentence))


class LeftCornerParser:
    """ Parser using the Left-Corner algorithm. """

    def __init__(self, lexicon_path, grammar_path):
        """ Build a new parser from the given PCFG.

        :param lexicon_path: Path to PCFG lexicon
        :param grammar_path: Path to PCFG grammar
        """
        self._grammar = read_pcfg(grammar_path)
        self._lexicon = self._read_lexicon(lexicon_path)
    def _read_lexicon(self, path):
        """ Read the lexicon from the given PCFG file.

        :param path: Path to PCFG lexicon
        :returns: Lexicon as a {word: {tag: probability}} mapping
        :rtype: {str: {str: float}} dict
        """
        lex_pcfg = read_pcfg(path)
        lex = {}
        for tag, word_probs in lex_pcfg.items():
            for word, prob in word_probs.items():
                # read_pcfg returns rule tails as tuples; for the lexicon each
                # tail holds a single word, so unwrap it
                word = word[0]
                if word in lex:
                    lex[word][tag] = prob
                else:
                    lex[word] = {tag: prob}
        return lex
    def parse(self, sentence):
        """ Parse the given (tokenized!) sentence.

        :param sentence: The sentence to parse
        :type sentence: sequence of str
        :returns: The parse for the sentence
        :rtype: Nested lists of str
        """
        return parse(self._lexicon, self._grammar, sentence)


def print_tree(tree, indent=0):
    """ Print the given tree to standard output.

    :param tree: Tree
    :type tree: nested lists of str
    """
    head, *tail = tree
    if len(tree) == 2 and not isinstance(tail[0], (tuple, list)):
        print(indent*" ", head, tail[0])
        return
    else:
        print(indent*" ", head)
        for t in tail:
            print_tree(t, indent+2)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-g", "--grammar", type=str, required=True,
                        help="Path to PCFG grammar.")
    parser.add_argument("-l", "--lexicon", type=str, required=True,
                        help="Path to PCFG lexicon.")
    parser.add_argument("sentence", type=str,
                        help="Tokenized sentence to parse.")
    args = parser.parse_args()
    parser = LeftCornerParser(args.lexicon, args.grammar)
    print("Parsing sentence, this may take a while...", file=sys.stderr)
    sent_parse = parser.parse(args.sentence.split())
    print_tree(sent_parse)
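# Example invocation (hypothetical script name and sentence):
#   ./parser.py -g grammar.pcfg -l lexicon.pcfg "the dog barks"
# The grammar and lexicon files are expected in the format produced by
# write_pcfg() in pcfg.py, i.e. one "<probability> <head> <children...>"
# rule per line.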
#!/usr/bin/env python3.4
import re
import sys
from collections import defaultdict, deque, Counter


def build_parsetree(token_stream):
    """ Build a lisp-style tree from the token stream. """
    tree = []
    while token_stream:
        token = token_stream.popleft()
        if token == "(":
            tree.append(build_parsetree(token_stream))
        elif token == ")":
            return tree
        else:
            # Strip numeric co-indexing suffixes like "NP-1" or "NP=2"
            # from node labels
            token = re.sub(r"^([A-Z-]+)[-=]\d+$", r"\1", token)
            tree.append(token)
    return tree


def parse_treebank(path):
    """ Given a file with parse trees, tokenize it and parse it into a Python
    data structure.
    """
    with open(path) as fp:
        data = fp.read()
    # Normalize whitespace
    data = re.sub(r"\s+", " ", data)
    # Very simple regex lexer
    data = re.sub(r"\(\(", "( (", data)
    data = re.sub(r"\)\)", ") )", data)
    data = re.sub(r"\((\S)", r"( \1", data)
    data = re.sub(r"(\S)\)", r"\1 )", data)
    return build_parsetree(deque(data.split()))


def count_contexts(tree):
    """ Given a tree, count the grammatical and lexical contexts. """
    grammar_counts = Counter()
    lexicon_counts = Counter()
    if len(tree) == 2 and not isinstance(tree[1], list):
        lexicon_counts[tuple(tree)] += 1
        return grammar_counts, lexicon_counts
    if not isinstance(tree[0], list):
        gram_pat = tuple([tree[0]] + [t[0] for t in tree[1:]])
        grammar_counts[gram_pat] += 1
        tree = tree[1:]
    for node in tree:
        new_gram, new_lex = count_contexts(node)
        grammar_counts.update(new_gram)
        lexicon_counts.update(new_lex)
    return grammar_counts, lexicon_counts


def create_pcfg(treebank_paths):
    """ Create a PCFG grammar and lexicon from a set of files containing
    parsed sentences. """
    # FIXME: The variable names could be clearer...
    grammar_counts = Counter()
    lexicon_counts = Counter()
    for path in treebank_paths:
        tree = parse_treebank(path)
        gram_cnt, lex_cnt = count_contexts(tree)
        grammar_counts.update(gram_cnt)
        lexicon_counts.update(lex_cnt)
    # Determine child probabilities for each root
    gram_counts = defaultdict(dict)
    for pat, cnt in grammar_counts.items():
        root = pat[0]
        children = pat[1:]
        gram_counts[root][children] = cnt
    gram_probs = defaultdict(dict)
    for root, child_cnts in gram_counts.items():
        all_cnt = sum(child_cnts.values())
        for child, cnt in child_cnts.items():
            gram_probs[root][child] = cnt / all_cnt
    # Determine word probabilities for each tag
    lex_counts = defaultdict(dict)
    for (tag, word), cnt in lexicon_counts.items():
        lex_counts[tag][word] = cnt
    lex_probs = defaultdict(dict)
    for tag, word_cnts in lex_counts.items():
        all_cnt = sum(word_cnts.values())
        for word, cnt in word_cnts.items():
            lex_probs[tag][word] = cnt / all_cnt
    return gram_probs, lex_probs


def write_pcfg(pcfg, path):
    """ Write a given PCFG to a file. """
    with open(path, 'w') as fp:
        for root, children_probs in pcfg.items():
            for children, children_prob in children_probs.items():
                if isinstance(children, (list, tuple)):
                    children = " ".join(children)
                fp.write("{prob} {root} {children}\n".format(
                    prob=children_prob, root=root, children=children))


def read_pcfg(path):
    """ Read a PCFG from a file. """
    pcfg = dict()
    with open(path) as fp:
        for line in fp:
            prob, head, *children = line.split()
            children = tuple(children)
            prob = float(prob)
            if head in pcfg:
                pcfg[head][children] = prob
            else:
                pcfg[head] = {children: prob}
    return pcfg


if __name__ == '__main__':
    gram_probs, lex_probs = create_pcfg(sys.argv[1:])
    write_pcfg(gram_probs, "grammar.pcfg")
    write_pcfg(lex_probs, "lexicon.pcfg")
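# Example invocation (hypothetical treebank files):
#   ./pcfg.py wsj_0001.mrg wsj_0002.mrg
# This writes the estimated rule probabilities to grammar.pcfg and the
# tag/word probabilities to lexicon.pcfg in the current directory.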