Created
May 19, 2026 08:01
-
-
Save me-suzy/09d3c6d58824cb6ac037ac735559e000 to your computer and use it in GitHub Desktop.
google_translate_docs_chrome.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
r"""
Google Translate Docs automation - Chrome version.

Workflow:
- recursively scan g:\ARHIVA\C for .doc/.docx files
- convert .doc to .docx with Microsoft Word
- split documents larger than 5 MB into parts _partea1, _partea2, ...
- upload each part to Google Translate Docs, one at a time
- wait 60 seconds after translation, then download the translation
- merge the translated parts and export the final result as PDF *_FINALIZAT.pdf

Does not modify the original files in g:\ARHIVA\C.

NOTE(review): move_completed_source_documents() relocates finished source
files into COMPLETED_SOURCE_DIR, so "does not modify" appears to refer to the
file contents only - confirm.
"""
| import argparse | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import time | |
| import unicodedata | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Iterable | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options as ChromeOptions | |
| from selenium.webdriver.chrome.service import Service as ChromeService | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.common.exceptions import TimeoutException, WebDriverException | |
# --- Locations -------------------------------------------------------------
# Folder containing this script; the working folders below are created next to it.
PROJECT_DIR = Path(__file__).resolve().parent
# Root of the source archive that is scanned for .doc/.docx files (env-overridable).
ARCHIVE_PATH = Path(os.environ.get("SIMPLU_GT_ARCHIVE_PATH", r"g:\ARHIVA\C"))
# Google Translate page; the query string presumably selects auto-detected
# source language, Romanian target and the "documents" tab - confirm.
TRANSLATE_URL = "https://translate.google.ro/?hl=ro&sl=auto&tl=ro&op=docs"
WORK_DIR = PROJECT_DIR / "work"
# Split parts are written to PARTS_DIR/<doc_id>/.
PARTS_DIR = WORK_DIR / "parts"
# .doc -> .docx conversions (and copies of oversized .docx) go to CONVERTED_DIR/<doc_id>/.
CONVERTED_DIR = WORK_DIR / "converted_doc"
DOWNLOADS_DIR = PROJECT_DIR / "downloads"
# Final *_FINALIZAT.pdf files are exported here.
FINAL_DIR = PROJECT_DIR / "final_pdf"
LOG_DIR = PROJECT_DIR / "logs"
POWERSHELL_DIR = PROJECT_DIR / "PowerShell"
START_CHROME_PS1 = POWERSHELL_DIR / "Start-ChromeDebug.ps1"
# JSON file holding per-document progress (see load_state / save_state).
STATE_FILE = PROJECT_DIR / "state_google_translate_chrome.json"
# JSON registry of finished documents keyed by normalized title.
COMPLETED_REGISTRY_FILE = PROJECT_DIR / "completed_google_translate_docs.json"
# The user's own Downloads folder is also searched for already-downloaded translations.
WINDOWS_DOWNLOADS_DIR = Path.home() / "Downloads"
# Source files whose final PDF already exists are moved here; scans skip this folder.
COMPLETED_SOURCE_DIR = Path(
    os.environ.get("SIMPLU_GT_COMPLETED_SOURCE_DIR", str(ARCHIVE_PATH / "GATA FINALIZAT"))
)
CONVERTED_PDF_DIR = Path(os.environ.get("SIMPLU_GT_CONVERTED_PDF_DIR", r"d:\ENGLEZA\PDF-uri convertite"))

# --- Tunables (all overridable through environment variables) ---------------
# Size threshold in bytes above which a document/part must be split further.
MAX_UPLOAD_BYTES = int(os.environ.get("SIMPLU_GT_MAX_BYTES", "5000000"))
# Default is 50 KiB; where this is enforced is not visible in this section of the file.
MIN_SOURCE_BYTES = int(os.environ.get("SIMPLU_GT_MIN_SOURCE_BYTES", str(50 * 1024)))
# Page-count threshold per part, applied together with MAX_UPLOAD_BYTES.
MAX_PAGES_PER_PART = int(os.environ.get("SIMPLU_GT_MAX_PAGES_PER_PART", "400"))
TRANSLATE_WAIT_SEC = int(os.environ.get("SIMPLU_GT_TRANSLATE_WAIT_SEC", "60"))
DOWNLOAD_WAIT_SEC = int(os.environ.get("SIMPLU_GT_DOWNLOAD_WAIT_SEC", "420"))
BETWEEN_PARTS_SEC = int(os.environ.get("SIMPLU_GT_BETWEEN_PARTS_SEC", "60"))
# Upper bound on the number of parts; exceeding it raises DocumentSplitError.
MAX_SPLIT_PARTS = int(os.environ.get("SIMPLU_GT_MAX_SPLIT_PARTS", "120"))
TRANSLATE_ERROR_RETRIES = int(os.environ.get("SIMPLU_GT_TRANSLATE_ERROR_RETRIES", "2"))
DOWNLOAD_ERROR_RETRIES = int(os.environ.get("SIMPLU_GT_DOWNLOAD_ERROR_RETRIES", "2"))
# When "1", the merged intermediate DOCX is kept after the final PDF export.
KEEP_INTERMEDIATE = os.environ.get("SIMPLU_GT_KEEP_INTERMEDIATE", "0") == "1"
# File extensions accepted as a downloaded translation.
TRANSLATED_DOC_EXTENSIONS = {".doc", ".docx", ".pdf"}

# --- Chrome ------------------------------------------------------------------
CHROME_PATH = os.environ.get(
    "SIMPLU_CHROME_PATH",
    r"C:\Program Files\Google\Chrome\Application\chrome.exe",
)
# NOTE(review): the default points into one specific user's profile; on any
# other machine SIMPLU_CHROME_PROFILE_DIR presumably has to be set - confirm.
CHROME_PROFILE_DIR = os.environ.get(
    "SIMPLU_CHROME_PROFILE_DIR",
    r"C:\Users\necul\AppData\Local\Google\Chrome\User Data\Default",
)
DEBUG_PORT = int(os.environ.get("SIMPLU_CHROME_DEBUG_PORT", "9222"))

# --- Microsoft Word COM constants ---------------------------------------------
# Numeric values passed to the Word object model; the names suggest the
# corresponding Wd* enumeration members - verify against the Word VBA reference.
WORD_FORMAT_DOCX = 16  # FileFormat argument of SaveAs2
WORD_EXPORT_PDF = 17  # ExportFormat argument of ExportAsFixedFormat
WD_STATISTIC_PAGES = 2  # argument of ComputeStatistics (page count)
WD_ACTIVE_END_PAGE_NUMBER = 3  # argument of Range.Information (page of a position)
WD_GOTO_PAGE = 1
WD_GOTO_ABSOLUTE = 1
WD_PAGE_BREAK = 7  # argument of Range.InsertBreak
# Stored in the split progress file; changing it invalidates previously saved splits.
PAGE_SPLIT_METHOD_VERSION = 3
def setup_logging() -> logging.Logger:
    """Configure logging to a timestamped file plus stdout.

    Returns:
        The ``gt_docs`` logger, after logging the path of the new log file.
    """
    # Best effort only: a stream without ``reconfigure`` (or one that rejects
    # it) must not prevent the script from starting.
    try:
        for stream in (sys.stdout, sys.stderr):
            stream.reconfigure(encoding="utf-8", errors="replace")
    except Exception:
        pass

    LOG_DIR.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_path = LOG_DIR / f"google_translate_chrome_{stamp}.log"

    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    console_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[file_handler, console_handler],
    )

    gt_logger = logging.getLogger("gt_docs")
    gt_logger.info("Log: %s", log_path)
    return gt_logger
| logger = setup_logging() | |
def ensure_dirs() -> None:
    """Create every working directory used by the script if it is missing."""
    required = (WORK_DIR, PARTS_DIR, CONVERTED_DIR, DOWNLOADS_DIR, FINAL_DIR, LOG_DIR)
    for folder in required:
        folder.mkdir(parents=True, exist_ok=True)
def file_mb(path: Path) -> float:
    """Return the size of *path* in MiB (bytes divided by 1024 * 1024)."""
    size_bytes = path.stat().st_size
    return size_bytes / (1024 * 1024)
def safe_name(name: str) -> str:
    """Turn *name* into a string usable as a file name.

    Characters ``< > : " / \\ | ? *`` and control characters become ``_``,
    whitespace runs collapse to one space, the result is trimmed and cut to
    180 characters. An empty result is replaced by ``"document"``.
    """
    without_forbidden = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", name)
    collapsed = re.sub(r"\s+", " ", without_forbidden).strip()
    truncated = collapsed[:180]
    if truncated:
        return truncated
    return "document"
def normalize_for_match(text: str) -> str:
    """Return *text* lower-cased with diacritics removed.

    The text is NFKD-decomposed and every combining character is dropped.
    A falsy *text* (``None`` or ``""``) yields ``""``.
    """
    decomposed = unicodedata.normalize("NFKD", text or "")
    kept = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(kept).lower()
def title_match_key(text: str) -> str:
    """Return a comparison key for titles.

    The text is normalized with ``normalize_for_match`` and every run of
    characters outside ``a-z0-9`` is replaced by a single space; the result
    is stripped.
    """
    folded = normalize_for_match(text)
    spaced = re.sub(r"[^a-z0-9]+", " ", folded)
    return spaced.strip()
def final_pdf_source_stem(pdf: Path) -> str:
    """Return the stem of *pdf* without a trailing ``_FINALIZAT`` (any case)."""
    suffix_pattern = re.compile(r"_FINALIZAT$", flags=re.IGNORECASE)
    return suffix_pattern.sub("", pdf.stem)
def source_is_in_completed_dir(path: Path) -> bool:
    """Tell whether *path* is COMPLETED_SOURCE_DIR itself or lies below it.

    Both paths are resolved first; an ``OSError`` while resolving yields
    ``False``.
    """
    try:
        completed_root = COMPLETED_SOURCE_DIR.resolve()
        target = path.resolve()
    except OSError:
        return False
    if target == completed_root:
        return True
    return completed_root in target.parents
def unique_destination(directory: Path, name: str) -> Path:
    """Return a path inside *directory* that does not exist yet.

    ``directory / name`` is returned when free; otherwise ``"stem (N).ext"``
    with the smallest free N starting at 1.
    """
    candidate = directory / name
    stem, suffix = Path(name).stem, Path(name).suffix
    counter = 0
    while candidate.exists():
        counter += 1
        candidate = directory / f"{stem} ({counter}){suffix}"
    return candidate
def doc_id(path: Path) -> str:
    """Return a 12-hex-digit identifier derived from the resolved path.

    The resolved path is lower-cased before hashing (SHA-1), so the id does
    not depend on the letter case used to spell the path.
    """
    canonical = str(path.resolve()).lower()
    digest = hashlib.sha1(canonical.encode("utf-8", errors="ignore"))
    return digest.hexdigest()[:12]
def alphabetical_key(path: Path) -> str:
    """Return the sort key for *path*, built from its last path component.

    The name is lower-cased, every character that is not an ASCII letter,
    digit or whitespace becomes a space, and whitespace runs are collapsed.
    """
    lowered = path.name.lower()
    only_alnum = re.sub(r"[^a-zA-Z0-9\s]", " ", lowered)
    single_spaced = re.sub(r"\s+", " ", only_alnum)
    return single_spaced.strip()
def scan_documents(root: Path) -> list[Path]:
    """Collect the .doc/.docx files below *root*, sorted for processing.

    Word lock files (``~$...``) and anything inside COMPLETED_SOURCE_DIR are
    ignored. The result is ordered by the alphabetical key of the parent
    folder, then of the file itself.

    Raises:
        FileNotFoundError: if *root* does not exist.
    """
    if not root.exists():
        raise FileNotFoundError(f"Directorul sursa nu exista: {root}")

    wanted_suffixes = {".doc", ".docx"}
    found: list[Path] = []
    for candidate in root.rglob("*"):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() not in wanted_suffixes:
            continue
        if candidate.name.startswith("~$"):
            continue
        if source_is_in_completed_dir(candidate):
            continue
        found.append(candidate)

    return sorted(found, key=lambda p: (alphabetical_key(p.parent), alphabetical_key(p)))
def load_state() -> dict:
    """Load the progress state from STATE_FILE.

    Returns:
        The stored state, guaranteed to be a dict with a dict under
        ``"documents"``. A fresh empty state is returned when the file is
        missing, unreadable, or does not have that shape.
    """
    if STATE_FILE.exists():
        try:
            data = json.loads(STATE_FILE.read_text(encoding="utf-8"))
        except Exception as exc:
            logger.warning("Nu pot citi state, pornesc unul nou: %s", exc)
        else:
            # Valid JSON is not enough: callers index state["documents"][key]
            # directly, so reject anything that is not shaped like a state.
            if isinstance(data, dict) and isinstance(data.get("documents", {}), dict):
                data.setdefault("documents", {})
                return data
            logger.warning(
                "Nu pot citi state, pornesc unul nou: %s",
                "structura JSON neasteptata",
            )
    return {"documents": {}, "updated_at": ""}
def save_state(state: dict) -> None:
    """Stamp *state* with the current time and persist it to STATE_FILE.

    The JSON is written to a sibling ``.tmp`` file and then swapped in with
    ``os.replace``, so an interruption during the write cannot leave a
    truncated state file behind (which would make the next run start over).
    """
    state["updated_at"] = datetime.now().isoformat(timespec="seconds")
    payload = json.dumps(state, indent=2, ensure_ascii=False)
    tmp_path = STATE_FILE.with_name(STATE_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    os.replace(tmp_path, STATE_FILE)
def load_completed_registry() -> dict:
    """Load the registry of finished documents from COMPLETED_REGISTRY_FILE.

    Returns:
        The stored dict (with ``"documents"`` added if absent), or a fresh
        empty registry when the file is missing, unreadable or not a dict.
    """
    if COMPLETED_REGISTRY_FILE.exists():
        try:
            raw_text = COMPLETED_REGISTRY_FILE.read_text(encoding="utf-8")
            loaded = json.loads(raw_text)
            if isinstance(loaded, dict):
                loaded.setdefault("documents", {})
                return loaded
        except Exception as exc:
            logger.warning("Nu pot citi registrul de finalizate, il reconstruiesc: %s", exc)
    return {"documents": {}, "updated_at": ""}
def save_completed_registry(registry: dict) -> None:
    """Stamp *registry* with the current time and persist it.

    The JSON is written to a sibling ``.tmp`` file and then swapped in with
    ``os.replace`` so a crash during the write cannot corrupt the existing
    registry file.
    """
    registry["updated_at"] = datetime.now().isoformat(timespec="seconds")
    payload = json.dumps(registry, indent=2, ensure_ascii=False)
    tmp_path = COMPLETED_REGISTRY_FILE.with_name(COMPLETED_REGISTRY_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    os.replace(tmp_path, COMPLETED_REGISTRY_FILE)
def registry_key_for_name(name: str) -> str:
    """Return the registry key for a file name or path given as text.

    Only the stem is used; it goes through ``safe_name`` and then
    ``title_match_key``.
    """
    stem = Path(name).stem
    return title_match_key(safe_name(stem))
def update_completed_registry_from_entry(entry: dict) -> None:
    """Record one state entry in the completed-documents registry.

    The registry key is derived from ``entry["original"]`` (falling back to
    ``entry["source_moved_to"]``). Nothing happens when neither is set or the
    derived key is empty. An existing record under the same key is replaced.
    """
    original_text = entry.get("original") or entry.get("source_moved_to") or ""
    if not original_text:
        return
    key = registry_key_for_name(original_text)
    if not key:
        return

    registry = load_completed_registry()
    record = {
        "source_name": Path(original_text).name,
        "source_original": entry.get("original"),
        "source_moved_to": entry.get("source_moved_to"),
        "final_pdf": entry.get("final_pdf"),
        "matched_pdf_dir": entry.get("matched_pdf_dir"),
        "completed_at": entry.get("updated_at") or now_iso(),
    }
    registry["documents"][key] = record
    save_completed_registry(registry)
| def registry_entry_pdf_exists(entry: dict | None) -> bool: | |
| if not entry: | |
| return False | |
| for key in ("final_pdf",): | |
| value = entry.get(key) | |
| if value and Path(value).exists(): | |
| return True | |
| return False | |
def rebuild_completed_registry_from_state(state: dict) -> None:
    """Add to the registry every finished state entry it does not know yet.

    An entry qualifies when its status is ``"done"``, it names an original
    (or moved) source, and its final PDF exists on disk. Keys already in the
    registry are left untouched. The registry is saved only if something was
    added.
    """
    registry = load_completed_registry()
    added = 0

    for entry in state.get("documents", {}).values():
        if entry.get("status") != "done":
            continue
        original_text = entry.get("original") or entry.get("source_moved_to") or ""
        if not original_text or not registry_entry_pdf_exists(entry):
            continue
        key = registry_key_for_name(original_text)
        if not key:
            continue
        if key in registry.get("documents", {}):
            continue
        registry["documents"][key] = {
            "source_name": Path(original_text).name,
            "source_original": entry.get("original"),
            "source_moved_to": entry.get("source_moved_to"),
            "final_pdf": entry.get("final_pdf"),
            "matched_pdf_dir": entry.get("matched_pdf_dir"),
            "completed_at": entry.get("updated_at") or now_iso(),
        }
        added += 1

    if added:
        save_completed_registry(registry)
def is_completed_in_registry(path: Path) -> bool:
    """Tell whether the registry has a record for *path* whose PDF still exists."""
    documents = load_completed_registry().get("documents", {})
    record = documents.get(registry_key_for_name(path.name))
    return registry_entry_pdf_exists(record)
def move_completed_source_documents(root: Path, pdf_dirs: Iterable[Path], state: dict) -> int:
    """Move source documents that already have a final PDF out of the way.

    Every ``*.pdf`` directly inside one of *pdf_dirs* is indexed by the title
    key of its stem (minus ``_FINALIZAT``). Each .doc/.docx below *root* whose
    title key matches is moved into COMPLETED_SOURCE_DIR, recorded in *state*
    as ``"done"`` and added to the completed registry.

    Args:
        root: Folder that is scanned recursively for source documents.
        pdf_dirs: Folders that hold finished PDFs; missing ones are skipped.
        state: Progress state; updated in place and saved if anything moved.

    Returns:
        Number of files moved.
    """
    if not root.exists():
        return 0

    final_by_key: dict[str, Path] = {}
    for pdf_dir in pdf_dirs:
        if not pdf_dir.exists():
            continue
        for pdf in pdf_dir.glob("*.pdf"):
            key = title_match_key(final_pdf_source_stem(pdf))
            if key:
                final_by_key[key] = pdf
    if not final_by_key:
        return 0

    COMPLETED_SOURCE_DIR.mkdir(parents=True, exist_ok=True)

    # Take a snapshot before moving anything: the destination normally lives
    # inside `root`, and changing a tree while rglob() is still walking it
    # lazily is fragile.
    sources = [
        source for source in root.rglob("*")
        if source.is_file()
        and source.suffix.lower() in {".doc", ".docx"}
        and not source.name.startswith("~$")
        and not source_is_in_completed_dir(source)
    ]

    moved = 0
    try:
        for source in sources:
            final_pdf = final_by_key.get(title_match_key(safe_name(source.stem)))
            if not final_pdf:
                continue
            old_key = doc_id(source)
            dest = unique_destination(COMPLETED_SOURCE_DIR, source.name)
            shutil.move(str(source), str(dest))
            state_entry = {
                "original": str(source),
                "status": "done",
                "final_pdf": str(final_pdf),
                "matched_pdf_dir": str(final_pdf.parent),
                "source_moved_to": str(dest),
                **file_signature(dest),
                "updated_at": now_iso(),
            }
            state["documents"][old_key] = state_entry
            update_completed_registry_from_entry(state_entry)
            logger.info("Mutat original finalizat: %s -> %s | PDF=%s", source, dest, final_pdf)
            moved += 1
    finally:
        # Persist what was already moved even if a later move failed,
        # otherwise the state would not know about those files.
        if moved:
            save_state(state)
            logger.info("Am mutat %s fisiere sursa deja finalizate in: %s", moved, COMPLETED_SOURCE_DIR)
    return moved
def now_iso() -> str:
    """Return the current local time as ISO-8601 text with second precision."""
    current = datetime.now().replace(microsecond=0)
    return current.isoformat()
def find_existing_translation_for_part(part: Path) -> Path | None:
    """Look for an already downloaded translation of *part*, by file name.

    DOWNLOADS_DIR and, when it exists, the user's Downloads folder are
    searched recursively. A file matches when its extension is one of
    TRANSLATED_DOC_EXTENSIONS and its stem equals the part's stem, optionally
    followed by `` (N)``.

    Each root is walked once (instead of once per extension), and files that
    disappear or cannot be stat'ed during the scan are skipped rather than
    aborting the lookup.

    Returns:
        The most recently modified match, or ``None`` when there is none.
    """
    expected_stem = part.stem
    numbered_copy = re.compile(re.escape(expected_stem) + r" \(\d+\)")

    search_roots = [DOWNLOADS_DIR]
    if WINDOWS_DOWNLOADS_DIR.exists() and WINDOWS_DOWNLOADS_DIR not in search_roots:
        search_roots.append(WINDOWS_DOWNLOADS_DIR)

    newest: Path | None = None
    newest_mtime: float | None = None
    for root in search_roots:
        for candidate in root.rglob("*"):
            if candidate.suffix.lower() not in TRANSLATED_DOC_EXTENSIONS:
                continue
            stem = candidate.stem
            if stem != expected_stem and not numbered_copy.fullmatch(stem):
                continue
            try:
                if not candidate.is_file():
                    continue
                mtime = candidate.stat().st_mtime
            except OSError:
                # The file vanished or is inaccessible; ignore it.
                continue
            if newest_mtime is None or mtime > newest_mtime:
                newest, newest_mtime = candidate, mtime
    return newest
def has_existing_translation(parts: Iterable[Path]) -> bool:
    """Tell whether at least one of *parts* already has a downloaded translation."""
    for part in parts:
        if find_existing_translation_for_part(part):
            return True
    return False
def collect_translated_parts_from_state_or_disk(
    parts: list[Path],
    existing: dict,
    allow_disk_lookup: bool = True,
) -> list[Path | None]:
    """Return, for each part, its known translation or ``None``.

    For position ``i`` the path saved at ``existing["translated_parts"][i]``
    is used when it exists on disk and has an accepted extension; otherwise,
    if *allow_disk_lookup* is true, the download folders are searched by name.

    NOTE(review): empty entries are filtered out of the saved list before
    indexing, and mark_document_temp_skipped() also stores only the non-empty
    translations, so a saved index may not correspond to the same part index
    when an earlier part had no translation - confirm against the callers.
    """
    saved = [Path(p) for p in existing.get("translated_parts", []) if p]

    def _from_state(index: int) -> Path | None:
        # Accept the saved path only if it is still usable.
        if index >= len(saved):
            return None
        candidate = saved[index]
        if candidate.exists() and candidate.suffix.lower() in TRANSLATED_DOC_EXTENSIONS:
            return candidate
        return None

    result: list[Path | None] = []
    for idx, part in enumerate(parts):
        found = _from_state(idx)
        if found is None and allow_disk_lookup:
            found = find_existing_translation_for_part(part)
        result.append(found)
    return result
def is_rpc_unavailable(exc: Exception) -> bool:
    """Tell whether the text of *exc* reports an unavailable RPC server.

    Matches the error code ``-2147023174`` or the phrase
    "rpc server is unavailable" (case-insensitive).
    """
    lowered = str(exc).lower()
    markers = ("-2147023174", "rpc server is unavailable")
    return any(marker in lowered for marker in markers)
def is_word_corrupt_or_unreadable(exc: Exception) -> bool:
    """Tell whether the text of *exc* looks like a corrupt/unreadable-file error.

    The lower-cased message is checked for a fixed list of phrases and one
    error code.
    """
    lowered = str(exc).lower()
    for needle in (
        "appears to be corrupted",
        "file appears to be corrupted",
        "corrupted",
        "is corrupt",
        "word experienced an error trying to open the file",
        "-2146822496",
    ):
        if needle in lowered:
            return True
    return False
def is_download_timeout(exc: Exception) -> bool:
    """Tell whether *exc* carries the "download did not appear in time" message."""
    message = str(exc).lower()
    return "download-ul traducerii nu a aparut la timp" in message
def is_lost_browser_window(exc: Exception) -> bool:
    """Tell whether the text of *exc* says the browser window/tab is gone."""
    lowered = str(exc).lower()
    phrases = (
        "no such window",
        "target window already closed",
        "web view not found",
    )
    return any(phrase in lowered for phrase in phrases)
def file_signature(path: Path) -> dict:
    """Return ``{"source_size", "source_mtime"}`` for *path*.

    An empty dict is returned when the file cannot be stat'ed.
    """
    try:
        info = path.stat()
    except OSError:
        return {}
    return {"source_size": info.st_size, "source_mtime": info.st_mtime}
def is_same_skipped_source(existing: dict, path: Path) -> bool:
    """Tell whether *path* is the same file that was skipped earlier.

    True only when the state entry has status ``"skipped"`` and its stored
    size and modification time equal the file's current ones. Always returns
    a real ``bool``; the file is stat'ed only after the status check passes.
    """
    if existing.get("status") != "skipped":
        return False
    signature = file_signature(path)
    if not signature:
        # The file cannot be stat'ed, so nothing can be compared.
        return False
    return (
        existing.get("source_size") == signature.get("source_size")
        and existing.get("source_mtime") == signature.get("source_mtime")
    )
def mark_document_skipped(state: dict, key: str, original: Path, reason: str, detail: str) -> None:
    """Replace the state entry *key* with a ``"skipped"`` record and save.

    The record stores the reason, the detail text cut to 1000 characters and
    the current size/mtime signature of *original*.
    """
    record = {
        "original": str(original),
        "status": "skipped",
        "skip_reason": reason,
        "skip_detail": detail[:1000],
    }
    record.update(file_signature(original))
    record["updated_at"] = now_iso()
    state["documents"][key] = record
    save_state(state)
def mark_document_temp_skipped(
    state: dict,
    key: str,
    original: Path,
    reason: str,
    detail: str,
    parts: list[Path],
    translated_parts: list[Path | None],
) -> None:
    """Replace the state entry *key* with a ``"google_temp_skipped"`` record and save.

    Besides reason/detail (detail cut to 1000 characters) the record keeps the
    part paths and the translations obtained so far.

    NOTE(review): ``None`` slots in *translated_parts* are dropped, so the
    stored list no longer lines up index-by-index with ``parts`` - confirm
    that readers of this entry do not rely on positions.
    """
    part_texts = [str(p) for p in parts]
    translated_texts = [str(p) for p in translated_parts if p]
    record = {
        "original": str(original),
        "status": "google_temp_skipped",
        "skip_reason": reason,
        "skip_detail": detail[:1000],
        "parts": part_texts,
        "translated_parts": translated_texts,
    }
    record.update(file_signature(original))
    record["updated_at"] = now_iso()
    state["documents"][key] = record
    save_state(state)
class DocumentSplitError(RuntimeError):
    """Raised when a document cannot be split into parts or a part cannot be saved."""
| class WordManager: | |
| def __init__(self) -> None: | |
| self.app = None | |
| self._pythoncom = None | |
| self._win32com = None | |
def __enter__(self):
    """Start Word and return the manager itself for use in a ``with`` block."""
    self._start_app()
    manager = self
    return manager
def _start_app(self) -> None:
    """Initialize COM and launch a hidden Word instance with alerts disabled."""
    # Imported lazily so the module can be loaded without pywin32 installed.
    import pythoncom
    import win32com.client

    self._pythoncom, self._win32com = pythoncom, win32com.client
    pythoncom.CoInitialize()
    word = win32com.client.DispatchEx("Word.Application")
    self.app = word
    word.Visible = False
    word.DisplayAlerts = 0
def __exit__(self, exc_type, exc, tb) -> None:
    """Quit Word when the ``with`` block ends; exceptions are not suppressed."""
    self.close()
    return None
def close(self) -> None:
    """Quit Word if it is running and forget the instance.

    Errors from ``Quit`` are ignored: the instance may already be dead.
    """
    if self.app is None:
        return
    try:
        self.app.Quit()
    except Exception:
        pass
    self.app = None
def restart(self) -> None:
    """Close Word, pause two seconds, then start a fresh instance."""
    pause_seconds = 2
    logger.warning("Repornesc Microsoft Word COM...")
    self.close()
    time.sleep(pause_seconds)
    self._start_app()
def ensure_app(self) -> None:
    """Make sure a responsive Word instance is available.

    Starts Word when none is attached; otherwise reads ``Version`` as a
    liveness probe and restarts Word if that fails.
    """
    if self.app is not None:
        try:
            _ = self.app.Version
        except Exception as exc:
            logger.warning("Microsoft Word COM nu mai raspunde: %s", exc)
            self.restart()
        return
    self._start_app()
def convert_to_docx(self, source: Path) -> Path:
    """Return a .docx to work on for *source*, without touching the original.

    * A .docx within MAX_UPLOAD_BYTES is returned as is.
    * A larger .docx is copied to CONVERTED_DIR/<doc_id>/ (the copy is reused
      when it is at least as new as the source).
    * Anything else is opened read-only in Word and saved there as .docx.
    """
    self.ensure_app()
    is_docx = source.suffix.lower() == ".docx"
    if is_docx and source.stat().st_size <= MAX_UPLOAD_BYTES:
        return source

    out_dir = CONVERTED_DIR / doc_id(source)
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{safe_name(source.stem)}.docx"

    if not is_docx:
        logger.info("Convertesc .doc la .docx: %s", source)
        doc = self.app.Documents.Open(str(source), ReadOnly=True, AddToRecentFiles=False)
        try:
            doc.SaveAs2(str(out_path), FileFormat=WORD_FORMAT_DOCX)
        finally:
            doc.Close(False)
        return out_path

    copy_is_fresh = out_path.exists() and out_path.stat().st_mtime >= source.stat().st_mtime
    if not copy_is_fresh:
        shutil.copyfile(source, out_path)
    return out_path
def split_docx_if_needed(self, source_docx: Path, original: Path, on_part_saved=None) -> list[Path]:
    """Split *source_docx* into parts that respect the size and page limits.

    The document is opened read-only in Word and repaginated. If it already
    fits MAX_UPLOAD_BYTES and MAX_PAGES_PER_PART it is returned unchanged.
    Otherwise it is cut into page ranges saved under PARTS_DIR/<doc_id>/ as
    ``<name>_parteaN``. Ranges whose saved part is still too large (or has
    too many pages) are halved and the whole plan is saved again, until every
    part fits. Progress is kept in ``_split_progress.json`` so an interrupted
    run with an identical plan reuses the parts already written.

    Args:
        source_docx: The .docx that is opened and split.
        original: The original source file; used for the part folder id and
            the base name of the parts.
        on_part_saved: Optional callback invoked after each part as
            ``(paths_so_far, index, part_count, start_page, end_page, path)``.

    Returns:
        The part paths in page order, or ``[source_docx]`` if no split is needed.

    Raises:
        DocumentSplitError: if more than MAX_SPLIT_PARTS parts would be
            needed, or a single page alone exceeds the limits.
    """
    self.ensure_app()
    doc = self.app.Documents.Open(str(source_docx), ReadOnly=True, AddToRecentFiles=False)
    try:
        doc.Repaginate()
        pages = max(1, int(doc.ComputeStatistics(WD_STATISTIC_PAGES)))
        size = source_docx.stat().st_size
        # Ceiling divisions: how many parts each limit demands on its own.
        size_parts = (size + MAX_UPLOAD_BYTES - 1) // MAX_UPLOAD_BYTES
        page_parts = (pages + MAX_PAGES_PER_PART - 1) // MAX_PAGES_PER_PART
        initial_parts = max(1, size_parts, page_parts)
        if initial_parts <= 1:
            logger.info(
                "Fara split: %s (%.2f MB, %s pagini)",
                source_docx.name,
                file_mb(source_docx),
                pages,
            )
            return [source_docx]
        part_dir = PARTS_DIR / doc_id(original)
        part_dir.mkdir(parents=True, exist_ok=True)
        base = safe_name(original.stem)
        logger.info(
            "Split necesar: %s (%.2f MB, %s pagini; limita %s MB si %s pagini/parte)",
            source_docx,
            file_mb(source_docx),
            pages,
            MAX_UPLOAD_BYTES / (1024 * 1024),
            MAX_PAGES_PER_PART,
        )
        source_stat = source_docx.stat()
        meta_path = part_dir / "_split_progress.json"
        ranges = self._initial_page_ranges(pages, initial_parts)
        while True:
            part_count = len(ranges)
            if part_count > MAX_SPLIT_PARTS:
                raise DocumentSplitError(
                    f"Nu am reusit sa impart {source_docx.name}: ar fi nevoie de peste "
                    f"{MAX_SPLIT_PARTS} parti pentru limita de {MAX_UPLOAD_BYTES} bytes"
                )
            # Everything that defines the current plan; saved parts are only
            # reused when the stored plan matches on every one of these keys.
            expected_meta = {
                "source": str(source_docx),
                "source_size": source_stat.st_size,
                "source_mtime": source_stat.st_mtime,
                "pages": pages,
                "part_count": part_count,
                "ranges": ranges,
                "max_upload_bytes": MAX_UPLOAD_BYTES,
                "max_pages_per_part": MAX_PAGES_PER_PART,
                "split_method_version": PAGE_SPLIT_METHOD_VERSION,
            }
            split_meta: dict = {}
            if meta_path.exists():
                try:
                    split_meta = json.loads(meta_path.read_text(encoding="utf-8"))
                except Exception:
                    split_meta = {}
                # Valid JSON that is not an object cannot be a progress record.
                if not isinstance(split_meta, dict):
                    split_meta = {}
            same_split = all(split_meta.get(k) == v for k, v in expected_meta.items())
            if same_split:
                logger.info("Resume split existent: %s parti planificate.", part_count)
            else:
                # Remove the parts of the previous plan. The names are compared
                # literally instead of through a glob pattern, because the base
                # name may contain glob metacharacters such as "[" and "]".
                part_prefix = f"{base}_partea"
                for old in list(part_dir.iterdir()):
                    if (
                        old.is_file()
                        and old.name.startswith(part_prefix)
                        and old.suffix.lower() == ".docx"
                    ):
                        old.unlink(missing_ok=True)
                split_meta = {
                    **expected_meta,
                    "done_parts": [],
                    "updated_at": now_iso(),
                }
                meta_path.write_text(json.dumps(split_meta, indent=2, ensure_ascii=False), encoding="utf-8")
            done_by_index = {
                int(item.get("index")): item
                for item in split_meta.get("done_parts", [])
                if item.get("index")
            }
            paths: list[Path] = []
            for idx, page_range in enumerate(ranges):
                start_page, end_page = int(page_range[0]), int(page_range[1])
                default_out_path = part_dir / f"{base}_partea{idx + 1}.docx"
                done_item = done_by_index.get(idx + 1)
                done_path = Path(done_item.get("path", "")) if done_item and done_item.get("path") else None
                done_range = [
                    int(done_item.get("start_page", -1)),
                    int(done_item.get("end_page", -1)),
                ] if done_item else []
                out_path = done_path if done_path else default_out_path
                if (
                    done_item
                    and done_range == [start_page, end_page]
                    and out_path.exists()
                    and out_path.stat().st_size > 0
                ):
                    logger.info(
                        "Split deja existent: partea %s/%s | pagini %s-%s | %.2f MB | %s",
                        idx + 1,
                        part_count,
                        start_page,
                        end_page,
                        file_mb(out_path),
                        out_path,
                    )
                else:
                    logger.info(
                        "Split incep: partea %s/%s | pagini %s-%s -> %s",
                        idx + 1,
                        part_count,
                        start_page,
                        end_page,
                        default_out_path.name,
                    )
                    # May return a .pdf path instead when Word cannot save the DOCX.
                    out_path = self._save_page_range(doc, start_page, end_page, default_out_path)
                    split_meta["done_parts"] = [
                        item for item in split_meta.get("done_parts", [])
                        if int(item.get("index", 0)) != idx + 1
                    ]
                    split_meta["done_parts"].append(
                        {
                            "index": idx + 1,
                            "path": str(out_path),
                            "start_page": start_page,
                            "end_page": end_page,
                            "size": out_path.stat().st_size,
                            "updated_at": now_iso(),
                        }
                    )
                    split_meta["updated_at"] = now_iso()
                    # Persist after every part so an interrupted run can resume here.
                    meta_path.write_text(json.dumps(split_meta, indent=2, ensure_ascii=False), encoding="utf-8")
                    logger.info(
                        "Split terminat: partea %s/%s | pagini %s-%s | %.2f MB | %s",
                        idx + 1,
                        part_count,
                        start_page,
                        end_page,
                        file_mb(out_path),
                        out_path,
                    )
                paths.append(out_path)
                if on_part_saved:
                    on_part_saved(paths.copy(), idx + 1, part_count, start_page, end_page, out_path)
            too_big = [
                (idx, p, ranges[idx])
                for idx, p in enumerate(paths)
                if p.stat().st_size > MAX_UPLOAD_BYTES
            ]
            too_many_pages = [
                (idx, p, start, end)
                for idx, (p, (start, end)) in enumerate(zip(paths, ranges))
                if end - start + 1 > MAX_PAGES_PER_PART
            ]
            if not too_big and not too_many_pages:
                for p, (start, end) in zip(paths, ranges):
                    logger.info(
                        "Parte: %s (%.2f MB, pagini %s-%s, total %s)",
                        p.name,
                        file_mb(p),
                        start,
                        end,
                        end - start + 1,
                    )
                return paths
            if too_big:
                logger.info(
                    "Inca exista parti peste limita de MB la %s parti: %s",
                    part_count,
                    ", ".join(f"{p.name}={file_mb(p):.2f}MB" for _, p, _ in too_big[:3]),
                )
            if too_many_pages:
                logger.info(
                    "Inca exista parti peste limita de pagini la %s parti: %s",
                    part_count,
                    ", ".join(f"{p.name}={end - start + 1} pagini" for _, p, start, end in too_many_pages[:3]),
                )
            split_indexes = {idx for idx, _, _ in too_big}
            split_indexes.update(idx for idx, _, _, _ in too_many_pages)
            unsplittable = []
            new_ranges: list[list[int]] = []
            for idx, (start, end) in enumerate(ranges):
                if idx not in split_indexes:
                    new_ranges.append([start, end])
                    continue
                if start >= end:
                    # A single page cannot be halved any further.
                    unsplittable.append((idx + 1, paths[idx], start, end))
                    new_ranges.append([start, end])
                    continue
                mid = (start + end) // 2
                new_ranges.append([start, mid])
                new_ranges.append([mid + 1, end])
            if unsplittable:
                details = ", ".join(
                    f"{p.name} pag.{start} ({file_mb(p):.2f}MB)"
                    for _, p, start, _ in unsplittable[:5]
                )
                raise DocumentSplitError(
                    f"Nu pot imparti mai mult {source_docx.name}; exista pagina individuala peste limita: {details}"
                )
            logger.info(
                "Rafinez split adaptiv: %s -> %s parti; impart doar intervalele prea mari.",
                len(ranges),
                len(new_ranges),
            )
            ranges = new_ranges
    finally:
        doc.Close(False)
| def _initial_page_ranges(self, pages: int, part_count: int) -> list[list[int]]: | |
| ranges: list[list[int]] = [] | |
| for idx in range(part_count): | |
| start_page = int(idx * pages / part_count) + 1 | |
| end_page = int((idx + 1) * pages / part_count) | |
| end_page = max(start_page, min(pages, end_page)) | |
| ranges.append([start_page, end_page]) | |
| return ranges | |
def _save_page_range(self, doc, start_page: int, end_page: int, out_path: Path) -> Path:
    """Save pages *start_page*..*end_page* of *doc* as a separate file.

    The character range of the pages is copied (with formatting) into a new
    Word document, which is saved to a temporary file under WORK_DIR/tmp_save
    and then moved to its destination. If saving as DOCX fails, a PDF export
    is attempted instead.

    Args:
        doc: The open source Word document (COM object).
        start_page: First page to include (1-based).
        end_page: Last page to include (1-based).
        out_path: Destination for the DOCX part.

    Returns:
        *out_path* when the DOCX was saved, otherwise the same path with a
        ``.pdf`` suffix.

    Raises:
        DocumentSplitError: if the part can be saved neither as DOCX nor PDF.
    """
    pages = max(1, int(doc.ComputeStatistics(WD_STATISTIC_PAGES)))
    start = self._page_start_position(doc, start_page, pages)
    if end_page >= pages:
        # Last page requested: take everything up to the end of the document.
        end = doc.Content.End
    else:
        # Otherwise stop one position before the first position of the next page.
        end = self._page_start_position(doc, end_page + 1, pages) - 1
    if end <= start:
        logger.warning(
            "Interval pagini fara continut text detectabil: pagini %s-%s in %s",
            start_page,
            end_page,
            out_path.name,
        )
        end = start
    src_range = doc.Range(Start=start, End=max(start, end))
    new_doc = self.app.Documents.Add()
    tmp_path = None
    pdf_tmp_path = None
    # Tracks whether new_doc was already closed so the finally block does not close it twice.
    new_doc_closed = False
    pdf_out_path = out_path.with_suffix(".pdf")
    try:
        new_doc.Range(0, 0).FormattedText = src_range.FormattedText
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # Remove any previous output for this part, in either format.
        out_path.unlink(missing_ok=True)
        pdf_out_path.unlink(missing_ok=True)
        tmp_dir = WORK_DIR / "tmp_save"
        tmp_dir.mkdir(parents=True, exist_ok=True)
        # Word saves to a short hash-based temporary name; the file is moved
        # to its real name afterwards.
        tmp_hash = hashlib.sha1(str(out_path).lower().encode("utf-8", errors="ignore")).hexdigest()[:12]
        tmp_name = f"{tmp_hash}_{time.time_ns()}"
        tmp_path = tmp_dir / f"{tmp_name}.docx"
        tmp_path.unlink(missing_ok=True)
        pdf_tmp_path = tmp_dir / f"{tmp_name}.pdf"
        pdf_tmp_path.unlink(missing_ok=True)
        try:
            new_doc.SaveAs2(str(tmp_path), FileFormat=WORD_FORMAT_DOCX)
        except Exception as exc:
            logger.warning(
                "Word nu poate salva partea ca DOCX; incerc PDF pentru %s (pagini %s-%s): %s",
                out_path.name,
                start_page,
                end_page,
                exc,
            )
            # Fallback: export the part as PDF instead.
            try:
                new_doc.ExportAsFixedFormat(OutputFileName=str(pdf_tmp_path), ExportFormat=WORD_EXPORT_PDF)
            except Exception as pdf_exc:
                raise DocumentSplitError(
                    f"Word nu poate salva partea {out_path.name} nici DOCX, nici PDF "
                    f"(pagini {start_page}-{end_page}): DOCX={exc}; PDF={pdf_exc}"
                ) from pdf_exc
            # Close the document before moving the exported file.
            new_doc.Close(False)
            new_doc_closed = True
            shutil.move(str(pdf_tmp_path), str(pdf_out_path))
            return pdf_out_path
        # Close the document before moving the saved file.
        new_doc.Close(False)
        new_doc_closed = True
        shutil.move(str(tmp_path), str(out_path))
        return out_path
    finally:
        try:
            if not new_doc_closed:
                new_doc.Close(False)
        finally:
            # Best-effort removal of temporary files that were not moved away.
            for leftover in (tmp_path, pdf_tmp_path):
                if leftover is not None:
                    try:
                        leftover.unlink(missing_ok=True)
                    except Exception:
                        pass
def _page_number_at_position(self, doc, position: int) -> int:
    """Return the page number Word reports for a character position.

    The position is clamped into the document's content range first; an
    empty content range is reported as page 1.
    """
    first = int(doc.Content.Start)
    last = int(doc.Content.End)
    if last <= first:
        return 1
    clamped = min(int(position), last - 1)
    clamped = max(first, clamped)
    probe = doc.Range(clamped, clamped)
    return int(probe.Information(WD_ACTIVE_END_PAGE_NUMBER))
def _page_start_position(self, doc, target_page: int, total_pages: int) -> int:
    """Return the first character position that lies on *target_page*.

    Page 1 (or less) maps to the start of the content and a page beyond
    *total_pages* to its end. Otherwise a binary search finds the lowest
    position whose page number is at least *target_page*.
    """
    if target_page <= 1:
        return int(doc.Content.Start)
    if target_page > total_pages:
        return int(doc.Content.End)

    low = int(doc.Content.Start)
    high = int(doc.Content.End)
    while low < high:
        middle = (low + high) // 2
        page_here = self._page_number_at_position(doc, middle)
        if page_here < target_page:
            low = middle + 1
        else:
            high = middle
    return low
def export_upload_part_to_pdf(self, part: Path) -> Path:
    """Return a PDF version of an upload part, creating it with Word if needed.

    A part that is already a PDF is returned as is. For .doc/.docx the PDF is
    placed next to the part; an existing PDF that is at least as new as the
    part is reused. The export goes to a temporary file under
    WORK_DIR/tmp_save first and is then moved into place.

    Raises:
        ValueError: if the extension is neither .pdf, .doc nor .docx.
    """
    self.ensure_app()
    suffix = part.suffix.lower()
    if suffix == ".pdf":
        return part
    if suffix not in {".doc", ".docx"}:
        raise ValueError(f"Nu pot face fallback PDF pentru extensia: {part}")

    pdf_path = part.with_suffix(".pdf")
    if pdf_path.exists() and pdf_path.stat().st_mtime >= part.stat().st_mtime:
        return pdf_path

    scratch_dir = WORK_DIR / "tmp_save"
    scratch_dir.mkdir(parents=True, exist_ok=True)
    path_hash = hashlib.sha1(str(pdf_path).lower().encode("utf-8", errors="ignore")).hexdigest()[:12]
    scratch_pdf = scratch_dir / f"{path_hash}_{time.time_ns()}.pdf"
    scratch_pdf.unlink(missing_ok=True)
    pdf_path.unlink(missing_ok=True)

    word_doc = self.app.Documents.Open(str(part), ReadOnly=True, AddToRecentFiles=False)
    try:
        word_doc.ExportAsFixedFormat(OutputFileName=str(scratch_pdf), ExportFormat=WORD_EXPORT_PDF)
    finally:
        word_doc.Close(False)

    shutil.move(str(scratch_pdf), str(pdf_path))
    logger.info("Fallback PDF pentru upload: %s -> %s (%.2f MB)", part.name, pdf_path.name, file_mb(pdf_path))
    return pdf_path
def export_translated_parts_to_pdf(self, translated_parts: list[Path], original: Path) -> Path:
    """Export the translated parts to the final PDF, retrying once on RPC failure.

    If the first attempt fails with an "RPC server unavailable" error, Word
    is restarted and the export is tried a second time. Any other error, or a
    failure of the second attempt, is re-raised unchanged.

    Returns:
        Path of the final PDF.
    """
    last_error: Exception | None = None
    for attempt in (1, 2):
        try:
            self.ensure_app()
            return self._export_translated_parts_to_pdf_once(translated_parts, original)
        except Exception as exc:
            last_error = exc
            retryable = attempt == 1 and is_rpc_unavailable(exc)
            if not retryable:
                raise
            logger.warning("Word COM/RPC a cazut la export PDF. Repornesc Word si reincerc.")
            self.restart()
    raise RuntimeError(f"Nu am putut exporta PDF pentru {original}") from last_error
def _export_translated_parts_to_pdf_once(self, translated_parts: list[Path], original: Path) -> Path:
    """Build ``<name>_FINALIZAT.pdf`` in FINAL_DIR from the translated parts.

    * One PDF part: copied as the final PDF.
    * One .doc/.docx part: exported to PDF by Word.
    * Several parts with at least one PDF: merged through ``_merge_parts_as_pdf``.
    * Several .doc/.docx parts: appended into one Word document, separated by
      page breaks, then exported to PDF. The merged DOCX is an intermediate
      file and is removed unless KEEP_INTERMEDIATE is set, also when the
      save or export fails.

    Args:
        translated_parts: Translated files in document order; must not be empty.
        original: The original source file; provides the output name.

    Returns:
        Path of the final PDF.

    Raises:
        ValueError: if *translated_parts* is empty or contains an
            unsupported extension.
    """
    if not translated_parts:
        # Without this guard an empty list would be exported as an empty
        # "final" PDF and the document would look finished.
        raise ValueError(f"Nu exista parti traduse pentru export PDF: {original}")

    final_base = f"{safe_name(original.stem)}_FINALIZAT"
    final_docx = WORK_DIR / f"{final_base}_{doc_id(original)}.docx"
    final_pdf = FINAL_DIR / f"{final_base}.pdf"

    if len(translated_parts) == 1:
        part = translated_parts[0]
        if part.suffix.lower() == ".pdf":
            shutil.copyfile(str(part), str(final_pdf))
            return final_pdf
        if part.suffix.lower() not in {".doc", ".docx"}:
            raise ValueError(f"Extensie tradusa nesuportata pentru PDF: {part}")
        src_doc = self.app.Documents.Open(str(part), ReadOnly=True, AddToRecentFiles=False)
        try:
            src_doc.ExportAsFixedFormat(OutputFileName=str(final_pdf), ExportFormat=WORD_EXPORT_PDF)
        finally:
            src_doc.Close(False)
        return final_pdf

    if any(part.suffix.lower() == ".pdf" for part in translated_parts):
        self._merge_parts_as_pdf(translated_parts, final_pdf, original)
        return final_pdf

    doc = self.app.Documents.Add()
    try:
        try:
            for idx, part in enumerate(translated_parts):
                if part.suffix.lower() not in {".doc", ".docx"}:
                    raise ValueError(f"Extensie tradusa nesuportata pentru merge: {part}")
                src_doc = self.app.Documents.Open(str(part), ReadOnly=True, AddToRecentFiles=False)
                try:
                    if idx > 0:
                        # Start every following part on a new page.
                        rng = doc.Range(doc.Content.End - 1, doc.Content.End - 1)
                        rng.InsertBreak(WD_PAGE_BREAK)
                    # Append at the end of the merged document.
                    rng = doc.Range(doc.Content.End - 1, doc.Content.End - 1)
                    rng.FormattedText = src_doc.Content.FormattedText
                finally:
                    src_doc.Close(False)
            doc.SaveAs2(str(final_docx), FileFormat=WORD_FORMAT_DOCX)
            doc.ExportAsFixedFormat(OutputFileName=str(final_pdf), ExportFormat=WORD_EXPORT_PDF)
        finally:
            doc.Close(False)
    finally:
        if not KEEP_INTERMEDIATE:
            # Best effort: a leftover intermediate file must not turn a
            # successful export into a failure.
            try:
                final_docx.unlink(missing_ok=True)
            except OSError as cleanup_exc:
                logger.warning("Nu pot sterge fisierul intermediar %s: %s", final_docx, cleanup_exc)
    return final_pdf
def _merge_parts_as_pdf(self, translated_parts: list[Path], final_pdf: Path, original: Path) -> None:
    """Merge a mix of PDF and Word parts into ``final_pdf`` using pypdf.

    PDF parts are used directly. Word parts are first exported by Word to
    temporary PDFs under ``WORK_DIR / "tmp_pdf_parts" / doc_id(original)``.
    All pages are then appended, in part order, to one ``PdfWriter``.

    Unless ``KEEP_INTERMEDIATE`` is set, the temporary PDFs are deleted and
    the temporary directory is removed if it is empty.

    Raises:
        ValueError: a part has an extension other than .pdf/.doc/.docx.
    """
    from pypdf import PdfReader, PdfWriter

    tmp_dir = WORK_DIR / "tmp_pdf_parts" / doc_id(original)
    tmp_dir.mkdir(parents=True, exist_ok=True)
    pdf_parts: list[Path] = []
    temp_created: list[Path] = []
    try:
        for idx, part in enumerate(translated_parts, 1):
            suffix = part.suffix.lower()
            if suffix == ".pdf":
                pdf_parts.append(part)
                continue
            if suffix not in {".doc", ".docx"}:
                raise ValueError(f"Extensie tradusa nesuportata pentru merge PDF: {part}")
            temp_pdf = tmp_dir / f"part_{idx:03d}.pdf"
            temp_pdf.unlink(missing_ok=True)
            # Register the temp file BEFORE exporting: if Word fails midway, the
            # partially written PDF is still cleaned up by the finally block
            # (unlink uses missing_ok=True, so a file that was never created is fine).
            temp_created.append(temp_pdf)
            src_doc = self.app.Documents.Open(str(part), ReadOnly=True, AddToRecentFiles=False)
            try:
                src_doc.ExportAsFixedFormat(OutputFileName=str(temp_pdf), ExportFormat=WORD_EXPORT_PDF)
            finally:
                src_doc.Close(False)
            pdf_parts.append(temp_pdf)

        writer = PdfWriter()
        for pdf_part in pdf_parts:
            for page in PdfReader(str(pdf_part)).pages:
                writer.add_page(page)
        final_pdf.parent.mkdir(parents=True, exist_ok=True)
        with final_pdf.open("wb") as fh:
            writer.write(fh)
    finally:
        if not KEEP_INTERMEDIATE:
            for temp_pdf in temp_created:
                temp_pdf.unlink(missing_ok=True)
            try:
                # Only succeeds when the directory is empty; leftovers are tolerated.
                tmp_dir.rmdir()
            except OSError:
                pass
@dataclass
class PreparedDocument:
    """A source document together with the files that will be uploaded for it."""

    # Path of the source document the parts were produced from.
    original: Path
    # Files to upload, in order (one entry per part).
    upload_parts: list[Path]
class GoogleTranslateRetryableError(RuntimeError):
    """Google Translate reported that the file cannot be translated right now."""
class GoogleTranslateNoContentError(RuntimeError):
    """Google Translate reported that the file has no translatable content."""
class GoogleTranslateSkipError(RuntimeError):
    """The current file should be skipped for this run after a Google Translate refusal."""
| class ChromeTranslateBot: | |
| def __init__(self, download_dir: Path): | |
| self.download_dir = download_dir | |
| self.driver = None | |
| self.wait = None | |
| def _wait_cdp(self, timeout: int = 60) -> bool: | |
| import urllib.request | |
| deadline = time.time() + timeout | |
| while time.time() < deadline: | |
| try: | |
| with urllib.request.urlopen(f"http://127.0.0.1:{DEBUG_PORT}/json/version", timeout=3) as resp: | |
| resp.read(2000) | |
| return True | |
| except Exception: | |
| time.sleep(1) | |
| return False | |
| def _start_chrome_debug(self) -> None: | |
| if not Path(CHROME_PATH).exists(): | |
| raise FileNotFoundError(f"Nu gasesc Chrome: {CHROME_PATH}") | |
| if not START_CHROME_PS1.exists(): | |
| raise FileNotFoundError(f"Lipseste scriptul PowerShell: {START_CHROME_PS1}") | |
| logger.info("Pornesc Chrome debug pe profilul: %s", CHROME_PROFILE_DIR) | |
| result = subprocess.run( | |
| [ | |
| "powershell.exe", | |
| "-NoProfile", | |
| "-NonInteractive", | |
| "-ExecutionPolicy", | |
| "Bypass", | |
| "-File", | |
| str(START_CHROME_PS1), | |
| "-ChromePath", | |
| CHROME_PATH, | |
| "-ProfileDir", | |
| CHROME_PROFILE_DIR, | |
| "-DebugPort", | |
| str(DEBUG_PORT), | |
| "-Url", | |
| TRANSLATE_URL, | |
| ], | |
| capture_output=True, | |
| text=True, | |
| timeout=90, | |
| ) | |
| if result.stdout: | |
| logger.info("PowerShell Start-ChromeDebug stdout:\n%s", result.stdout.strip()) | |
| if result.stderr: | |
| logger.warning("PowerShell Start-ChromeDebug stderr:\n%s", result.stderr.strip()) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"Start-ChromeDebug.ps1 a esuat cu cod {result.returncode}") | |
| def start(self) -> None: | |
| if not self._wait_cdp(timeout=3): | |
| self._start_chrome_debug() | |
| if not self._wait_cdp(timeout=90): | |
| raise RuntimeError(f"Chrome debug nu raspunde pe portul {DEBUG_PORT}") | |
| options = ChromeOptions() | |
| options.add_argument("--remote-allow-origins=*") | |
| options.add_experimental_option("debuggerAddress", f"127.0.0.1:{DEBUG_PORT}") | |
| service = ChromeService() | |
| self.driver = webdriver.Chrome(service=service, options=options) | |
| self.wait = WebDriverWait(self.driver, 45) | |
| self.driver.set_page_load_timeout(90) | |
| self.driver.set_script_timeout(90) | |
| self.download_dir.mkdir(parents=True, exist_ok=True) | |
| try: | |
| self.driver.execute_cdp_cmd( | |
| "Page.setDownloadBehavior", | |
| {"behavior": "allow", "downloadPath": str(self.download_dir)}, | |
| ) | |
| except Exception as exc: | |
| logger.warning("Nu pot seta download dir prin Page.setDownloadBehavior: %s", exc) | |
| try: | |
| self.driver.execute_cdp_cmd( | |
| "Browser.setDownloadBehavior", | |
| {"behavior": "allow", "downloadPath": str(self.download_dir)}, | |
| ) | |
| except Exception as exc: | |
| logger.warning("Nu pot seta download dir prin Browser.setDownloadBehavior: %s", exc) | |
| self._open_translate_single_tab() | |
| def _restart_driver_session(self) -> None: | |
| logger.warning("Reconectez ChromeDriver la Chrome debug dupa pierderea tabului activ.") | |
| if self.driver is not None: | |
| try: | |
| self.driver.quit() | |
| except Exception: | |
| pass | |
| self.driver = None | |
| self.wait = None | |
| time.sleep(2) | |
| self.start() | |
| def _open_translate_single_tab(self) -> None: | |
| assert self.driver is not None | |
| try: | |
| handles = list(self.driver.window_handles) | |
| except WebDriverException as exc: | |
| if is_lost_browser_window(exc): | |
| raise | |
| raise | |
| if not handles: | |
| self.driver.switch_to.new_window("tab") | |
| handles = list(self.driver.window_handles) | |
| keep_handle = None | |
| for handle in handles: | |
| try: | |
| self.driver.switch_to.window(handle) | |
| if "translate.google" in (self.driver.current_url or ""): | |
| keep_handle = handle | |
| break | |
| except WebDriverException: | |
| continue | |
| if keep_handle is None and handles: | |
| keep_handle = handles[0] | |
| for handle in list(handles): | |
| if handle == keep_handle: | |
| continue | |
| try: | |
| self.driver.switch_to.window(handle) | |
| self.driver.close() | |
| logger.info("Am inchis un tab Chrome extra.") | |
| except WebDriverException: | |
| pass | |
| if keep_handle: | |
| self.driver.switch_to.window(keep_handle) | |
| self.driver.get(TRANSLATE_URL) | |
| def close(self) -> None: | |
| # Nu inchidem Chrome: descarcarile sau taburile pot ramane utile pentru inspectie. | |
| pass | |
| def translate_file(self, upload_path: Path) -> Path: | |
| if self.driver is None or self.wait is None: | |
| raise RuntimeError("ChromeTranslateBot nu este pornit") | |
| last_error: Exception | None = None | |
| max_attempts = max(TRANSLATE_ERROR_RETRIES, DOWNLOAD_ERROR_RETRIES) + 1 | |
| for attempt in range(1, max_attempts + 1): | |
| try: | |
| return self._translate_file_once(upload_path, attempt) | |
| except GoogleTranslateRetryableError as exc: | |
| last_error = exc | |
| logger.warning( | |
| "Google Translate a refuzat temporar fisierul; il sar pentru aceasta rulare: %s", | |
| exc, | |
| ) | |
| self._open_translate_single_tab() | |
| raise GoogleTranslateSkipError(str(exc)) from exc | |
| except TimeoutException as exc: | |
| last_error = exc | |
| existing = find_existing_translation_for_part(upload_path) | |
| if existing and existing.exists(): | |
| logger.info( | |
| "Download gasit dupa timeout, folosesc fisierul existent: %s", | |
| existing, | |
| ) | |
| return existing | |
| if is_download_timeout(exc) and attempt <= DOWNLOAD_ERROR_RETRIES: | |
| logger.warning( | |
| "Download-ul nu a aparut la timp pentru %s (%s/%s). Reincerc aceeasi parte.", | |
| upload_path.name, | |
| attempt, | |
| DOWNLOAD_ERROR_RETRIES + 1, | |
| ) | |
| self._open_translate_single_tab() | |
| time.sleep(15) | |
| continue | |
| raise | |
| except WebDriverException as exc: | |
| last_error = exc | |
| if is_lost_browser_window(exc) and attempt < max_attempts: | |
| logger.warning( | |
| "Tabul Chrome activ a disparut in timpul traducerii pentru %s (%s/%s): %s", | |
| upload_path.name, | |
| attempt, | |
| max_attempts, | |
| exc, | |
| ) | |
| self._restart_driver_session() | |
| time.sleep(5) | |
| continue | |
| raise | |
| raise RuntimeError(f"Nu am putut traduce dupa retry: {upload_path}") from last_error | |
| def _translate_file_once(self, upload_path: Path, attempt: int) -> Path: | |
| assert self.driver is not None and self.wait is not None | |
| logger.info("Traduc: %s (incercarea %s)", upload_path, attempt) | |
| self._open_translate_single_tab() | |
| self._dismiss_popups() | |
| input_el = self.wait.until( | |
| EC.presence_of_element_located((By.CSS_SELECTOR, "input[type='file']")) | |
| ) | |
| input_el.send_keys(str(upload_path.resolve())) | |
| self._click_button(["Traduceti", "Translate", "Traducir"], timeout=90) | |
| logger.info("Traducere pornita; astept %s secunde inainte de download.", TRANSLATE_WAIT_SEC) | |
| self._wait_translate_delay_or_error(TRANSLATE_WAIT_SEC) | |
| before = self._download_snapshot() | |
| self._click_button( | |
| ["Descarcati traducerea", "Download translation", "Download"], | |
| timeout=180, | |
| ) | |
| downloaded = self._wait_for_download(before, timeout=DOWNLOAD_WAIT_SEC) | |
| logger.info("Descarcat: %s", downloaded) | |
| return downloaded | |
| def _dismiss_popups(self) -> None: | |
| for texts in [ | |
| ["Accepta tot", "Accept all", "Sunt de acord", "I agree"], | |
| ["Acceptă tot", "Accept all", "Sunt de acord", "I agree"], | |
| ["Nu acum", "Not now"], | |
| ]: | |
| try: | |
| self._click_button(texts, timeout=3, required=False) | |
| except Exception: | |
| pass | |
| def _click_button(self, texts: list[str], timeout: int, required: bool = True) -> bool: | |
| assert self.driver is not None | |
| deadline = time.time() + timeout | |
| wanted = texts | |
| while time.time() < deadline: | |
| self._raise_translate_error_if_present() | |
| clicked = self.driver.execute_script( | |
| """ | |
| const normalize = (value) => (value || '') | |
| .normalize('NFD') | |
| .replace(/[\\u0300-\\u036f]/g, '') | |
| .toLowerCase(); | |
| const wanted = arguments[0].map(normalize); | |
| const nodes = Array.from(document.querySelectorAll('button, [role="button"]')); | |
| for (const el of nodes) { | |
| const text = (el.innerText || el.textContent || '').trim(); | |
| const textNorm = normalize(text); | |
| const disabled = el.disabled || el.getAttribute('aria-disabled') === 'true'; | |
| if (!disabled && wanted.some(w => textNorm.includes(w))) { | |
| el.scrollIntoView({block: 'center', inline: 'center'}); | |
| el.click(); | |
| return text; | |
| } | |
| } | |
| return null; | |
| """, | |
| wanted, | |
| ) | |
| if clicked: | |
| logger.info("Click buton: %s", clicked) | |
| return True | |
| time.sleep(1) | |
| if required: | |
| raise TimeoutException(f"Nu am gasit butonul: {texts}") | |
| return False | |
| def _wait_translate_delay_or_error(self, seconds: int) -> None: | |
| deadline = time.time() + seconds | |
| while time.time() < deadline: | |
| self._raise_translate_error_if_present() | |
| time.sleep(2) | |
| def _raise_translate_error_if_present(self) -> None: | |
| error = self._translate_error() | |
| if not error: | |
| return | |
| kind, text = error | |
| if kind == "no_content": | |
| raise GoogleTranslateNoContentError(text) | |
| raise GoogleTranslateRetryableError(text) | |
| def _translate_error(self) -> tuple[str, str] | None: | |
| assert self.driver is not None | |
| try: | |
| text = self.driver.execute_script( | |
| "return document.body ? (document.body.innerText || document.body.textContent || '') : '';" | |
| ) | |
| except WebDriverException: | |
| return None | |
| page_text = str(text) | |
| normalized = normalize_for_match(page_text) | |
| compact = re.sub(r"[^a-z0-9]+", " ", normalized).strip() | |
| def has_any(needles: list[str]) -> bool: | |
| for needle in needles: | |
| needle_norm = normalize_for_match(needle) | |
| needle_compact = re.sub(r"[^a-z0-9]+", " ", needle_norm).strip() | |
| if needle_norm in normalized or needle_compact in compact: | |
| return True | |
| return False | |
| def first_matching_line(needles: list[str], fallback: str) -> str: | |
| for line in page_text.splitlines(): | |
| line_norm = normalize_for_match(line) | |
| line_compact = re.sub(r"[^a-z0-9]+", " ", line_norm).strip() | |
| for needle in needles: | |
| needle_norm = normalize_for_match(needle) | |
| needle_compact = re.sub(r"[^a-z0-9]+", " ", needle_norm).strip() | |
| if needle_norm in line_norm or needle_compact in line_compact: | |
| return line.strip() | |
| return fallback | |
| no_content_needles = [ | |
| "nu s-a detectat continut care poate fi tradus", | |
| "nu s a detectat continut care poate fi tradus", | |
| "no translatable content detected", | |
| "no translatable content was detected", | |
| "no content that can be translated was detected", | |
| "no content could be translated", | |
| ] | |
| if has_any(no_content_needles): | |
| return ( | |
| "no_content", | |
| first_matching_line( | |
| no_content_needles, | |
| "Nu s-a detectat continut care poate fi tradus.", | |
| ), | |
| ) | |
| error_needles = [ | |
| "momentan fisierul nu poate fi tradus", | |
| "fisierul nu poate fi tradus", | |
| "incercati din nou peste cateva minute", | |
| "currently the file cannot be translated", | |
| "this file cannot be translated", | |
| "try again in a few minutes", | |
| "the file could not be translated", | |
| ] | |
| if has_any(error_needles): | |
| return ( | |
| "retryable", | |
| first_matching_line( | |
| error_needles, | |
| "Momentan, fisierul nu poate fi tradus. Incercati din nou peste cateva minute.", | |
| ), | |
| ) | |
| return None | |
| def _download_dirs(self) -> list[Path]: | |
| dirs = [self.download_dir] | |
| try: | |
| if WINDOWS_DOWNLOADS_DIR.exists() and WINDOWS_DOWNLOADS_DIR.resolve() != self.download_dir.resolve(): | |
| dirs.append(WINDOWS_DOWNLOADS_DIR) | |
| except OSError: | |
| if WINDOWS_DOWNLOADS_DIR.exists() and WINDOWS_DOWNLOADS_DIR != self.download_dir: | |
| dirs.append(WINDOWS_DOWNLOADS_DIR) | |
| return dirs | |
| def _download_key(self, path: Path) -> str: | |
| try: | |
| return str(path.resolve()).lower() | |
| except OSError: | |
| return str(path).lower() | |
| def _download_snapshot(self) -> set[str]: | |
| return { | |
| self._download_key(p) | |
| for directory in self._download_dirs() | |
| for p in directory.glob("*") | |
| if p.is_file() | |
| } | |
| def _normalize_download_location(self, path: Path) -> Path: | |
| try: | |
| if path.parent.resolve() == self.download_dir.resolve(): | |
| return path | |
| except OSError: | |
| if path.parent == self.download_dir: | |
| return path | |
| self.download_dir.mkdir(parents=True, exist_ok=True) | |
| dest = unique_destination(self.download_dir, path.name) | |
| shutil.move(str(path), str(dest)) | |
| logger.info("Mut download gasit in Downloads Windows: %s -> %s", path, dest) | |
| return dest | |
| def _wait_for_download(self, before: set[str], timeout: int) -> Path: | |
| start_time = time.time() | |
| deadline = time.time() + timeout | |
| while time.time() < deadline: | |
| candidates = [ | |
| p for directory in self._download_dirs() | |
| for p in directory.glob("*") | |
| if p.is_file() | |
| and self._download_key(p) not in before | |
| and p.suffix.lower() in TRANSLATED_DOC_EXTENSIONS | |
| ] | |
| if candidates: | |
| newest = max(candidates, key=lambda p: p.stat().st_mtime) | |
| if not (newest.parent / f"{newest.name}.crdownload").exists(): | |
| return self._normalize_download_location(newest) | |
| time.sleep(2) | |
| current_docs = [ | |
| str(p) for directory in self._download_dirs() | |
| for p in directory.glob("*") | |
| if p.is_file() and p.suffix.lower() in TRANSLATED_DOC_EXTENSIONS | |
| ] | |
| partials = [ | |
| str(p) for directory in self._download_dirs() | |
| for p in directory.glob("*") | |
| if p.is_file() and p.stat().st_mtime >= start_time and p.name.lower().endswith((".crdownload", ".tmp")) | |
| ] | |
| logger.warning( | |
| "Timeout download. Directoare=%s | documente=%s | partiale recente=%s", | |
| self._download_dirs(), | |
| current_docs[-10:], | |
| partials[-10:], | |
| ) | |
| raise TimeoutException("Download-ul traducerii nu a aparut la timp") | |
def prepare_document(word: WordManager, path: Path, on_split_part_saved=None) -> PreparedDocument:
    """Convert *path* to DOCX and split it into upload parts.

    ``on_split_part_saved`` is forwarded to ``word.split_docx_if_needed`` as
    its ``on_part_saved`` argument.
    """
    converted = word.convert_to_docx(path)
    upload_parts = word.split_docx_if_needed(
        converted,
        path,
        on_part_saved=on_split_part_saved,
    )
    return PreparedDocument(upload_parts=upload_parts, original=path)
def cleanup_document_intermediates(prepared: PreparedDocument, downloads: Iterable[Path]) -> None:
    """Delete the prepared upload parts that live under ``PROJECT_DIR``.

    Does nothing when ``KEEP_INTERMEDIATE`` is set. ``downloads`` is accepted
    for interface compatibility but is not used here. Cleanup stays best
    effort: a part that cannot be removed is logged and skipped instead of
    being silently ignored.
    """
    if KEEP_INTERMEDIATE:
        return
    for part in prepared.upload_parts:
        try:
            # Only files inside the project tree are removed.
            if PROJECT_DIR in part.resolve().parents:
                part.unlink(missing_ok=True)
        except Exception as exc:
            logger.warning("Nu pot sterge partea intermediara: %s | %s", part, exc)
def matching_downloads_for_part(part: Path) -> list[Path]:
    """Find downloaded files whose stem is the part's stem, optionally followed by `` (N)``.

    ``DOWNLOADS_DIR`` and, when it exists, ``WINDOWS_DOWNLOADS_DIR`` are
    searched recursively for every extension in ``TRANSLATED_DOC_EXTENSIONS``.
    """
    wanted_stem = part.stem
    # Matches copies named like "<stem> (1)".
    numbered_copy = re.compile(re.escape(wanted_stem) + r" \(\d+\)")

    search_roots = [DOWNLOADS_DIR]
    if WINDOWS_DOWNLOADS_DIR.exists():
        search_roots.append(WINDOWS_DOWNLOADS_DIR)

    found: list[Path] = []
    for root in search_roots:
        if not root.exists():
            continue
        for ext in TRANSLATED_DOC_EXTENSIONS:
            for candidate in root.rglob(f"*{ext}"):
                if not candidate.is_file():
                    continue
                if candidate.suffix.lower() not in TRANSLATED_DOC_EXTENSIONS:
                    continue
                stem = candidate.stem
                if stem == wanted_stem or numbered_copy.fullmatch(stem):
                    found.append(candidate)
    return found
def cleanup_completed_download_parts(prepared: PreparedDocument, downloads: Iterable[Path], final_pdf: Path) -> None:
    """Delete downloaded translations once the final PDF exists.

    Nothing is deleted when ``KEEP_INTERMEDIATE`` is set or *final_pdf* is
    missing. Candidates are every file returned by
    ``matching_downloads_for_part`` for each upload part, plus each existing
    path in *downloads* that has a translated-document extension and lies
    under ``DOWNLOADS_DIR`` or ``WINDOWS_DOWNLOADS_DIR``.
    """
    if KEEP_INTERMEDIATE:
        return
    if not final_pdf.exists():
        return

    def resolved_or_same(path: Path) -> Path:
        try:
            return path.resolve()
        except OSError:
            return path

    # Keyed by lower-cased resolved path so each file is deleted only once.
    pending: dict[str, Path] = {}

    for part in prepared.upload_parts:
        for match in matching_downloads_for_part(part):
            pending[str(resolved_or_same(match)).lower()] = match

    for downloaded in downloads:
        real_path = resolved_or_same(downloaded)
        if not downloaded.exists():
            continue
        if downloaded.suffix.lower() not in TRANSLATED_DOC_EXTENSIONS:
            continue
        ancestors = real_path.parents
        under_known_root = DOWNLOADS_DIR in ancestors or (
            WINDOWS_DOWNLOADS_DIR.exists() and WINDOWS_DOWNLOADS_DIR in ancestors
        )
        if under_known_root:
            pending[str(real_path).lower()] = downloaded

    removed = 0
    for path in pending.values():
        try:
            path.unlink(missing_ok=True)
        except Exception as exc:
            logger.warning("Nu pot sterge partea descarcata dupa PDF final: %s | %s", path, exc)
        else:
            removed += 1
    if removed:
        logger.info("Sterse parti descarcate dupa PDF final (%s fisiere).", removed)
def count_files(directory: Path, pattern: str) -> int:
    """Count the regular files in *directory* matching the glob *pattern*.

    Returns 0 when *directory* does not exist.
    """
    if directory.exists():
        return len([entry for entry in directory.glob(pattern) if entry.is_file()])
    return 0
def count_source_docs(directory: Path) -> int:
    """Count .doc/.docx files directly inside *directory*.

    Names starting with ``~$`` are not counted. Returns 0 when *directory*
    does not exist.
    """
    if not directory.exists():
        return 0
    total = 0
    for candidate in directory.glob("*"):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() not in {".doc", ".docx"}:
            continue
        if candidate.name.startswith("~$"):
            continue
        total += 1
    return total
def summarize_problem_entry(entry: dict) -> tuple[str, str, int, int, str, str]:
    """Condense one state entry into a row for the final problem report.

    Keys that are present but hold ``None`` are treated like missing keys, so
    a ``null`` in the state data neither raises nor shows up as the literal
    text "None".

    Returns:
        ``(original, status, translated_count, parts_count, failed_part_index,
        detail)`` where *detail* is the first non-empty value among ``error``,
        ``skip_detail`` and ``skip_reason`` (or a fixed fallback text), with
        whitespace collapsed and truncated to 500 characters.
    """

    def as_text(key: str, default: str = "") -> str:
        value = entry.get(key)
        return default if value is None else str(value)

    parts = [p for p in (entry.get("parts") or []) if p]
    translated_parts = [p for p in (entry.get("translated_parts") or []) if p]
    detail = (
        entry.get("error")
        or entry.get("skip_detail")
        or entry.get("skip_reason")
        or "fara detalii in state"
    )
    # Collapse newlines and repeated spaces so the row stays on one log line.
    detail = " ".join(str(detail).split())
    return (
        as_text("original"),
        as_text("status", "necunoscut"),
        len(translated_parts),
        len(parts),
        as_text("failed_part_index"),
        detail[:500],
    )
def log_final_report(state: dict, completed_this_run: int) -> None:
    """Log the end-of-run summary and list every unfinished document in *state*.

    An entry is not reported when its status is "done", when its original is
    already in the completed directory (per ``source_is_in_completed_dir``),
    or when it references paths but none of them exists any more.
    """
    source_count = count_source_docs(ARCHIVE_PATH)
    completed_source_count = count_source_docs(COMPLETED_SOURCE_DIR)
    final_pdf_count = count_files(FINAL_DIR, "*.pdf")
    converted_pdf_count = count_files(CONVERTED_PDF_DIR, "*.pdf")
    registry_documents = load_completed_registry().get("documents", {})
    registry_count = len(
        [item for item in registry_documents.values() if registry_entry_pdf_exists(item)]
    )

    logger.info("===== RAPORT FINAL =====")
    logger.info("Finalizate in aceasta rulare: %s", completed_this_run)
    counters = (
        ("PDF-uri convertite ABBYY: %s | %s", converted_pdf_count, CONVERTED_PDF_DIR),
        ("DOCX ramase la tradus: %s | %s", source_count, ARCHIVE_PATH),
        ("DOCX in GATA FINALIZAT: %s | %s", completed_source_count, COMPLETED_SOURCE_DIR),
        ("PDF-uri finale traduse: %s | %s", final_pdf_count, FINAL_DIR),
        ("DOCX finalizate in registru: %s | %s", registry_count, COMPLETED_REGISTRY_FILE),
    )
    for template, count, location in counters:
        logger.info(template, count, location)

    def should_report(entry: dict) -> bool:
        if entry.get("status") == "done":
            return False
        original_text = entry.get("original")
        original = Path(original_text) if original_text else None
        if original is not None and source_is_in_completed_dir(original):
            return False
        tracked = [] if original is None else [original]
        tracked += [Path(p) for p in entry.get("parts", []) if p]
        tracked += [Path(p) for p in entry.get("translated_parts", []) if p]
        # Entries whose files have all vanished are stale, not problems.
        if tracked and not any(p.exists() for p in tracked):
            return False
        return True

    problems = [
        summarize_problem_entry(entry)
        for entry in state.get("documents", {}).values()
        if should_report(entry)
    ]

    if problems:
        logger.warning("Fisiere nefinalizate / sarite: %s", len(problems))
        for row in sorted(problems):
            original, status, translated_count, parts_count, failed_part, detail = row
            logger.warning(
                "NEFINALIZAT | status=%s | traduse=%s/%s | partea_esec=%s | fisier=%s | cauza=%s",
                status,
                translated_count,
                parts_count,
                failed_part or "-",
                original,
                detail,
            )
    else:
        logger.info("Niciun fisier problematic in state.")
| def process_documents(args: argparse.Namespace) -> int: | |
| ensure_dirs() | |
| state = load_state() | |
| rebuild_completed_registry_from_state(state) | |
| move_completed_source_documents(ARCHIVE_PATH, [FINAL_DIR, CONVERTED_PDF_DIR], state) | |
| docs = scan_documents(ARCHIVE_PATH) | |
| known_docs = {doc_id(p): p for p in docs} | |
| resume_docs: list[Path] = [] | |
| for key, entry in state.get("documents", {}).items(): | |
| if entry.get("status") in {"done", "skipped"} and not args.force: | |
| state_final = Path(entry.get("final_pdf", "")) if entry.get("final_pdf") else None | |
| original_text = entry.get("original") | |
| original_path = Path(original_text) if original_text else None | |
| if entry.get("status") == "skipped" or (state_final and state_final.exists()) or not (original_path and original_path.exists()): | |
| continue | |
| if key in known_docs: | |
| continue | |
| original_text = entry.get("original") | |
| parts = [Path(p) for p in entry.get("parts", []) if p] | |
| translated = [Path(p) for p in entry.get("translated_parts", []) if p] | |
| if original_text and ( | |
| any(p.exists() for p in parts + translated) | |
| or has_existing_translation(parts) | |
| ): | |
| resume_docs.append(Path(original_text)) | |
| if resume_docs: | |
| logger.info( | |
| "Resume: adaug %s documente din state, chiar daca nu mai sunt in sursa.", | |
| len(resume_docs), | |
| ) | |
| docs = resume_docs + docs | |
| if args.only_name: | |
| needle = args.only_name.lower() | |
| docs = [p for p in docs if needle in p.name.lower()] | |
| if args.max_files: | |
| docs = docs[: args.max_files] | |
| logger.info("Documente gasite: %s", len(docs)) | |
| if not docs: | |
| return 0 | |
| run_download_dir = DOWNLOADS_DIR / datetime.now().strftime("%Y%m%d_%H%M%S") | |
| bot = None | |
| completed = 0 | |
| with WordManager() as word: | |
| for index, original in enumerate(docs, 1): | |
| key = doc_id(original) | |
| existing = state["documents"].get(key, {}) | |
| final_pdf = FINAL_DIR / f"{safe_name(original.stem)}_FINALIZAT.pdf" | |
| state_final_pdf = Path(existing.get("final_pdf", "")) if existing.get("final_pdf") else None | |
| if ( | |
| existing.get("status") == "done" | |
| and not args.force | |
| and (final_pdf.exists() or (state_final_pdf and state_final_pdf.exists())) | |
| ): | |
| logger.info("[%s/%s] Skip deja finalizat: %s", index, len(docs), original) | |
| continue | |
| if is_completed_in_registry(original) and not args.force: | |
| logger.info("[%s/%s] Skip deja finalizat in registru: %s", index, len(docs), original) | |
| continue | |
| if original.exists() and is_same_skipped_source(existing, original) and not args.force: | |
| logger.info( | |
| "[%s/%s] Skip deja marcat: %s | motiv=%s", | |
| index, | |
| len(docs), | |
| original, | |
| existing.get("skip_reason", "necunoscut"), | |
| ) | |
| continue | |
| if original.exists(): | |
| source_size = original.stat().st_size | |
| if source_size < MIN_SOURCE_BYTES: | |
| detail = f"Fisier prea mic: {source_size} bytes, sub limita {MIN_SOURCE_BYTES} bytes" | |
| logger.warning("[%s/%s] Skip fisier incomplet/prea mic: %s | %s", index, len(docs), original, detail) | |
| mark_document_skipped(state, key, original, "too_small", detail) | |
| continue | |
| logger.info("[%s/%s] Pregatesc: %s (%.2f MB)", index, len(docs), original, file_mb(original)) | |
| def checkpoint_split(saved_parts, done_index, total_parts, start_page, end_page, out_path): | |
| state["documents"][key] = { | |
| "original": str(original), | |
| "status": "splitting", | |
| "parts": [str(p) for p in saved_parts], | |
| "translated_parts": existing.get("translated_parts", []), | |
| "split_progress": { | |
| "done": done_index, | |
| "total": total_parts, | |
| "last_part": str(out_path), | |
| "last_pages": [start_page, end_page], | |
| "updated_at": now_iso(), | |
| }, | |
| "updated_at": now_iso(), | |
| } | |
| save_state(state) | |
| try: | |
| prepared = prepare_document(word, original, on_split_part_saved=checkpoint_split) | |
| except Exception as exc: | |
| if is_rpc_unavailable(exc): | |
| logger.warning( | |
| "[%s/%s] Word COM/RPC a cazut la pregatire. Repornesc Word si reincerc: %s", | |
| index, | |
| len(docs), | |
| original, | |
| ) | |
| word.restart() | |
| try: | |
| prepared = prepare_document(word, original, on_split_part_saved=checkpoint_split) | |
| except Exception as retry_exc: | |
| if is_word_corrupt_or_unreadable(retry_exc): | |
| detail = str(retry_exc) | |
| logger.warning( | |
| "[%s/%s] Skip fisier corupt/necitibil dupa retry: %s | %s", | |
| index, | |
| len(docs), | |
| original, | |
| detail, | |
| ) | |
| mark_document_skipped(state, key, original, "corrupt_or_unreadable", detail) | |
| continue | |
| if isinstance(retry_exc, DocumentSplitError): | |
| detail = str(retry_exc) | |
| logger.warning( | |
| "[%s/%s] Skip fisier imposibil de splitat dupa retry: %s | %s", | |
| index, | |
| len(docs), | |
| original, | |
| detail, | |
| ) | |
| mark_document_skipped(state, key, original, "split_failed", detail) | |
| continue | |
| raise | |
| elif is_word_corrupt_or_unreadable(exc): | |
| detail = str(exc) | |
| logger.warning( | |
| "[%s/%s] Skip fisier corupt/necitibil: %s | %s", | |
| index, | |
| len(docs), | |
| original, | |
| detail, | |
| ) | |
| mark_document_skipped(state, key, original, "corrupt_or_unreadable", detail) | |
| continue | |
| elif isinstance(exc, DocumentSplitError): | |
| detail = str(exc) | |
| logger.warning( | |
| "[%s/%s] Skip fisier imposibil de splitat sub limite: %s | %s", | |
| index, | |
| len(docs), | |
| original, | |
| detail, | |
| ) | |
| mark_document_skipped(state, key, original, "split_failed", detail) | |
| continue | |
| else: | |
| raise | |
| else: | |
| state_parts = [Path(p) for p in existing.get("parts", []) if p] | |
| resume_parts = [p for p in state_parts if p.exists()] | |
| if not resume_parts and args.finalize_existing and has_existing_translation(state_parts): | |
| resume_parts = state_parts | |
| if not resume_parts: | |
| logger.warning( | |
| "[%s/%s] Nu gasesc originalul si nici partile pregatite, sar: %s", | |
| index, | |
| len(docs), | |
| original, | |
| ) | |
| continue | |
| logger.info( | |
| "[%s/%s] Reiau din state fara original in sursa: %s | parti=%s", | |
| index, | |
| len(docs), | |
| original, | |
| len(resume_parts), | |
| ) | |
| prepared = PreparedDocument(original=original, upload_parts=resume_parts) | |
| logger.info("Parti upload: %s", len(prepared.upload_parts)) | |
| existing_part_paths = [Path(p) for p in existing.get("parts", []) if p] | |
| parts_changed = ( | |
| bool(existing_part_paths) | |
| and [str(p) for p in existing_part_paths] != [str(p) for p in prepared.upload_parts] | |
| ) | |
| resume_existing = dict(existing) | |
| allow_disk_resume = True | |
| if parts_changed: | |
| logger.info( | |
| "Schema de split s-a schimbat pentru %s; ignor traducerile vechi si refac partile.", | |
| original.name, | |
| ) | |
| resume_existing["translated_parts"] = [] | |
| allow_disk_resume = False | |
| state["documents"][key] = { | |
| "original": str(original), | |
| "status": "prepared" if args.prepare_only else existing.get("status", "running"), | |
| "parts": [str(p) for p in prepared.upload_parts], | |
| "translated_parts": resume_existing.get("translated_parts", []), | |
| "updated_at": now_iso(), | |
| } | |
| save_state(state) | |
| if args.prepare_only: | |
| for p in prepared.upload_parts: | |
| logger.info(" parte: %s (%.2f MB)", p, file_mb(p)) | |
| continue | |
| translated_slots = collect_translated_parts_from_state_or_disk( | |
| prepared.upload_parts, | |
| resume_existing, | |
| allow_disk_lookup=allow_disk_resume, | |
| ) | |
| translated_parts: list[Path | None] = list(translated_slots) | |
| no_content_parts = list(resume_existing.get("no_content_parts", [])) | |
| found_count = sum(1 for p in translated_parts if p and p.exists()) | |
| if found_count: | |
| logger.info( | |
| "Resume: am gasit deja %s/%s parti traduse pentru %s", | |
| found_count, | |
| len(prepared.upload_parts), | |
| original.name, | |
| ) | |
| if args.finalize_existing and found_count < len(prepared.upload_parts): | |
| logger.info( | |
| "Finalize-only: nu am toate partile traduse pentru %s (%s/%s), sar.", | |
| original.name, | |
| found_count, | |
| len(prepared.upload_parts), | |
| ) | |
| continue | |
| google_skip_detail: str | None = None | |
| for part_index, part in enumerate(prepared.upload_parts, 1): | |
| existing_download = translated_parts[part_index - 1] | |
| if existing_download and existing_download.exists(): | |
| logger.info( | |
| "Skip upload parte %s/%s, traducere existenta: %s", | |
| part_index, | |
| len(prepared.upload_parts), | |
| existing_download, | |
| ) | |
| continue | |
| if args.finalize_existing: | |
| continue | |
| if bot is None: | |
| bot = ChromeTranslateBot(run_download_dir) | |
| bot.start() | |
| logger.info("Upload parte %s/%s: %s", part_index, len(prepared.upload_parts), part.name) | |
| try: | |
| downloaded = bot.translate_file(part) | |
| except GoogleTranslateSkipError as exc: | |
| google_skip_detail = f"Parte {part_index}/{len(prepared.upload_parts)} ({part.name}): {exc}" | |
| logger.warning( | |
| "[%s/%s] Skip Google Translate temporar: %s | %s", | |
| index, | |
| len(docs), | |
| original, | |
| google_skip_detail, | |
| ) | |
| mark_document_temp_skipped( | |
| state, | |
| key, | |
| original, | |
| "google_translate_temporary_refusal", | |
| google_skip_detail, | |
| prepared.upload_parts, | |
| translated_parts, | |
| ) | |
| break | |
| except GoogleTranslateNoContentError as exc: | |
| logger.warning( | |
| "Google nu a detectat continut traductibil pentru %s, partea %s/%s. " | |
| "Folosesc partea originala ca passthrough si continui: %s", | |
| original.name, | |
| part_index, | |
| len(prepared.upload_parts), | |
| exc, | |
| ) | |
| translated_parts[part_index - 1] = part | |
| no_content_parts.append(str(part)) | |
| state_entry = { | |
| "original": str(original), | |
| "status": "running", | |
| "parts": [str(p) for p in prepared.upload_parts], | |
| "translated_parts": [str(p) for p in translated_parts if p], | |
| "no_content_parts": no_content_parts, | |
| "updated_at": now_iso(), | |
| } | |
| state["documents"][key] = state_entry | |
| save_state(state) | |
| if part_index < len(prepared.upload_parts): | |
| logger.info("Pauza intre parti: %s secunde", BETWEEN_PARTS_SEC) | |
| time.sleep(BETWEEN_PARTS_SEC) | |
| continue | |
| except (TimeoutException, GoogleTranslateRetryableError) as exc: | |
| fallback_ok = False | |
| if part.suffix.lower() in {".doc", ".docx"}: | |
| try: | |
| fallback_part = word.export_upload_part_to_pdf(part) | |
| if fallback_part.stat().st_size > MAX_UPLOAD_BYTES: | |
| raise DocumentSplitError( | |
| f"Fallback PDF peste limita: {fallback_part.name} ({file_mb(fallback_part):.2f} MB)" | |
| ) | |
| logger.warning( | |
| "Download DOCX esuat pentru %s; reincerc aceeasi parte ca PDF: %s", | |
| part.name, | |
| fallback_part.name, | |
| ) | |
| downloaded = bot.translate_file(fallback_part) | |
| translated_parts[part_index - 1] = downloaded | |
| state_entry = { | |
| "original": str(original), | |
| "status": "running", | |
| "parts": [str(p) for p in prepared.upload_parts], | |
| "translated_parts": [str(p) for p in translated_parts if p], | |
| "fallback_pdf_parts": { | |
| **dict(resume_existing.get("fallback_pdf_parts", {})), | |
| str(part_index): str(fallback_part), | |
| }, | |
| "updated_at": now_iso(), | |
| } | |
| if no_content_parts: | |
| state_entry["no_content_parts"] = no_content_parts | |
| state["documents"][key] = state_entry | |
| save_state(state) | |
| fallback_ok = True | |
| except Exception as fallback_exc: | |
| logger.warning( | |
| "Fallback PDF a esuat pentru %s, partea %s/%s: %s", | |
| original.name, | |
| part_index, | |
| len(prepared.upload_parts), | |
| fallback_exc, | |
| ) | |
| if fallback_ok: | |
| if part_index < len(prepared.upload_parts): | |
| logger.info("Pauza intre parti: %s secunde", BETWEEN_PARTS_SEC) | |
| time.sleep(BETWEEN_PARTS_SEC) | |
| continue | |
| logger.error( | |
| "Nu am putut finaliza upload/download pentru %s, partea %s/%s. Marchez pentru reluare: %s", | |
| original.name, | |
| part_index, | |
| len(prepared.upload_parts), | |
| exc, | |
| ) | |
| state["documents"][key] = { | |
| "original": str(original), | |
| "status": "download_failed", | |
| "parts": [str(p) for p in prepared.upload_parts], | |
| "translated_parts": [str(p) for p in translated_parts if p], | |
| "failed_part_index": part_index, | |
| "failed_part": str(part), | |
| "error": str(exc)[:1000], | |
| "updated_at": now_iso(), | |
| } | |
| save_state(state) | |
| break | |
| translated_parts[part_index - 1] = downloaded | |
| state_entry = { | |
| "original": str(original), | |
| "status": "running", | |
| "parts": [str(p) for p in prepared.upload_parts], | |
| "translated_parts": [str(p) for p in translated_parts if p], | |
| "updated_at": now_iso(), | |
| } | |
| if no_content_parts: | |
| state_entry["no_content_parts"] = no_content_parts | |
| state["documents"][key] = state_entry | |
| save_state(state) | |
| if part_index < len(prepared.upload_parts): | |
| logger.info("Pauza intre parti: %s secunde", BETWEEN_PARTS_SEC) | |
| time.sleep(BETWEEN_PARTS_SEC) | |
| if google_skip_detail: | |
| continue | |
| ready_parts = [p for p in translated_parts if p and p.exists()] | |
| if len(ready_parts) < len(prepared.upload_parts): | |
| logger.warning( | |
| "Nu pot face merge pentru %s: am %s/%s parti traduse.", | |
| original.name, | |
| len(ready_parts), | |
| len(prepared.upload_parts), | |
| ) | |
| continue | |
| pdf_path = word.export_translated_parts_to_pdf(ready_parts, original) | |
| logger.info("PDF final: %s", pdf_path) | |
| state_entry = { | |
| "original": str(original), | |
| "status": "done", | |
| "parts": [str(p) for p in prepared.upload_parts], | |
| "translated_parts": [str(p) for p in ready_parts], | |
| "final_pdf": str(pdf_path), | |
| "updated_at": now_iso(), | |
| } | |
| if no_content_parts: | |
| state_entry["no_content_parts"] = no_content_parts | |
| if original.exists() and not source_is_in_completed_dir(original): | |
| COMPLETED_SOURCE_DIR.mkdir(parents=True, exist_ok=True) | |
| dest = unique_destination(COMPLETED_SOURCE_DIR, original.name) | |
| shutil.move(str(original), str(dest)) | |
| state_entry["source_moved_to"] = str(dest) | |
| logger.info("Mutat original finalizat: %s -> %s", original, dest) | |
| state["documents"][key] = state_entry | |
| update_completed_registry_from_entry(state_entry) | |
| save_state(state) | |
| cleanup_completed_download_parts(prepared, ready_parts, pdf_path) | |
| cleanup_document_intermediates(prepared, ready_parts) | |
| completed += 1 | |
| log_final_report(state, completed) | |
| logger.info("Gata. Documente finalizate in aceasta rulare: %s", completed) | |
| return 0 | |
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the automation script.

    The default for ``--max-files`` is read from the ``SIMPLU_GT_MAX_FILES``
    environment variable. An unset, empty (or whitespace-only) or ``0`` value
    yields ``None``; any other integer is used as-is. A value that is not an
    integer aborts with an argparse usage error (exit status 2) instead of an
    uncaught ``ValueError`` traceback.

    Returns:
        The parsed namespace with the attributes ``prepare_only``,
        ``finalize_existing``, ``max_files``, ``force``, ``only_name``,
        ``report_only`` and ``sync_completed_only``.

    Raises:
        SystemExit: on invalid command-line arguments or a malformed
            ``SIMPLU_GT_MAX_FILES`` value.
    """
    parser = argparse.ArgumentParser(description="Google Translate Docs Chrome automation")

    # Validate the environment default up front so a bad value is reported as
    # a normal usage error rather than crashing while the parser is built.
    raw_max_files = os.environ.get("SIMPLU_GT_MAX_FILES", "").strip()
    try:
        env_max_files = int(raw_max_files) if raw_max_files else 0
    except ValueError:
        # parser.error() prints the message and raises SystemExit(2).
        parser.error(f"SIMPLU_GT_MAX_FILES must be an integer, got {raw_max_files!r}")

    parser.add_argument("--prepare-only", action="store_true", help="Doar converteste/spliteaza, fara browser/upload")
    parser.add_argument(
        "--finalize-existing",
        action="store_true",
        help="Doar face merge/PDF pentru traducerile deja descarcate, fara browser/upload",
    )
    # ``0`` from the environment is mapped to ``None`` (same as leaving it unset).
    parser.add_argument("--max-files", type=int, default=env_max_files or None)
    parser.add_argument("--force", action="store_true", help="Reproceseaza chiar daca PDF-ul final exista")
    parser.add_argument("--only-name", help="Proceseaza doar fisierele care contin acest text in nume")
    parser.add_argument("--report-only", action="store_true", help="Afiseaza raportul curent fara procesare")
    parser.add_argument(
        "--sync-completed-only",
        action="store_true",
        help="Muta doar DOCX-urile care au PDF corespondent, apoi afiseaza raportul",
    )
    return parser.parse_args()
def main() -> int:
    """Log the effective configuration, then dispatch on the selected CLI mode.

    ``--report-only`` and ``--sync-completed-only`` are maintenance modes:
    both prepare the directories, load the saved state, perform a single
    bookkeeping step and log the final report without calling
    ``process_documents``. ``--report-only`` wins when both flags are given.

    Returns:
        ``0`` for the maintenance modes, otherwise the return value of
        ``process_documents``.
    """
    args = parse_args()

    # Log the settings in effect at the start of every run.
    startup_settings = (
        ("ARCHIVE_PATH=%s", ARCHIVE_PATH),
        ("MAX_UPLOAD_BYTES=%s", MAX_UPLOAD_BYTES),
        ("MIN_SOURCE_BYTES=%s", MIN_SOURCE_BYTES),
        ("MAX_PAGES_PER_PART=%s", MAX_PAGES_PER_PART),
        ("TRANSLATE_WAIT_SEC=%s", TRANSLATE_WAIT_SEC),
        ("BETWEEN_PARTS_SEC=%s", BETWEEN_PARTS_SEC),
    )
    for message, value in startup_settings:
        logger.info(message, value)

    # Normal run: neither maintenance flag was requested.
    if not (args.report_only or args.sync_completed_only):
        return process_documents(args)

    # Shared preamble of the two maintenance modes.
    ensure_dirs()
    state = load_state()
    if args.report_only:
        rebuild_completed_registry_from_state(state)
    else:
        move_completed_source_documents(ARCHIVE_PATH, [FINAL_DIR, CONVERTED_PDF_DIR], state)
    log_final_report(state, 0)
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment