Created May 4, 2017 22:56
#!/usr/bin/env python3
"""Find orthography conflicts in a Plover dictionary: word + suffix
combinations whose result differs depending on whether the system
wordlist is used, and which are not already mapped in the dictionary."""

__requires__ = ['progressbar2']

import itertools
import multiprocessing
import re
import sys

from progressbar import ProgressBar
from progressbar.utils import get_terminal_size

from plover import orthography, system
from plover.config import DEFAULT_SYSTEM_NAME
from plover.dictionary.base import load_dictionary
from plover.registry import registry

registry.update()
system.setup(DEFAULT_SYSTEM_NAME)


def add_suffix(word, suffix, no_wordlist=False):
    if no_wordlist:
        wordlist = system.ORTHOGRAPHY_WORDS
        system.ORTHOGRAPHY_WORDS = {}
    try:
        return orthography.add_suffix(word.lower(), suffix)
    finally:
        if no_wordlist:
            system.ORTHOGRAPHY_WORDS = wordlist


class OrthographyConflictsFinder(object):

    class Worker(multiprocessing.Process):

        def __init__(self, queue, dictionary_name, words, suffixes):
            multiprocessing.Process.__init__(self)
            self.queue = queue
            self.dictionary_name = dictionary_name
            self.words = words
            self.suffixes = suffixes

        def _run(self):
            dictionary = load_dictionary(self.dictionary_name)
            dictionary_keys = set(dictionary._dict.keys())
            for w, wkl in self.words:
                results = []
                for s, skl in self.suffixes:
                    # Compare suffix application with and without the
                    # system wordlist.
                    t1 = add_suffix(w, s)
                    t2 = add_suffix(w, s, no_wordlist=True)
                    if t1 == t2:
                        # No conflict, next!
                        continue
                    # Only keep word+suffix combos that are
                    # not already mapped in the dictionary.
                    key_list = []
                    for wk, sk in itertools.product(wkl, skl):
                        key = wk + sk
                        if key not in dictionary_keys:
                            key_list.append(key)
                    if key_list:
                        results.append((w, s, t1, t2, key_list))
                self.queue.put(results)

        def run(self):
            try:
                self._run()
            except KeyboardInterrupt:
                pass

    def __init__(self, max_jobs):
        self.max_jobs = max_jobs
        self.queue = multiprocessing.Queue()

    WORD_RX = re.compile(r'^[A-Za-z]\w*$')
    SUFFIX_RX = re.compile(r'^{\^(\w[^}]*)}$')

    def analyse(self, dictionary_name):
        print('analysing %s' % dictionary_name)
        dictionary = load_dictionary(dictionary_name)
        words = {}
        suffixes = {}
        for t, kl in dictionary.reverse.items():
            m = self.WORD_RX.match(t)
            if m is not None:
                words[t] = kl
                continue
            m = self.SUFFIX_RX.match(t)
            if m is not None:
                suffixes[m.group(1)] = kl
                continue
        words = list(words.items())
        suffixes = list(suffixes.items())
        dictionary_keys = set(dictionary._dict.keys())
        ruler = '-' * get_terminal_size()[0]
        print(ruler)
        print('%u words, %u suffixes' % (len(words), len(suffixes)))
        print(ruler)
        # Split the words into one batch per worker, the last batch
        # absorbing any remainder.
        batch_size = len(words) // self.max_jobs
        batches = []
        for n in range(self.max_jobs):
            start = n * batch_size
            if n == self.max_jobs - 1:
                end = None
            else:
                end = start + batch_size
            b = words[start:end]
            batches.append(b)
        print('using %u worker(s) [%s]' % (
            self.max_jobs, '+'.join(str(len(b)) for b in batches))
        )
        print(ruler)
        try:
            workers = []
            for n in range(self.max_jobs):
                w = self.Worker(self.queue, dictionary_name, batches[n], suffixes)
                w.start()
                workers.append(w)
            conflicts = []
            # Each worker puts one result list per word on the queue.
            pbar = ProgressBar(max_value=len(words))
            pbar.start()
            while pbar.value < pbar.max_value:
                conflicts.extend(self.queue.get())
                pbar += 1
            pbar.finish()
        except KeyboardInterrupt:
            pass
        for w in workers:
            w.join()
        print(ruler)
        print('%u conflicts' % len(conflicts))
        print(ruler)
        print('word ^ suffix -> translation with/without wordlist [steno]')
        print(ruler)
        conflicts.sort()
        num_missing_entries = 0
        for w, s, t1, t2, key_list in conflicts:
            num_missing_entries += len(key_list)
            print('%s ^ %s -> %s / %s' % (w, s, t1, t2))
            line = ''
            for key in key_list:
                steno = '/'.join(key)
                if line and (len(line) + len(steno) + 2) > len(ruler):
                    print(line)
                    line = ''
                line += ' ' + steno
            if line:
                print(line)
        print(ruler)
        print('%u missing entries' % num_missing_entries)


if __name__ == '__main__':
    max_jobs = max(1, multiprocessing.cpu_count() // 2)
    args = sys.argv[1:]
    if args and args[0].startswith('-j'):
        max_jobs = int(args.pop(0)[2:])
        assert max_jobs
    if args:
        dictionaries = args
    else:
        dictionaries = ['asset:plover:assets/main.json']
    finder = OrthographyConflictsFinder(max_jobs)
    for dictionary_name in dictionaries:
        finder.analyse(dictionary_name)
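
Run from the command line, the script takes an optional -jN flag to set the number of worker processes (default: half the CPU count, minimum 1) followed by any number of dictionary names, defaulting to Plover's bundled asset:plover:assets/main.json. It can also be driven directly from Python; a minimal sketch, assuming the gist is saved as orthography_conflicts.py (a hypothetical module name) and that Plover and progressbar2 are installed in the same Python 3 environment:

# Usage sketch: the module name and the user dictionary path are
# illustrative, not part of the original gist.
from orthography_conflicts import OrthographyConflictsFinder

finder = OrthographyConflictsFinder(max_jobs=2)
# Plover's bundled dictionary (the script's default), then a user dictionary.
finder.analyse('asset:plover:assets/main.json')
finder.analyse('my_user_dictionary.json')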