Last active
March 23, 2023 13:52
-
-
Save rgaudin/326a808965a6efbce56cdf70c4695dd1 to your computer and use it in GitHub Desktop.
Python implementation of zimrecreate that also allows updating metadata
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import base64 | |
import pathlib | |
import re | |
import sys | |
import traceback | |
from typing import List, Optional | |
try: | |
from libzim.reader import Archive | |
from libzim.writer import Creator, Item, ContentProvider, Blob | |
from libzim.writer import Hint | |
except ImportError as exc: | |
print("zimrecreate requires python-libzim") | |
print("Install it with `pip install libzim`") | |
print(str(exc)) | |
sys.exit(2) | |
# Version string reported by the `--version` command-line flag.
__version__ = "1.0"
# Module-level debug flag; the functions in this file take their own `debug`
# parameter rather than reading this one.
debug = False
# Matches illustration metadata names such as `Illustration_48x48@1`.
# `size` captures the `<width>x<height>` part and `scale` captures the whole
# scale factor (the quantifier sits inside the group so that every digit is
# captured, not only the last one).
illus_re = re.compile(r"^Illustration_(?P<size>\d+x\d+)@(?P<scale>\d+)$")
class ProxyContentProvider(ContentProvider):
    """Content provider that hands out the content of an already existing item."""

    def __init__(self, source):
        super().__init__()
        # object exposing the `size` and `content` attributes served below
        self.source = source

    def get_size(self) -> int:
        """Return the `size` attribute of the wrapped source."""
        return self.source.size

    def gen_blob(self) -> Blob:
        """Yield the whole source content as one Blob, then an empty Blob."""
        # the trailing empty Blob presumably tells libzim that the content
        # is exhausted -- TODO confirm against the libzim writer API
        for chunk in (bytes(self.source.content), b""):
            yield Blob(chunk)
class ProxyItem(Item):
    """Writer item that mirrors an item coming from a source archive.

    Path, title, mimetype and content are all read from `source`;
    `is_front` (True, False or None) drives the FRONT_ARTICLE hint.
    """

    def __init__(self, source, is_front: Optional[bool]):
        super().__init__()
        self.source = source
        self.is_front = is_front

    def get_path(self) -> str:
        """Return the path of the wrapped source item."""
        return self.source.path

    def get_title(self) -> str:
        """Return the title of the wrapped source item."""
        return self.source.title

    def get_mimetype(self) -> str:
        """Return the mimetype of the wrapped source item."""
        return self.source.mimetype

    def get_contentprovider(self):
        """Return a new ProxyContentProvider bound to the wrapped source item."""
        return ProxyContentProvider(self.source)

    def get_hints(self):
        """Return a FRONT_ARTICLE hint, or no hint at all when `is_front` is None."""
        hints = {}
        if self.is_front is not None:
            hints[Hint.FRONT_ARTICLE] = self.is_front
        return hints
def get_title_listing_v1_index(zim: Archive) -> int:
    """Return the entry index of `listing/titleOrdered/v1`, or -1 if not found.

    Only the last 9 entries of the archive are examined, scanning backwards
    from the final one.

    Args:
        zim: archive exposing `all_entry_count` and `_get_entry_by_id`.

    Returns:
        The index of the entry whose path is `listing/titleOrdered/v1`,
        or -1 when none of the examined entries has that path.
    """
    last_index = zim.all_entry_count - 1
    # clamp the lower bound at -1 (exclusive) so that small or empty archives
    # never lead to a negative entry id being requested, while index 0 is
    # still examined when it falls inside the scanned window
    lower_bound = max(last_index - 9, -1)
    for index in range(last_index, lower_bound, -1):
        if zim._get_entry_by_id(index).path == "listing/titleOrdered/v1":
            return index
    return -1
def get_front_articles(zim: Archive) -> List[int]:
    """Return the entry indices stored in the archive's v1 title listing.

    The listing content is decoded as consecutive 4-byte little-endian
    unsigned integers; trailing bytes that do not fill a whole 4-byte group
    are ignored. An empty list is returned when the archive has no
    `listing/titleOrdered/v1` entry.
    """
    listing_index = get_title_listing_v1_index(zim)
    if listing_index < 0:
        return []

    listing = zim._get_entry_by_id(listing_index).get_item()
    # step through the content four bytes at a time
    return [
        int.from_bytes(listing.content[offset : offset + 4], byteorder="little")
        for offset in range(0, (listing.size // 4) * 4, 4)
    ]
def recreate(
    src_path: pathlib.Path,
    dst_path: pathlib.Path,
    new_meta: List,
    nb_workers: int,
    debug: bool,
) -> int:
    """Copy a source ZIM into a new ZIM, optionally overriding metadata.

    Args:
        src_path: path of the ZIM file to read.
        dst_path: path of the ZIM file to create.
        new_meta: `Name=value` strings. A value starting with `data:` must be
            a `data:<mimetype>;base64,<payload>` URL and is stored as binary;
            any other value is stored as UTF-8 plain text.
        nb_workers: number of workers configured on the libzim Creator.
        debug: enable the Creator's verbose mode and print every entry added.

    Returns:
        0 on success; 2 when a `new_meta` value is malformed or when the
        resulting metadata would lack a mandatory key.

    Raises:
        IOError: if the source ZIM cannot be opened.
    """
    print(f"Starting zimrecreate\n\tfrom: {src_path}\n\tinto: {dst_path}")
    src_path = src_path.expanduser().resolve()
    dst_path = dst_path.expanduser().resolve()

    print(f"Checking provided metadata ({len(new_meta)})")
    # check user-specified metadata first: fail before opening any ZIM
    new_metadata = {}
    for line in new_meta:
        key, separator, data = line.partition("=")
        if not separator:
            print(f"ERROR: Malformed metadata param: {line}")
            return 2

        # text metadata
        if not data.startswith("data:"):
            new_metadata[key] = ("text/plain;charset=UTF-8", data.encode("UTF-8"))
            continue

        # binary metadata
        data_url = re.match(r"^data:(?P<mimetype>.+);base64,(?P<payload>.+)$", data)
        if data_url is None:
            print(f"ERROR: Malformed binary metadata param: {data}")
            return 2
        mimetype, payload = data_url.groups()
        try:
            new_metadata[key] = (mimetype, base64.b64decode(payload))
        except ValueError:  # binascii.Error is a ValueError subclass
            print(f"ERROR: Invalid base64 payload: {payload}")
            return 2

    print("Analyzing source ZIM")
    try:
        src = Archive(src_path)
    except Exception as exc:
        raise IOError(f"Source ZIM ({src_path}) doesnt exists: {exc}") from exc

    if not src.has_new_namespace_scheme:
        print("WARNING: Source ZIM had namespaces.")

    main_path = (
        src.main_entry.get_redirect_entry().path
        if src.main_entry.is_redirect
        else src.main_entry.path
    )

    # a set gives constant-time membership tests in the per-entry loop below
    front_articles = set(get_front_articles(src))

    # retrieve source metadata
    metadata = {}
    for name in src.metadata_keys:
        # illustrations handled separately, Counter added by libzim
        if name == "Counter" or illus_re.match(name):
            continue
        item = src.get_metadata_item(name)
        metadata[name] = (item.mimetype, bytes(item.content))

    for size in src.get_illustration_sizes():
        item = src.get_illustration_item(size)
        metadata[f"Illustration_{size}x{size}@1"] = (item.mimetype, bytes(item.content))

    # override metadata with user-provided ones
    metadata.update(new_metadata)

    print("Computed new ZIM metadata to:")
    for key, value in metadata.items():
        preview = (
            (value[1].decode("UTF-8") if isinstance(value[1], bytes) else value[1])
            if value[0].startswith("text/plain")
            # report the size of the payload, not the length of the mimetype
            else f"{value[0]} binary ({len(value[1])} bytes)"
        )
        print(f"\t{key}: {preview}")

    missing_mandatory_metadata = [
        name
        for name in [
            "Title",
            "Description",
            "Creator",
            "Publisher",
            "Date",
            "Name",
            "Language",
        ]
        if name not in metadata
    ]
    if missing_mandatory_metadata:
        print(
            "ERROR: Destination ZIM would lack mandatory metadata: "
            f"{', '.join(missing_mandatory_metadata)}."
        )
        return 2

    print("Starting destination ZIM Creator")
    dst = Creator(filename=dst_path).config_nbworkers(nb_workers)
    if debug:
        dst.config_verbose(True)
    if "Language" in metadata:
        dst.config_indexing(True, metadata["Language"][1].decode("UTF-8"))
    dst.set_mainpath(main_path)

    # entries that are skipped when copying (see the checks in the loop below)
    generated_paths = (
        "title/xapian",
        "fulltext/xapian",
        "listing/titleOrdered/v0",
        "listing/titleOrdered/v1",
    )

    # the context manager guarantees the Creator's __exit__ runs even if
    # adding an entry raises
    with dst:
        # metadata
        print("Adding metadata & illustrations")
        for key, value in metadata.items():
            if debug:
                print(f"> {key}")
            illustration = illus_re.match(key)
            if illustration:
                # illustrations are registered by their width in pixels
                size = int(illustration.group("size").split("x", 1)[0])
                dst.add_illustration(size, value[1])
                continue
            dst.add_metadata(key, content=value[1], mimetype=value[0])

        print("Adding all entries")
        # stays None (no hint emitted) when the source has no v1 title listing
        is_front = None
        for index in range(src.all_entry_count):
            entry = src._get_entry_by_id(index)

            # hack to get around the fact we don't know the actual namespace
            if entry.path in metadata:
                continue
            if entry.path == "Counter" or illus_re.match(entry.path):
                continue
            if entry.path in generated_paths:
                continue
            # would be W/mainPage, handled by libzim
            if entry.path == "mainPage" or entry == src.main_entry:
                continue

            if front_articles:
                is_front = entry.path == main_path or index in front_articles

            if debug:
                print(f"> {is_front} -- {entry.path}")

            if entry.is_redirect:
                dst.add_redirection(
                    path=entry.path,
                    title=entry.title,
                    targetPath=entry.get_redirect_entry().path,
                    hints={} if is_front is None else {Hint.FRONT_ARTICLE: is_front},
                )
                continue

            dst.add_item(ProxyItem(entry.get_item(), is_front=is_front))

        print("Finishing ZIM…")

    return 0
def entrypoint():
    """Parse the command line, run `recreate` and exit with its status.

    The process exits with the value returned by `recreate`, or with 1 when
    `recreate` raises; with `--debug` the full traceback is printed as well.
    """
    # NOTE(review): the illustration example below does not start with the
    # `data:<mimetype>;base64,` prefix the text describes -- confirm it is
    # the intended value.
    epilog = (
        """ZIM Metadata spec: https://wiki.openzim.org/wiki/Metadata
Metadata are not restricted to the ones specified in the spec; """
        """but those are the ones that are used by ZIM readers.
Use proper case (Pascal Case) when specifying standard Metadata.
You can use data: URLs to specify non-plain/text metadata using base64 encoding with:
data:<mimetype>;base64,<data>
Illustrations are set as `Illustration_<size>@1` """
        """with size=48x48 for the default illustration.
Examples:
-m "Title=A new Hope"
-m "Illustration_48x48@1="""
        """CQd1PeAAAAGXRFWHRTb2Z0d2FyZQBBZG9iZSBJbWFnZVJlYWR5ccllPAAAAA9JREFUeNpi+P//"""
        """P0CAAQAF/gL+Lc6J7gAAAABJRU5ErkJggg=="
Limitations:
- Compression Hint is readable so is not set. All entries uses default mode
- ZIM cannot contain entries with following paths:
- mainPage
- title/xapian
- fulltext/xapian
- listing/titleOrdered/v0
- listing/titleOrdered/v1
"""
    )

    parser = argparse.ArgumentParser(
        prog="zimrecreate.py",
        description="Recreate a ZIM from another ZIM, possibly changing metadata",
        epilog=epilog,
        # keep the epilog's line breaks as written
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("src_path")
    parser.add_argument("dst_path")
    parser.add_argument(
        "-m",
        "--meta",
        dest="new_meta",
        action="append",
        default=[],
        help='New metadata to set on ZIM. Use -m "Title=Better ZIM Title" format',
    )
    parser.add_argument(
        "--threads",
        help="Nb of threads to use in libzim",
        default=4,
        type=int,
        dest="nb_workers",
    )
    parser.add_argument(
        "--debug", help="Enable verbose output", action="store_true", default=False
    )
    parser.add_argument(
        "-v", "--version", action="version", version=f"%(prog)s {__version__}"
    )

    # the parsed option names match the keyword parameters of `recreate`
    args = vars(parser.parse_args())
    args["src_path"] = pathlib.Path(args["src_path"])
    args["dst_path"] = pathlib.Path(args["dst_path"])

    try:
        sys.exit(recreate(**args))
    except Exception as exc:
        print(f"ERROR. An {type(exc).__name__} error occurred: {exc}")
        if args["debug"]:
            # format the exception being handled; `sys.last_value` is only
            # set by the interactive interpreter and cannot be relied on here
            print("".join(traceback.format_exception(type(exc), exc, exc.__traceback__)))
        raise SystemExit(1)
# Start the command-line interface only when this file is run as a script.
if __name__ == "__main__":
    entrypoint()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment