#!/usr/bin/env python3.4
import argparse
import gzip
import json
import operator
import random
import sys
from collections import defaultdict
from itertools import chain

import util


def capitalization(word):
    """ Obtain capitalization information: 'g' if the word is capitalized,
    'k' if it is all lowercase, '0' otherwise (e.g. mixed case).
    """
    if word[0].isupper():
        return 'g'
    elif word.islower():
        return 'k'
    else:
        return '0'
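
# A quick sketch of the three cases (values follow from the logic above):
#   capitalization("Haus")   -> 'g'  (first letter upper-case)
#   capitalization("haus")   -> 'k'  (all lower-case)
#   capitalization("iPhone") -> '0'  (mixed case)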


def make_suffix(word, k):
    """ Get the suffix of length k of the word, padding with spaces at the
    front if len(word) < k.

    :param word:  Word to get suffix from
    :type word:   str
    :param k:     Length of suffix
    :type k:      int
    :returns:     Suffix of length k
    :rtype:       str
    """
    return (" "*k + word)[-k:]


def load_data(path):
    """ Load dataset from tab-separated file structured into sentences. """
    if path.endswith('.gz'):
        open_fn = gzip.open
    else:
        open_fn = open
    # Open in binary mode so both branches yield bytes, which we decode
    # explicitly below (a text-mode `open` would return str and break the
    # `decode` call).
    with open_fn(path, 'rb') as fp:
        raw = fp.read().decode('utf8')
    return ([tuple(line.split('\t')) for line in sentence.split('\n')]
            for sentence in raw.split('\n\n') if sentence)
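
# The expected corpus format is one word<TAB>tag pair per line, with blank
# lines separating sentences, e.g. (a hypothetical two-sentence excerpt):
#
#   Das     ART
#   Haus    NN
#
#   Es      PPER
#   regnet  VVFIN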


def pad_data(data, margin=(2, 1), paddings=("<s>", "</s>")):
    """ Pad given nested dataset. """
    return ([paddings[0]]*margin[0] + s + [paddings[1]]*margin[1]
            for s in data)
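
# E.g. pad_data([['a', 'b']], margin=(2, 1)) yields the single padded
# sentence ['<s>', '<s>', 'a', 'b', '</s>'].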


def random_infinite(it):
    """ Make an infinite generator that yields items from `it` in random
    order, making sure that items only get repeated once a full pass
    through `it` has been done.
    """
    # Materialize iterator if we don't have a list or tuple
    if not isinstance(it, (list, tuple)):
        it = tuple(it)
    while True:
        # Sorting by a random key is equivalent to shuffling
        pool = sorted(it, key=lambda k: random.random())
        for itm in pool:
            yield itm
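
# An equivalent sketch of the idiom above, using random.shuffle directly:
#
#   pool = list(it)
#   while True:
#       random.shuffle(pool)
#       yield from pool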


class PerceptronTagger:
    """ Part-of-Speech tagger using the Perceptron algorithm for training
    and the Viterbi algorithm for determining the most likely tag sequence.
    """
    def __init__(self, max_suffix_len=5, viterbi_pick=5):
        """ Initialize the tagger.

        :param max_suffix_len:  Maximum suffix length to take into account
                                for training
        :type max_suffix_len:   int
        :param viterbi_pick:    Number of most likely predecessor tags to
                                take into account for tagging.
        :type viterbi_pick:     int
        """
        self._max_suffix_len = max_suffix_len
        self._viterbi_pick = viterbi_pick

    def train(self, train_data, test_data=None, max_iterations=1e5,
              test_every=2.5e4):
        """ Train the tagger.

        :param train_data:      Tagged sentences to use for training
        :type train_data:       list of lists of (word, tag) tuples
        :param test_data:       Tagged sentences to use for testing
        :type test_data:        list of lists of (word, tag) tuples
        :param max_iterations:  Number of training iterations
        :type max_iterations:   int
        :param test_every:      Run tests on the partially trained model
                                after the given number of iterations.
        :type test_every:       float
        """
        train_data = list(
            pad_data(train_data, margin=(1, 1),
                     paddings=(('<s>', '<s>'), ('</s>', '</s>'))))
        if test_data:
            test_data = list(
                pad_data(test_data, margin=(1, 1),
                         paddings=(('<s>', '<s>'), ('</s>', '</s>'))))
        self._all_tags = list(set(chain(*((tag for word, tag in sentence)
                                          for sentence in train_data))))
        self._weights = defaultdict(int)
        sample_iter = util.with_progress(random_infinite(train_data),
                                         max_iterations)
        for num_iter, sample in enumerate(sample_iter):
            if num_iter == max_iterations:
                break
            do_online_test = (test_data and test_every and num_iter and
                              not num_iter % test_every)
            if do_online_test:
                print("\nTesting accuracy on test set...", file=sys.stderr)
                print("Accuracy after {} iterations: {}".format(
                          num_iter,
                          util.color_score(self.score(test_data)[0],
                                           digits=2)),
                      file=sys.stderr)
            words, actual_tags = zip(*sample)
            predicted_tags = self.tag(words, True)
            # Compare as tuples: `tag` returns a list while `zip` yields
            # tuples, and a list never compares equal to a tuple.
            if tuple(predicted_tags) == actual_tags:
                continue
            # Structured-perceptron update: demote the features of the
            # predicted (wrong) tag sequence, promote those of the correct
            # one.
            feature_predicted = self._make_feature(
                tuple(zip(words, predicted_tags)))
            feature_actual = self._make_feature(tuple(sample))
            for feat in feature_predicted:
                self._weights[feat] -= 1
            for feat in feature_actual:
                self._weights[feat] += 1
        # Prune unnecessary weights (i.e. those with value 0)
        to_prune = [f for f, w in self._weights.items() if not w]
        for feat in to_prune:
            del self._weights[feat]
        print("\nPruned {} weights, {} remaining."
              .format(len(to_prune), len(self._weights)), file=sys.stderr)

    def tag(self, sentence, keep_padding=False):
        """ Tag a tokenized sentence.

        :param sentence:      Tokenized sentence
        :type sentence:       list of str
        :param keep_padding:  Don't strip start/end-of-sentence tags from
                              the output
        :type keep_padding:   bool
        :returns:             Tag sequence for the sentence
        :rtype:               list of str
        """
        if sentence[0] != '<s>':
            sentence = ['<s>'] + list(sentence)
        if sentence[-1] != '</s>':
            sentence = list(sentence) + ['</s>']
        viterbi_scores = [defaultdict(lambda: -float('inf'))
                          for _ in sentence]
        viterbi_scores[0]['<s>'] = 0
        viterbi_path = {tag: [tag] for tag in self._all_tags}
        for idx, word in enumerate(sentence[1:], 1):
            new_path = {}
            for tag in self._all_tags:
                all_scores = dict()
                # Only the `viterbi_pick` best-scoring predecessor tags are
                # considered, i.e. this is a beam-pruned variant of exact
                # Viterbi decoding.
                best_iter = sorted(viterbi_scores[idx-1].items(),
                                   key=operator.itemgetter(1), reverse=True)
                for cand_tag, prev_score in best_iter[:self._viterbi_pick]:
                    feature_vec = self._make_feature((word, tag, cand_tag))
                    w_sum = sum(self._weights.get(f, 0)
                                for f in feature_vec)
                    all_scores[cand_tag] = prev_score + w_sum
                best_tag, best_score = max(all_scores.items(),
                                           key=operator.itemgetter(1))
                viterbi_scores[idx][tag] = best_score
                new_path[tag] = viterbi_path[best_tag] + [tag]
            viterbi_path = new_path
        best_state = max(viterbi_scores[-1].items(),
                         key=operator.itemgetter(1))[0]
        if keep_padding:
            return viterbi_path[best_state]
        else:
            return viterbi_path[best_state][1:-1]

    @classmethod
    def from_dict(cls, data):
        """ Construct a new :py:class:`PerceptronTagger` instance from the
        data in the passed dictionary (which was e.g. deserialized from
        JSON).

        :param data:  Data to construct new instance from
        :type data:   dict as returned by :py:meth:`to_dict`
        :returns:     A new instance
        :rtype:       :py:class:`PerceptronTagger`
        """
        t = cls(data['max_suffix_len'], data['viterbi_pick'])
        t._all_tags = data['all_tags']
        t._weights = defaultdict(
            int, util.tupledict_from_json(data['weights']))
        return t

    def to_dict(self):
        """ Serialize the tagger to a single dictionary.

        :returns:  The tagger state as a dictionary
        :rtype:    dict
        """
        return {
            'max_suffix_len': self._max_suffix_len,
            'viterbi_pick': self._viterbi_pick,
            'all_tags': self._all_tags,
            'weights': util.make_dict_json_compatible(self._weights)}

    def score(self, test_data):
        """ Calculate performance on test data.

        :param test_data:  Tagged sentences to run the test on
        :type test_data:   list of lists of (word, tag) pairs
        :returns:          Overall accuracy and per-tag
                           accuracy/precision/recall/F1
        :rtype:            (float, {tag: :py:class:`util.EvaluationResult`})
        """
        is_padded = (test_data[0][0][0] == '<s>' and
                     test_data[0][-1][0] == '</s>')
        gold = []
        observed = []
        for sent in util.with_progress(test_data):
            words, tags = zip(*sent)
            gold.append(tags)
            observed.append(self.tag(words, is_padded))
        return util.score_classification(
            list(chain(*gold)), list(chain(*observed)))

    def _make_feature(self, data):
        """ Create a feature vector for the given data.

        Data can be:
        - a list of (word, tag) tuples (= a whole sentence), where the
          first entry is the ('<s>', '<s>') padding
        - a single (word, tag, prev_tag) tuple (= a single word)
        """
        if not isinstance(data[0], tuple) and len(data) == 3:
            word, tag, prev_tag = data
            tag_bigrams = [(prev_tag, tag)]
            words = [(tag, word.lower())]
            lexical = [(tag, capitalization(word),
                        make_suffix(word.lower(), n))
                       for n in range(1, self._max_suffix_len)]
        else:
            tag_bigrams = list(zip(*[(tag for word, tag in data[k:])
                                     for k in range(2)]))
            words = [(tag, word.lower()) for word, tag in data[1:]]
            lexical = list(chain(
                *(((tag, capitalization(word),
                    make_suffix(word.lower(), n))
                   for word, tag in data[1:])
                  for n in range(1, self._max_suffix_len))))
        return tuple(chain(tag_bigrams, words, lexical))
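
# A sketch of the feature vector for a single word with the default
# max_suffix_len=5 (the word/tag values are hypothetical):
#
#   _make_feature(('Haus', 'NN', 'ART')) yields roughly
#     (('ART', 'NN'),      # tag bigram
#      ('NN', 'haus'),     # tag/word pair
#      ('NN', 'g', 's'),   # tag, capitalization, 1-char suffix
#      ('NN', 'g', 'us'),  # ... and so on up to 4-char suffixes
#      ...)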


def train_cli(args):
    """ Command-line interface for training. """
    tagger = PerceptronTagger(args.max_suffix_length, args.viterbi_pick)
    all_data = list(load_data(args.corpus))
    if args.shuffle:
        random.shuffle(all_data)
    print("Training tagger on data from {}..."
          .format(args.corpus), file=sys.stderr)
    if args.no_test:
        train_data = all_data
        test_data = None
    else:
        train_data, test_data = util.split_dataset(
            all_data, (100-100*args.test_size, 100*args.test_size))
    tagger.train(train_data, test_data, args.num_iterations,
                 args.test_every or None)
    if args.output:
        output = args.output
        if not output.endswith(".gz"):
            output += ".gz"
        print("Writing gzip-compressed parameters to {}..."
              .format(output), file=sys.stderr)
        with gzip.open(output, 'wt') as fp:
            json.dump(tagger.to_dict(), fp, indent=2)
    else:
        print("Writing parameters to standard output...", file=sys.stderr)
        json.dump(tagger.to_dict(), sys.stdout, indent=2)
    if test_data:
        print("Running final test...", file=sys.stderr)
        accuracy, scores = tagger.score(test_data)
        print("Final accuracy: {:.2f}".format(accuracy), file=sys.stderr)
        if args.verbose:
            util.print_scores(scores)


def tag_cli(args):
    """ Command-line interface for tagging. """
    if args.params.endswith(".gz"):
        open_fn = gzip.open
    else:
        open_fn = open
    with open_fn(args.params, 'rt') as fp:
        tagger = PerceptronTagger.from_dict(json.load(fp))
    if args.sentence:
        sentences = [s.split() for s in args.sentence]
    else:
        lines = sys.stdin.readlines()
        if len(lines[0].split()) == 1:
            # One token per line, sentences separated by blank lines
            sentences = []
            cur_sent = []
            for line in lines:
                word = line.strip()
                if word:
                    cur_sent.append(word)
                else:
                    sentences.append(cur_sent)
                    cur_sent = []
            # Don't lose the last sentence if the input lacks a trailing
            # blank line
            if cur_sent:
                sentences.append(cur_sent)
        else:
            # All tokens in one line
            sentences = [l.split() for l in lines]
    all_tags = map(tagger.tag, sentences)
    for sent, tags in zip(sentences, all_tags):
        for word, tag in zip(sent, tags):
            print("{}\t{}".format(word, tag))
        print()


def get_argument_parser():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()
    train_parser = subparsers.add_parser("train", help="Train the tagger.")
    train_parser.add_argument(
        'corpus', help="Path to tagged corpus to train with")
    train_parser.add_argument(
        '-n', '--num-iterations', type=int, default=int(1e5),
        help="Number of iterations to train for. Should be at least as "
             "high as the total number of training samples available.")
    train_parser.add_argument(
        '-p', '--viterbi-pick', type=int, default=5, metavar='n',
        help="Take only the n highest-scoring predecessors into account "
             "when searching for the most likely tag sequence.")
    train_parser.add_argument(
        '-l', '--max-suffix-length', type=int, default=5,
        help="Maximum length of word suffixes to take into account")
    train_parser.add_argument(
        '-o', '--output', help="Path to output file, defaults to stdout")
    train_parser.add_argument(
        '-t', '--test-size', type=float, default=0.2,
        help="Fraction of the data to use for testing")
    train_parser.add_argument(
        '-s', '--shuffle', action='store_true',
        help="Shuffle dataset before splitting into training and test "
             "datasets.")
    train_parser.add_argument(
        '-v', '--verbose', action='store_true',
        help="Display detailed scores for each tag")
    train_parser.add_argument(
        '--test-every', type=int,
        help="Determine accuracy on the test set after every nth "
             "iteration. Useful for tracking progress during training")
    train_parser.add_argument(
        '--no-test', action='store_true',
        help="Use all of the data for training, don't test performance of "
             "the tagger on held-out data.")
    train_parser.set_defaults(func=train_cli)
    tag_parser = subparsers.add_parser("tag", help="Tag sentences")
    tag_parser.add_argument(
        '-p', '--params', type=str, required=True,
        help="Path to trained parameter file")
    tag_parser.add_argument(
        'sentence', nargs='*', type=str,
        help="Sentences to tag. Must be separated by newlines if passed "
             "via stdin.")
    tag_parser.set_defaults(func=tag_cli)
    return parser


if __name__ == '__main__':
    parser = get_argument_parser()
    args = parser.parse_args()
    if not hasattr(args, 'func'):
        # argparse does not require a subcommand by default, so print the
        # usage message instead of failing with an AttributeError
        parser.print_help(sys.stderr)
        sys.exit(1)
    args.func(args)
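
# Example invocations (a sketch; the script and file names are hypothetical,
# and note that train_cli appends ".gz" to the output path):
#
#   $ python tagger.py train tagged_corpus.tsv -o params.json
#   $ echo "Das Haus ist alt" | python tagger.py tag -p params.json.gz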


# util.py -- helper module imported above as `util`
import itertools as it
import math
import sys
import time
from collections import Counter, defaultdict, namedtuple

# Datatype that stores evaluation results
EvaluationResult = namedtuple("EvaluationResult", ["accuracy", "precision",
                                                   "recall", "f1"])


def make_dict_json_compatible(dct):
    """ Make sure that a given dictionary can be properly serialized to
    JSON. Basically this converts dicts with non-scalar keys to nested
    lists.
    """
    for k, v in dct.items():
        if isinstance(v, dict):
            dct[k] = make_dict_json_compatible(v)
    if any(isinstance(k, (tuple, list)) for k in dct.keys()):
        return list(dct.items())
    else:
        return dct


def tupledict_from_json(dct):
    """ Convert a sequence of (key, value) pairs, where the key is a list,
    to a dictionary with a tuple key.

    This is necessary because Python dict keys must be hashable, i.e. we
    need an immutable data structure instead of a list.

    :param dct:  The sequence to convert to a dictionary
    :type dct:   Sequence of (list, sometype) pairs
    :returns:    The sequence as a dictionary with tuple keys
    :rtype:      tuple -> sometype dict
    """
    return {tuple(key): value for key, value in dct}
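
# Round-trip sketch: make_dict_json_compatible({('a', 'b'): 1}) becomes
# [(('a', 'b'), 1)] (serialized by JSON as nested arrays), and
# tupledict_from_json([[['a', 'b'], 1]]) restores {('a', 'b'): 1}.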


def split_dataset(data, fractions):
    """ Split a dataset into a number of chunks.

    The chunk size is determined by the values in `fractions`, which is a
    tuple of integers that denotes the relative sizes of the chunks.
    E.g. fractions=(8, 2) will result in two chunks where the first one
    has 80% of the data and the second one the remaining 20%.

    :param data:       The dataset to be split
    :type data:        A sequence
    :param fractions:  Relative sizes of the chunks
    :type fractions:   tuple of int
    :returns:          tuple of chunks
    """
    total_size = len(data)
    part_size = total_size / sum(fractions)
    indices = []
    last_end = 0
    for frac in fractions:
        start = last_end
        last_end = start + math.ceil(frac*part_size)
        indices.append((start, last_end))
    return tuple(data[start:end] for start, end in indices)
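
# For example: split_dataset(list(range(10)), (8, 2)) returns
# ([0, 1, 2, 3, 4, 5, 6, 7], [8, 9]), i.e. an 80%/20% split.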


def score_classification(expected, observed):
    """ Calculate scores for the given classification result.

    :param expected:  Expected categories/'gold standard'
    :type expected:   sequence of discrete values
    :param observed:  Observed/predicted categories/classification result
    :type observed:   sequence of discrete values
    :returns:         Accuracy of the whole dataset and scores for each
                      individual category
    :rtype:           tuple of (float,
                      category -> :py:class:`EvaluationResult` dict)
    """
    category_counts = Counter(expected)
    true_pos = defaultdict(int)
    false_pos = defaultdict(int)
    for exp, obs in zip(expected, observed):
        if exp == obs:
            true_pos[exp] += 1
        else:
            false_pos[obs] += 1
    category_scores = {cat: calculate_scores(fq, true_pos[cat],
                                             false_pos[cat])
                       for cat, fq in category_counts.items()}
    accuracy = sum(true_pos.values()) / len(expected)
    return accuracy, category_scores
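
# A tiny worked example: with expected=['A', 'A', 'B'] and
# observed=['A', 'B', 'B'], overall accuracy is 2/3, 'A' gets one true
# positive, and 'B' gets one true positive plus one false positive.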


def calculate_scores(freq, true_pos, false_pos):
    """ Calculate accuracy, precision, recall and F1 for the given
    evaluation result.

    :param freq:       Total frequency of the category
    :type freq:        int
    :param true_pos:   Number of true positive categorizations
    :type true_pos:    int
    :param false_pos:  Number of false positive categorizations
    :type false_pos:   int
    :returns:          Accuracy, precision, recall and F1
    :rtype:            :py:class:`EvaluationResult`, a named tuple
    """
    false_neg = freq - true_pos
    # Note: since false_neg is defined as freq - true_pos, this per-category
    # accuracy coincides with the recall computed below.
    accuracy = true_pos/freq
    try:
        precision = true_pos/(true_pos + false_pos)
    except ZeroDivisionError:
        precision = 0
    recall = true_pos/(true_pos + false_neg)
    try:
        f1 = 2 * (precision * recall) / (precision + recall)
    except ZeroDivisionError:
        f1 = 0
    return EvaluationResult(accuracy, precision, recall, f1)
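
# Worked example: freq=10, true_pos=8, false_pos=2 gives false_neg=2, so
# accuracy = recall = 8/10 = 0.8, precision = 8/(8+2) = 0.8 and F1 = 0.8.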


def color_score(score, digits=3, good=0.95, bad=0.75):
    """ Format a score into a colored string.

    :param score:   The score to format
    :type score:    float
    :param digits:  The number of decimal places of the score to print
    :type digits:   int
    :param good:    Threshold above which a score is considered good
    :type good:     float
    :param bad:     Threshold below which a score is considered bad
    :type bad:      float
    :returns:       The formatted string
    :rtype:         str
    """
    # Only colorize if we're writing to a TTY
    if sys.stderr.isatty():
        reset_col = '\033[39m'
        if score >= good:
            color = '\033[32m'  # green
        elif score <= bad:
            color = '\033[31m'  # red
        else:
            color = reset_col   # no color
    else:
        color = reset_col = ""
    tmpl = "{}{:.%df}{}" % digits
    return tmpl.format(color, score, reset_col)


def print_scores(category_scores):
    """ Print a table of per-category scores.

    :param category_scores:  The scores to print
    :type category_scores:   category -> :py:class:`EvaluationResult` dict
    """
    row_tmpl = "{:>9} || {:^10}| {:^10}| {:^10}| {:^10}"
    header_tmpl = row_tmpl
    if sys.stderr.isatty():
        # ANSI color codes are counted during whitespace calculation, but
        # not printed, so we add 10 (= color + reset) to the columns with
        # color-coded values
        row_tmpl = row_tmpl.replace("10", "20")
    print("\nPer-Category Scores:", file=sys.stderr)
    print(header_tmpl.format("Category", "Accuracy", "Precision", "Recall",
                             "F1"), file=sys.stderr)
    print(header_tmpl.format(*it.repeat("=", 14))
          .replace(' ', '=').replace('|', '+'), file=sys.stderr)
    for cat, score in sorted(category_scores.items(), key=lambda x: x[0]):
        print(row_tmpl.format(cat, *map(color_score, score)),
              file=sys.stderr)


def with_progress(iterable, num_expected=None):
    """ Yield all items from `iterable` while printing a progress line with
    throughput and an ETA estimate to stderr.
    """
    if num_expected is None:
        num_expected = len(iterable)
    start = time.time()
    for idx, x in enumerate(iterable):
        yield x
        elapsed = time.time() - start
        # Guard against division by zero on the very first items
        if not elapsed:
            continue
        it_per_sec = idx/elapsed
        if not it_per_sec:
            continue
        eta_s = (num_expected-idx)//it_per_sec
        eta_m = eta_s//60
        eta_s = eta_s % 60
        eta_h = eta_m//60
        eta_m = eta_m % 60
        eta = "{:02.0f}s".format(eta_s)
        if eta_m:
            eta = "{:02.0f}m".format(eta_m) + eta
        if eta_h:
            eta = "{:.0f}h".format(eta_h) + eta
        print("\r{}/{} ({:.2f}%, {:.2f}/s) eta: {}   ".format(
                  idx, int(num_expected), (idx/num_expected)*100,
                  it_per_sec, eta),
              end='', file=sys.stderr)
    print("", file=sys.stderr)