|
from __future__ import annotations |
|
from typing import TYPE_CHECKING, Optional, List |
|
import os |
|
import re |
|
import io |
|
import zipfile |
|
import sys |
|
import ctypes |
|
import urllib.request |
|
|
|
# --------------------------------------------------------------------------- |
|
# Type checking block (editor IntelliSense only; not loaded at runtime). |
|
# --------------------------------------------------------------------------- |
|
if TYPE_CHECKING:
    # typing.TYPE_CHECKING is False at runtime, so nothing in this branch is
    # executed when the script actually runs; it only informs static tools.
    try:
        from host_stubs import HostAPI

        # Declares the name ``host`` with its stub type for the benefit of
        # editors / type checkers.
        host: HostAPI = ...
    except ImportError:
        # The stub package is optional: without it, type information for
        # ``host`` is simply unavailable.
        pass
|
|
|
# --------------------------------------------------------------------------- |
|
# Constants |
|
# --------------------------------------------------------------------------- |
|
# File name of the C/Migemo shared library inside the runtime folder.
DLL_NAME = "migemo.dll"
# Location of the UTF-8 dictionary, relative to the runtime folder.
DICT_REL = os.path.join("dict", "utf-8", "migemo-dict")

# Runtime files live in a migemo_runtime/ folder next to this script.
# The path is made absolute and normalized once, at import time.
_RUNTIME_DIR = os.path.normpath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "migemo_runtime")
)

# C/Migemo Windows distribution (migemo.dll + dict/utf-8/migemo-dict).
# The kaoriya "goto" link is a redirect to the current win64 zip; urllib follows it.
# This is a list so that additional mirrors can be appended.
DOWNLOAD_URLS = [
    "https://files.kaoriya.net/goto/cmigemo_w64",
]
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Match layer (pure logic, unit-tested) |
|
# --------------------------------------------------------------------------- |
|
def _build_pattern(engine, text: str) -> Optional["re.Pattern"]:
    """Compile the case-insensitive regex used to match a romaji query.

    The query is expanded through ``engine.query`` when an engine is given.
    Whenever that is not possible (no engine, the engine raises, it returns
    an empty result, or its output is not a valid regex) the query itself is
    escaped and used as a plain substring pattern.

    Args:
        engine: Object exposing ``query(text) -> str``, or None.
        text: The query typed by the user.

    Returns:
        A compiled pattern, or None when ``text`` is empty.
    """
    if not text:
        return None

    def literal_fallback():
        # Built lazily so the escape only happens when it is really needed.
        return re.compile(re.escape(text), re.IGNORECASE)

    if engine is None:
        return literal_fallback()

    try:
        expanded = engine.query(text)
    except Exception:  # noqa: BLE001 - any engine failure means "fall back"
        return literal_fallback()

    if not expanded:
        return literal_fallback()

    try:
        return re.compile(expanded, re.IGNORECASE)
    except re.error:
        # The engine produced something the re module cannot parse.
        return literal_fallback()
|
|
|
|
|
def match_items(items, engine, text: str) -> List:
    """Filter ``items`` down to those whose ``name`` matches the query.

    The relative order of ``items`` is preserved. An item without a ``name``
    attribute, or with a falsy one, is matched against the empty string.

    Args:
        items: Iterable of objects, normally carrying a ``name`` attribute.
        engine: Migemo engine passed through to ``_build_pattern`` (may be None).
        text: The query typed by the user.

    Returns:
        A new list of matching items; empty when ``text`` is empty.
    """
    pattern = _build_pattern(engine, text)
    matched = []
    if pattern is not None:
        for item in items:
            label = getattr(item, "name", "") or ""
            if pattern.search(label):
                matched.append(item)
    return matched
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Runtime extraction (pure logic, unit-tested) |
|
# --------------------------------------------------------------------------- |
|
def _find_by_basename(names, basename: str) -> Optional[str]:
    """Return the first entry of ``names`` whose final path component matches.

    Both ``/`` and ``\\`` are treated as separators and the comparison is
    case-insensitive. The entry is returned exactly as it appears in
    ``names``; None is returned when nothing matches.
    """
    wanted = basename.lower()
    return next(
        (
            entry
            for entry in names
            if entry.replace("\\", "/").rpartition("/")[2].lower() == wanted
        ),
        None,
    )
|
|
|
|
|
def _copy_zip_member(zf, member: str, target: str) -> None:
    """Write one zip member to ``target`` via a temp file renamed into place.

    ``target`` therefore either keeps its previous state or holds the
    complete member; it is never left truncated by an interrupted write.
    """
    partial = target + ".part"
    try:
        with zf.open(member) as src, open(partial, "wb") as dst:
            dst.write(src.read())
        os.replace(partial, target)
    except BaseException:
        # Do not leave the half-written temp file behind.
        try:
            os.remove(partial)
        except OSError:
            pass
        raise


def _extract_runtime(zip_bytes: bytes, dest_dir: str):
    """Extract migemo.dll and the whole UTF-8 dict dir from a cmigemo zip.

    The entire ``dict/utf-8/`` directory is extracted (not just migemo-dict),
    because migemo_open auto-loads the sibling conversion tables
    (roma2hira.dat, hira2kata.dat, ...) from the dict's directory. Without
    them, romaji-to-hiragana conversion is disabled.

    Every member is flattened to its base name: the DLL lands directly in
    ``dest_dir`` and the dictionary files in ``dest_dir/dict/utf-8``.

    Files are written through a temp file and in a deliberate order:
    conversion tables first, then migemo-dict, then the DLL. ``ensure_engine``
    decides whether setup is needed by checking that the DLL and migemo-dict
    exist, so those two must only appear once everything else is complete;
    otherwise an interrupted extraction would be mistaken for a finished one.

    Args:
        zip_bytes: Raw content of the downloaded zip archive.
        dest_dir: Runtime directory to populate (created if missing).

    Returns:
        (dll_path, dict_path).

    Raises:
        RuntimeError: if the dll or the migemo-dict are absent from the zip.
        zipfile.BadZipFile: if ``zip_bytes`` is not a zip archive.
        OSError: if a file cannot be written.
    """
    os.makedirs(dest_dir, exist_ok=True)
    dll_path = os.path.join(dest_dir, DLL_NAME)
    utf8_dir = os.path.join(dest_dir, "dict", "utf-8")
    dict_path = os.path.join(dest_dir, DICT_REL)

    def base_of(member: str) -> str:
        return member.replace("\\", "/").split("/")[-1]

    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        names = zf.namelist()
        dll_member = _find_by_basename(names, DLL_NAME)
        # Every file (not directory entry) located under some dict/utf-8/.
        utf8_members = [
            n for n in names
            if "/dict/utf-8/" in ("/" + n.replace("\\", "/").lower())
            and not n.replace("\\", "/").endswith("/")
        ]
        has_dict = any(
            n.replace("\\", "/").lower().endswith("utf-8/migemo-dict")
            for n in utf8_members
        )
        if not dll_member or not has_dict:
            raise RuntimeError(
                "zip does not contain expected migemo.dll / utf-8 migemo-dict"
            )
        os.makedirs(utf8_dir, exist_ok=True)

        # Stable split, so members sharing a base name keep their zip order
        # and the same one "wins" as before.
        tables = [m for m in utf8_members if base_of(m).lower() != "migemo-dict"]
        dicts = [m for m in utf8_members if base_of(m).lower() == "migemo-dict"]
        for member in tables + dicts:
            _copy_zip_member(zf, member, os.path.join(utf8_dir, base_of(member)))
        # The DLL goes last: its presence marks a complete runtime.
        _copy_zip_member(zf, dll_member, dll_path)
    return dll_path, dict_path
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# Engine layer (ctypes wrapper + auto setup) — verified on a live host |
|
# --------------------------------------------------------------------------- |
|
class MigemoEngine:
    """Thin ctypes wrapper over C/Migemo (migemo.dll), loaded by absolute path.

    An instance owns a native migemo handle and, where the platform provides
    ``os.add_dll_directory``, a DLL search-directory registration. Both are
    released by :meth:`close`; the instance can also be used as a context
    manager.
    """

    def __init__(self, dll_path: str, dict_path: str):
        """Load the DLL, declare its C prototypes and open the dictionary.

        Args:
            dll_path: Path of migemo.dll.
            dict_path: Path of the migemo-dict file.

        Raises:
            RuntimeError: if ``migemo_open`` returns NULL.
            Exception: whatever ``ctypes.CDLL`` or symbol lookup raises when
                the library cannot be loaded is propagated unchanged.
        """
        # Defined up front so close() works on a partially built instance.
        self._handle = None
        self._lib = None
        self._dll_dir = None

        runtime_dir = os.path.dirname(os.path.abspath(dll_path))
        # Ensure dependent DLL resolution works without relying on PATH.
        if hasattr(os, "add_dll_directory"):
            self._dll_dir = os.add_dll_directory(runtime_dir)
        try:
            lib = ctypes.CDLL(dll_path)
            lib.migemo_open.restype = ctypes.c_void_p
            lib.migemo_open.argtypes = [ctypes.c_char_p]
            # POINTER(c_char) rather than c_char_p: the raw address must
            # survive so it can be handed back to migemo_release.
            lib.migemo_query.restype = ctypes.POINTER(ctypes.c_char)
            lib.migemo_query.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
            lib.migemo_release.restype = None
            lib.migemo_release.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
            lib.migemo_close.restype = None
            lib.migemo_close.argtypes = [ctypes.c_void_p]
            self._lib = lib

            self._handle = self._open_dictionary(dict_path)
            if not self._handle:
                raise RuntimeError("migemo_open returned NULL (dictionary load failed)")
        except BaseException:
            # Do not leak the search-directory registration on failure.
            self._release_dll_dir()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def _open_dictionary(self, dict_path: str):
        """Call migemo_open; on Windows retry once with an ANSI-encoded path.

        The UTF-8 encoded path is always tried first. The retry only happens
        when that attempt already failed, so it cannot change the outcome of
        a call that used to succeed.
        """
        utf8_path = dict_path.encode("utf-8")
        handle = self._lib.migemo_open(utf8_path)
        if handle or sys.platform != "win32":
            return handle
        # NOTE(review): presumably migemo.dll opens the dictionary with a
        # narrow-char C runtime call that expects the ANSI code page, which
        # would make UTF-8 bytes fail for non-ASCII folders - confirm against
        # the C/Migemo sources.
        try:
            ansi_path = dict_path.encode("mbcs")
        except (UnicodeEncodeError, LookupError):
            return handle
        if ansi_path == utf8_path:
            return handle  # Pure-ASCII path: a retry would be identical.
        return self._lib.migemo_open(ansi_path)

    def _release_dll_dir(self) -> None:
        """Undo the os.add_dll_directory registration, if there is one."""
        dll_dir = getattr(self, "_dll_dir", None)
        self._dll_dir = None
        if dll_dir is not None:
            dll_dir.close()

    def query(self, text: str) -> str:
        """Return the regex string migemo generates for ``text``.

        Returns "" when the library returns NULL or an empty string.

        Raises:
            RuntimeError: if the engine has been closed. Passing a NULL
                handle into the native library must never happen.
        """
        if not getattr(self, "_handle", None):
            raise RuntimeError("MigemoEngine is closed")
        ptr = self._lib.migemo_query(self._handle, text.encode("utf-8"))
        if not ptr:
            return ""
        try:
            value = ctypes.cast(ptr, ctypes.c_char_p).value
            return value.decode("utf-8", "replace") if value else ""
        finally:
            # The buffer belongs to the library; always hand it back.
            self._lib.migemo_release(self._handle, ctypes.cast(ptr, ctypes.c_void_p))

    def close(self) -> None:
        """Release the native handle and the DLL directory. Idempotent."""
        handle = getattr(self, "_handle", None)
        try:
            if handle:
                self._lib.migemo_close(handle)
        finally:
            # Cleared even if migemo_close raises, to rule out a double close.
            self._handle = None
            self._release_dll_dir()
|
|
|
|
|
# Process-wide engine instance reused across calls; it stays None until
# ensure_engine() has loaded the engine successfully.
_engine_cache = None
|
|
|
|
|
def _download(urls, log) -> bytes:
    """Return the body of the first URL in ``urls`` that downloads successfully.

    URLs are tried in order; each attempt and each failure is reported
    through ``log``. Every request uses a 60 second timeout.

    Args:
        urls: Iterable of URL strings (mirrors), most preferred first.
        log: Callable accepting a single message string.

    Returns:
        The raw response body.

    Raises:
        RuntimeError: if every URL fails, or if ``urls`` is empty. When a
            download was attempted, the last underlying error is attached as
            ``__cause__`` so its traceback is not lost.
    """
    last_error = None
    for url in urls:
        try:
            log(f"migemo: downloading {url}")
            with urllib.request.urlopen(url, timeout=60) as resp:
                return resp.read()
        except Exception as e:  # noqa: BLE001 - report and try next mirror
            last_error = e
            log(f"migemo: download failed for {url}: {e}")
    if last_error is None:
        # Nothing was attempted; say so instead of reporting "None".
        raise RuntimeError("all downloads failed: no download URLs were provided")
    raise RuntimeError(f"all downloads failed: {last_error}") from last_error
|
|
|
|
|
def ensure_engine(host):
    """Return the shared migemo engine, installing the runtime on first use.

    A previously created engine is returned straight from the module-level
    cache. Otherwise the runtime folder is checked for the DLL and the
    dictionary; if either is missing, the archive is downloaded and
    extracted. Any failure is logged at ERROR level, shown to the user in a
    dialog, and reported to the caller as None.

    Args:
        host: Host API object; ``host.log``, ``host.LogLevel`` and
            ``host.ui.ok_dialog`` are used here.

    Returns:
        The engine instance, or None when setup or loading failed.
    """
    global _engine_cache

    cached = _engine_cache
    if cached is not None:
        return cached

    dll_path = os.path.join(_RUNTIME_DIR, DLL_NAME)
    dict_path = os.path.join(_RUNTIME_DIR, DICT_REL)

    runtime_present = os.path.exists(dll_path) and os.path.exists(dict_path)
    if not runtime_present:
        # First run (or files removed): fetch and unpack the runtime.
        try:
            archive = _download(DOWNLOAD_URLS, host.log)
            _extract_runtime(archive, _RUNTIME_DIR)
        except Exception as exc:  # noqa: BLE001
            host.log(f"migemo setup failed: {exc}", host.LogLevel.ERROR)
            manual_steps = (
                "migemo.dll と辞書を自動取得できませんでした。\n"
                "次のフォルダに手動で配置してください:\n"
                f"{_RUNTIME_DIR}\n"
                f"- {DLL_NAME}\n"
                f"- {DICT_REL}\n\n"
                "入手元: https://www.kaoriya.net/software/cmigemo/"
            )
            host.ui.ok_dialog("Migemo Setup Failed", manual_steps)
            return None

    try:
        loaded = MigemoEngine(dll_path, dict_path)
    except Exception as exc:  # noqa: BLE001
        host.log(f"migemo load failed: {exc}", host.LogLevel.ERROR)
        host.ui.ok_dialog("Migemo Load Failed", f"migemo.dll の読み込みに失敗しました:\n{exc}")
        return None

    # Only a successfully constructed engine is cached.
    _engine_cache = loaded
    return loaded
|
|
|
|
|
# --------------------------------------------------------------------------- |
|
# UI layer (live-cursor incremental search) — verified on a live host |
|
# --------------------------------------------------------------------------- |
|
def migemo_isearch(host):
    """Entry point: incremental migemo search over the current folder.

    Type romaji; the pane cursor follows the first match live. Prev/Next cycle
    candidates. Close dismisses the dialog, leaving the cursor on the last match.

    Args:
        host: Host API object. This function uses ``host.folder_window``
            (``get_items``, ``set_cursor``), ``host.ui`` (dialogs), ``host.log``
            and ``host.LogLevel``.

    Returns:
        None. Failures are logged and/or shown in a dialog; none are raised
        from the event handlers.
    """
    engine = ensure_engine(host)
    if engine is None:
        return  # ensure_engine already reported the failure.

    # NOTE(review): limit=0 presumably means "no limit" - confirm with the host API.
    result = host.folder_window.get_items(limit=0)
    items = list(result.items) if result else []
    # Drop the parent-directory entry and anything without a usable name.
    # getattr is applied first so an item lacking ``name`` is skipped instead
    # of raising AttributeError.
    items = [it for it in items if getattr(it, "name", None) not in (None, "..")]
    if not items:
        host.ui.ok_dialog("Migemo Search", "現在のフォルダに項目がありません。")
        return

    # Mutable search state shared by the handlers below.
    state = {"matches": [], "index": 0}

    dlg = host.ui.dialog("Migemo Search")
    tb = dlg.text("ローマ字:", "", initial_focus=True)
    # Per the original author's note, the dialog API exposes no raw key
    # events, so per-button keyboard shortcuts are not available; candidates
    # are cycled with the Prev/Next buttons and the dialog is dismissed with
    # Close.
    buttons = dlg.group(host.ui.LayoutDirection.HORIZONTAL)
    prev_btn = buttons.button("Prev")
    next_btn = buttons.button("Next")
    close_btn = buttons.button("Close")

    def set_nav_enabled(enabled):
        # Runs only while the dialog is shown (from text_changed handler).
        try:
            prev_btn.enabled = enabled
            next_btn.enabled = enabled
        except Exception as e:  # noqa: BLE001
            host.log(f"migemo: enable toggle failed: {e}")

    def goto(idx):
        # Move the pane cursor to match number ``idx``, wrapping at both ends.
        if not state["matches"]:
            return
        state["index"] = idx % len(state["matches"])
        try:
            host.folder_window.set_cursor(state["matches"][state["index"]].path)
        except Exception as e:  # noqa: BLE001
            host.log(f"migemo: set_cursor failed: {e}")

    def on_changed(sender, value):
        # Recompute the matches for the new text and jump to the first one.
        try:
            state["matches"] = match_items(items, engine, value)
            state["index"] = 0
            if state["matches"]:
                goto(0)
                set_nav_enabled(True)
            else:
                set_nav_enabled(False)
        except Exception as e:  # noqa: BLE001
            host.log(f"migemo: search error: {e}", host.LogLevel.ERROR)

    tb.text_changed += on_changed
    prev_btn.clicked += lambda s, e: goto(state["index"] - 1)
    next_btn.clicked += lambda s, e: goto(state["index"] + 1)
    close_btn.clicked += lambda s, e: dlg.close(host.ui.DialogResult.OK)

    dlg.show_modal()