supar-kanbun tokenizer
import os

import numpy
from spacy.symbols import DEP, HEAD, LEMMA, NORM, POS, TAG
from spacy.tokens import Doc
from supar import Parser

# AutoModelTagger, MakeGloss, simplify, tradify, and DOWNLOAD_DIR are provided
# elsewhere in the suparkanbun package; their imports are omitted from this excerpt.


class SuParKanbunTokenizer(object):
    # serialization is a no-op for this component
    to_disk = lambda self, *args, **kwargs: None
    from_disk = lambda self, *args, **kwargs: None
    to_bytes = lambda self, *args, **kwargs: None
    from_bytes = lambda self, *args, **kwargs: None

    def __init__(self, bert, segment, vocab) -> None:
        self.bert = bert
        self.vocab = vocab
        # character-simplification map, used only with the guwenbert models,
        # which expect simplified characters
        self.simplify = {}
        if bert.startswith("guwenbert"):
            self.simplify = simplify
        pos_labels_path = os.path.join(DOWNLOAD_DIR, "labelPOS.txt")
        tagger_model_path = os.path.join(DOWNLOAD_DIR, bert + ".pos")
        parser_model_path = os.path.join(tagger_model_path, bert + ".supar")
        senter_model_path = os.path.join(DOWNLOAD_DIR, bert + ".danku")
        with open(pos_labels_path, "r", encoding="utf-8") as file:
            pos_labels = file.read()
        # set up the POS tagger
        self.tagger = AutoModelTagger(tagger_model_path, pos_labels.strip().split("\n"))
        # set up the dependency parser (SuPar)
        self.parser = Parser.load(parser_model_path)
        # if requested, set up the sentence segmenter (senter)
        if segment:
            self.senter = AutoModelTagger(
                senter_model_path, ["B", "E", "E2", "E3", "M", "S"]
            )
        else:
            self.senter = None
        # set up glosses
        self.gloss = MakeGloss()

    def __call__(self, input) -> Doc:
        text = ""
        # make a first pass through the text, simplifying chars if needed
        for char in input:
            if char in self.simplify:
                text += self.simplify[char]
            else:
                text += char
        # segment the text into sentences if requested
        if self.senter:
            unseg_text = text.replace("\n", "")
            text = ""
            # work in chunks of 500 characters at a time
            while len(unseg_text) > 500:
                chunk = self.senter(unseg_text[0:500])
                seg_chunk = ""
                for char, tag in chunk:
                    seg_chunk += char
                    if tag == "S" or tag == "E":  # labels for a sentence boundary
                        seg_chunk += "\n"
                # add the chunk to the text, dropping its final two sentences:
                # they may be cut off at the 500-char boundary, so they stay in
                # unseg_text and are re-segmented with the next chunk
                seg_chunk = "\n".join(seg_chunk.split("\n")[0:-2]) + "\n"
                text += seg_chunk
                unseg_text = unseg_text[len(seg_chunk.replace("\n", "")) :]
            # final chunk: just add everything that's left
            chunk = self.senter(unseg_text)
            for char, tag in chunk:
                text += char
                if tag == "S" or tag == "E":
                    text += "\n"
        # tag parts of speech. if the text is <500 chars, do it in one call
        if len(text) < 500:
            pos = self.tagger(text.replace("\n", ""))
        # otherwise, do it in chunks
        else:
            pos = []
            untagged_text = ""
            # buffer the text into untagged_text one sentence at a time; dump
            # the buffer to the tagger whenever it grows past 400 chars
            for line in text.strip().split("\n"):
                untagged_text += line
                if len(untagged_text) > 400:
                    pos += self.tagger(untagged_text)
                    untagged_text = ""
            # final buffer: tag everything that's left
            if len(untagged_text) > 0:
                pos += self.tagger(untagged_text)
        # dependency parse char-by-char in each sentence using SuPar
        parsed_sents = self.parser.predict(
            [[char for char in sentence] for sentence in text.strip().split("\n")],
            lang=None,
        )
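        # note (my reading of the SuPar API, not stated in the source): predict()
        # returns a dataset whose sentences follow the CoNLL-X column layout, so
        # sentence.values[6] holds the 1-based head indices and
        # sentence.values[7] the dependency relation labels used below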
        # reformat the data as a list of sentences, adding features per word
        text = text.replace("\n", "")
        i = 0
        sentences = []
        for sentence in parsed_sents.sentences:
            words = []
            for head, deprel in zip(sentence.values[6], sentence.values[7]):
                char = text[i]
                trad_char = tradify[char] if char in tradify else char
                words.append(
                    {
                        "form": char,
                        "lemma": trad_char,
                        "pos": pos[i][1],
                        "head": head,
                        "deprel": deprel,
                    }
                )
                i += 1
            # walk backwards through the sentence to collapse compound words
            # into single tokens. if a word has the "compound" deprel, points
            # at the immediately following word as its head, and shares its
            # POS, merge it into that head and combine the form/lemma for the
            # new token.
            for j in reversed(range(0, len(words) - 1)):
                if (
                    words[j]["deprel"] == "compound"
                    and words[j]["head"] == j + 2
                    and words[j]["pos"] == words[j + 1]["pos"]
                ):
                    part = words.pop(j)
                    words[j]["form"] = part["form"] + words[j]["form"]
                    words[j]["lemma"] = part["lemma"] + words[j]["lemma"]
                    # re-index after the removal: any head that pointed past
                    # the merged position shifts down by one
                    for k in range(0, len(words)):
                        if words[k]["head"] > j + 1:
                            words[k]["head"] -= 1
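            # worked illustration (mine, not from the source): given 1-based
            # heads [2, 3, 0] and deprels ["compound", "obj", "root"], with
            # tokens 1 and 2 sharing a POS, token 1 merges into token 2; the
            # merged token's head then drops from 3 to 2 so it still points at
            # the root token, which now sits in position 2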
            # finally, add the sentence (wordlist) to the list of sentences
            sentences.append(list(words))
        # global containers for features
        vocab_strings = self.vocab.strings
        root = vocab_strings.add("ROOT")
        words = []
        lemmas = []
        pos = []  # UPOS
        tags = []  # XPOS
        feats = []
        heads = []
        deps = []
        spaces = []
        norms = []
        # go through token by token, filling the global containers. as values
        # are added to the containers, their unique strings are also added to
        # the global string store.
        for sentence in sentences:
            for i, token in enumerate(sentence):
                # store the surface form
                form = token["form"]
                words.append(form)
                # store the lemma
                lemmas.append(vocab_strings.add(token["lemma"]))
                # the POS prediction is a comma-separated label: four extended
                # part-of-speech fields, then the universal POS, then features.
                # split it up and store each piece independently
                pos_fields = token["pos"].split(",")
                xpos = ",".join(pos_fields[0:4])
                pos.append(vocab_strings.add(pos_fields[4]))
                tags.append(vocab_strings.add(xpos))
                feats.append(pos_fields[5])
                # if the token was a root, store a zero offset in "heads" and
                # label it ROOT. otherwise store its head as a relative offset
                # and keep whatever deprel it has
                if token["deprel"] == "root":
                    heads.append(0)
                    deps.append(root)
                else:
                    heads.append(token["head"] - i - 1)
                    deps.append(vocab_strings.add(token["deprel"]))
                # spaces is just a list of False values since we don't use them
                spaces.append(False)
                # fetch the gloss based on the extended POS value. if there is
                # a gloss, add it to the list of normalized forms; if not, just
                # use the surface form
                gloss = self.gloss(form, xpos)
                if gloss is not None:
                    norms.append(vocab_strings.add(gloss))
                else:
                    norms.append(vocab_strings.add(form))
        # initialize a new spaCy Doc object, then manually apply the features
        doc = Doc(self.vocab, words=words, spaces=spaces)
        features = numpy.array(
            list(zip(lemmas, pos, tags, deps, heads, norms)), dtype="uint64"
        )
        doc.from_array([LEMMA, POS, TAG, DEP, HEAD, NORM], features)
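        # note (my understanding of the spaCy API, not stated in the source):
        # from_array takes uint64 values, where string attributes are
        # StringStore hashes and HEAD is a relative offset, which is why hashes
        # and offsets were collected above instead of the raw strings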
        # mark the doc as tagged and parsed. doc.is_tagged / doc.is_parsed only
        # exist in spaCy v2; under v3 the assignment raises, and we instead set
        # the predicted morphological features on each token with set_morph
        try:
            doc.is_tagged = True
            doc.is_parsed = True
        except:
            for i, feat in enumerate(feats):
                if feat != "_" and feat != "":
                    doc[i].set_morph(feat)
        return doc
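
A minimal usage sketch follows (my own addition, not part of the gist). The model name, the sample text, and the use of a bare Vocab are assumptions for illustration; in practice the suparkanbun package exposes a load() helper that builds this tokenizer as part of a spaCy pipeline.

from spacy.vocab import Vocab

vocab = Vocab()  # assumed: a fresh Vocab is enough for standalone use
tokenizer = SuParKanbunTokenizer(
    bert="roberta-classical-chinese-base-char",  # assumed model name
    segment=True,  # enable the danku sentence segmenter
    vocab=vocab,
)
doc = tokenizer("子曰學而時習之不亦說乎")  # assumed sample text (Analects 1.1)
for token in doc:
    print(token.text, token.lemma_, token.pos_, token.tag_, token.dep_, token.head.i)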