#!/usr/bin/env python3
# Analyze conditional substructures of Korean Standard Language Dictionary XMLs
# Copyright (C) 2022 Hong Minhee <https://hongminhee.org/>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import dataclasses
import functools
import multiprocessing
import os
import pathlib
import sys
from typing import BinaryIO, Dict, Iterable, List, Set
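
# Prefer lxml's faster iterparse() when it is installed; the standard
# library's ElementTree provides a compatible fallback interface.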
try:
    from lxml import etree
except ImportError:
    from xml.etree import ElementTree as etree
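

# Tags whose text content discriminates which sibling tags can appear under
# the same parent element.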
DISCRIMINATORS = {'word_unit', 'word_type', 'pos', 'unit'}


@dataclasses.dataclass(init=True, repr=True)
class Element:
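    """A parse-stack entry: one open XML element together with the
    discriminator values and child tags collected from its children so far.
    """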
    tag: str
    discriminators: Dict[str, str] = dataclasses.field(default_factory=dict)
    collected_tags: Set[str] = dataclasses.field(default_factory=set)


def analyze(input_: BinaryIO) -> Dict[str, Dict[str, Set[str]]]:
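    """Stream-parse a dictionary XML and, for every discriminator, map each
    of its observed text values (cases) to the set of sibling tags that
    co-occurred with that case.
    """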
    result: Dict[str, Dict[str, Set[str]]] = {tag: {} for tag in DISCRIMINATORS}
    stack: List[Element] = []
    for event, element in etree.iterparse(input_, events=('start', 'end')):
        if event == 'start':
            stack.append(Element(element.tag))
            continue
        popped = stack.pop()
        assert popped.tag == element.tag, f'{stack} != {element.tag}'
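        # An end event: fold the closed child's tag (and, for discriminators,
        # its text value) into the parent, then record which sibling tags
        # have been seen alongside each discriminator case so far.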
        if stack:
            el = stack[-1]
            if element.tag in DISCRIMINATORS:
                el.discriminators[element.tag] = element.text
            el.collected_tags.add(element.tag)
            for discriminator, case in el.discriminators.items():
                result[discriminator] \
                    .setdefault(case, set()) \
                    .update(el.collected_tags - {discriminator})
    return result


def merge_analytics(
    analytics: Iterable[Dict[str, Dict[str, Set[str]]]]
) -> Dict[str, Dict[str, Set[str]]]:
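    """Union per-file analytics into a single mapping; used below to combine
    the results that parallel workers return.
    """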
    merged: Dict[str, Dict[str, Set[str]]] = {tag: {} for tag in DISCRIMINATORS}
    for analytic in analytics:
        for discriminator, cases in analytic.items():
            for case, tags in cases.items():
                merged[discriminator].setdefault(case, set()).update(tags)
    return merged


def print_analytics(
    analytics: Dict[str, Dict[str, Set[str]]],
    *, file=sys.stdout
) -> None:
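    """Print one block per discriminator: first the tags common to all of
    its cases, then each case's remaining tags.
    """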
    started = False
    for discriminator, cases in sorted(analytics.items(), key=lambda p: p[0]):
        if started:
            print(file=file)
        print(f'<{discriminator}>', file=file)
        common_tags: Set[str] = \
            functools.reduce(set.intersection, cases.values()) \
            if cases \
            else set()
        print(f' (Common): {", ".join(sorted(common_tags))}', file=file)
        for case, tags in cases.items():
            print(
                f' {case}: {", ".join(sorted(tags - common_tags))}',
                file=file
            )
        started = True


def analyze_file(filename: os.PathLike) -> Dict[str, Dict[str, Set[str]]]:
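    """Analyze a single file.  Defined at module level so that it can be
    pickled and dispatched to multiprocessing workers.
    """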
    with open(filename, 'rb') as f:
        return analyze(f)


def main():
    if len(sys.argv) < 2:
        print('error: too few arguments', file=sys.stderr)
        print('usage:', sys.argv[0], 'FILE...', file=sys.stderr)
        raise SystemExit(1)
    files = [pathlib.Path(p) for p in sys.argv[1:]]
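    # Analyze a single file in-process; spread multiple files across a
    # process pool (one worker per CPU core by default) and merge the
    # partial results.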
    if len(files) == 1:
        result = analyze_file(files[0])
    else:
        with multiprocessing.Pool() as pool:
            result = merge_analytics(pool.imap_unordered(analyze_file, files))
    print_analytics(result)


if __name__ == '__main__':
    main()