@yottatsa
Created March 15, 2025 18:48
#!/usr/bin/env python3
import logging
import argparse
import os
from typing import Callable, Collection, Iterable, Optional, Set, Tuple, NamedTuple
import glob
from functools import partial, lru_cache
import subprocess
import shlex
from itertools import chain as ichain, compress, tee
import unidecode
import datetime
import operator
from fn import F, _, Stream
from fn.op import call
logger: logging.Logger = logging.getLogger(__name__)
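# rsync in archive mode with progress output, copying whole files in place,
# skipping files that are newer at the destination, and staying on one filesystem.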
RSYNC = ["rsync", "-aP", "--whole-file", "--update", "--one-file-system", "--inplace"]
memoize = lru_cache(maxsize=None)
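# streamdebug(message) returns an identity function that logs each pipeline
# item at DEBUG level before passing it through unchanged.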
def streamdebug(message="Processed %s"):
def debug(item):
logger.debug(message, item)
return item
return debug
# def depth_filter(depth: int) -> Callable:
# return lambda dirpath: dirpath.count("/") == depth
# def trim_root(root: str) -> Callable:
#     return lambda path: path.split(root, 1)[1].lstrip("/")
depth_filter = lambda depth: F(operator.eq, depth) << operator.methodcaller(
"count", "/"
)
trim_root = (
lambda root: F()
<< operator.methodcaller("lstrip", "/")
<< operator.itemgetter(1)
<< operator.methodcaller("split", root, 1)
)
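# Reading the F-composition right to left (f << g applies g first):
#   depth_filter(1)("Title/episode.mkv") -> True (exactly one "/")
#   trim_root("/srv/media/anime")("/srv/media/anime/Title") -> "Title"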
def pexec(args):
logger.debug("exec: %s", " ".join(map(shlex.quote, args)))
subprocess.check_call(args)
def walk(root: str, include_root: bool = False) -> Iterable[str]:
    for croot, directories, files in os.walk(root):
        # Prune hidden directories in place so os.walk skips their subtrees too,
        # instead of merely not yielding the hidden directory itself.
        directories[:] = [d for d in directories if not d.startswith(".")]
        if not include_root:
            croot = croot.split(root, 1)[1].lstrip("/")
        for directory in directories:
            yield os.path.join(croot, directory)
for filename in files:
if filename.startswith("."):
continue
yield os.path.join(croot, filename)
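# Example: walk("/srv/media/anime") yields library-relative paths such as
# "Title" and "Title/episode.mkv", skipping dotfiles.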
class MediaItem(NamedTuple):
title: "TitleItem"
name: str
locations: Set[str]
def __repr__(self) -> str:
return f"<{self.__class__.__name__}: {self.title.name}/{self.name}({len(self.locations)})>"
def set_location(self, location):
self.locations.add(location)
return self
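    # locations is a mutable set (unhashable), so identity is derived from the
    # immutable title and name fields only.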
def __hash__(self):
return hash(self.title) + hash(self.name)
class TitleItem(NamedTuple):
library: "AnimeLib"
name: str
def __str__(self) -> str:
return self.name
def __repr__(self) -> str:
return f"<{self.__class__.__name__}: {self.name}>"
def media(self) -> Collection[MediaItem]:
return call(
F(filter, F(operator.eq, self) << operator.attrgetter("title")),
self.library.media(),
)
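    # Memoization interns MediaItem per (title, name): repeated lookups return
    # the same instance, so set_location() accumulates on one object.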
@memoize
def get_media(self, name: str) -> MediaItem:
return MediaItem(self, name, set())
class MediaIndex(NamedTuple):
source_path: str
last_updated: datetime.datetime
library: "AnimeLib"
@memoize
def media(self) -> Collection[MediaItem]:
"""
Remote media.
"""
return call(
F(set)
<< F(
map,
F(streamdebug("Processed remote media: %s"))
<< (
lambda i: self.library.get_title(i[1])
.get_media(i[2])
.set_location("/".join(i))
)
<< operator.methodcaller("rsplit", sep="/", maxsplit=2),
)
<< F(filter, F() << depth_filter(1) << trim_root(self.source_path),),
self.contents,
)
@property
    def contents(self):
        return Stream() << call(
            F(filter, depth_filter(self.source_path.count("/") + 1))
            << F(filter, os.path.isdir),
            walk(self.source_path, include_root=True),
        )
class DiskIndex:
def __init__(self, library: "AnimeLib") -> None:
self.library = library
@property
def contents(self):
return Stream() << walk(self.library.library_path, include_root=True)
@memoize
def titles(self) -> Set[TitleItem]:
"""
On-disk titles.
"""
# FIXME how to run a few filters at once? tee + compress?
return call(
F(set)
<< F(
map,
F(streamdebug("Processed on-disk title: %s"))
<< self.library.get_title
<< trim_root(self.library.library_path),
)
<< F(
filter,
lambda a: all(
f(a)
for f in [
F() << depth_filter(0) << trim_root(self.library.library_path),
F(os.path.isdir),
]
),
),
self.contents,
)
@memoize
def media(self) -> Collection[MediaItem]:
"""
On-disk media.
"""
# FIXME how to do update on mutable structure with a data copy??
return call(
F(set)
<< F(
map,
F(streamdebug("Processed on-disk media: %s"))
<< (
lambda i: self.library.get_title(i[1])
.get_media(i[2])
.set_location("/".join(i))
)
<< operator.methodcaller("rsplit", sep="/", maxsplit=2),
)
<< F(
filter, F() << depth_filter(1) << trim_root(self.library.library_path),
),
self.contents,
)
class AnimeLib:
def __init__(self) -> None:
self.pending_path = "/srv/media/unsorted"
self.library_path = "/srv/media/anime"
self.backup_path = "/srv/backup/tape"
self.disk_index = DiskIndex(self)
@memoize
def get_title(self, name: str) -> TitleItem:
return TitleItem(self, name)
def titles(self) -> Collection[TitleItem]:
return self.disk_index.titles()
def media(self) -> Collection[MediaItem]:
return self.disk_index.media()
    def pending(self) -> Iterable[str]:
return filter(depth_filter(0), walk(self.pending_path))
def collection(self) -> Set[str]:
return set(filter(depth_filter(1), walk(self.library_path)))
def backup(self) -> Set[str]:
return set(filter(depth_filter(1), walk(self.backup_path)))
def backup_item(self, item: str, dst: str) -> bool:
src = os.path.join(self.pending_path, item)
logger.info("%s -> %s", src, dst)
pexec(RSYNC + [src, os.path.dirname(dst)])
return True
def backup_library_item(self, item: str, title: str, dst: str) -> bool:
src = os.path.join(self.library_path, title, item)
logger.info("%s -> %s", src, dst)
pexec(RSYNC + [src, os.path.dirname(dst)])
return True
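    # Record a completed backup by symlinking the staged copy into the
    # <backup_path>/<title>/ index tree.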
def archive_backup(self, src: str, title: str) -> bool:
item = os.path.basename(src)
title_dir = os.path.join(self.backup_path, title)
dst = os.path.join(self.backup_path, title, item)
logger.info("%s -> %s", src, dst)
if not os.path.exists(title_dir):
os.mkdir(title_dir)
if not os.path.exists(dst) and not os.path.islink(dst):
os.symlink(src, dst)
return True
def archive_item(self, item: str, title: str) -> bool:
src = os.path.join(self.pending_path, item)
dst = os.path.join(self.library_path, title, item)
logger.info("%s -> %s", src, dst)
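        # os.rename only works within a single filesystem; pending_path and
        # library_path are assumed to share a mount.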
os.rename(src, dst)
return True
def cleanup(string: str) -> str:
    return unidecode.unidecode(
        string.replace("-", " ").replace("_", " ").replace(".", " ").lower()
    )
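# e.g. cleanup("Fullmetal_Alchemist-Brotherhood.S01") -> "fullmetal alchemist brotherhood s01"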
PendingItem = Tuple[str, str]
def search_pending(
    pending: Collection[str], titles: Collection[str], backup_storage: Optional[str]
) -> Iterable[PendingItem]:
title_matchers = list(map(lambda t: (t, cleanup(t)), titles))
for item in pending:
matcher = cleanup(item)
logger.debug("matcher = %s", matcher)
for title, title_matcher in title_matchers:
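            # "orange" substring-matches too many pending items, so it is
            # special-cased out (presumably the title Orange).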
if title_matcher == "orange":
continue
if title_matcher in matcher:
if backup_storage:
logger.info("Archive %s as %s and backup to %s", item, title, os.path.basename(backup_storage))
else:
logger.info("Archive %s as %s", item, title)
yield (item, title)
def process_pending(
    library: AnimeLib, backup_storage: Optional[str], a: PendingItem
) -> Iterable[bool]:
item, title = a
if backup_storage:
backup = os.path.join(backup_storage, item)
yield library.backup_item(item, backup)
yield library.archive_backup(backup, title)
yield library.archive_item(item, title)
PendingBackup = Tuple[str, str]
def resolve_pending_backup(
    pending_backup: Collection[str], backup_storage: Optional[str]
) -> Iterable[PendingBackup]:
    if backup_storage:
        for path in pending_backup:
            title = os.path.dirname(path)
            item = os.path.basename(path)
            if not os.path.exists(os.path.join(backup_storage, item)):
                logger.info("Backup %s to %s", path, os.path.basename(backup_storage))
                yield (item, title)
def process_pending_backup(
    library: AnimeLib, backup_storage: Optional[str], a: PendingBackup
) -> Iterable[bool]:
item, title = a
if backup_storage:
backup = os.path.join(backup_storage, item)
yield library.backup_library_item(item, title, backup)
yield library.archive_backup(backup, title)
#def search_lost_backup(
# pending_backup: Collection[str], search_path: Collection[str]
#) -> Collection[PendingBackup]:
# """
# 1. Accepts collections of missed backups and search paths
# 2. Globs search paths
# 3. Match items
# """
# pending_hash = {
# os.path.basename(item): os.path.dirname(item) for item in pending_backup
# }
# candidates = [
# (os.path.join(d, c), pending_hash.get(c))
# for d in search_path
# for c in walk(d)
# if depth_filter(0)(c) and c in pending_hash
# ]
# if candidates:
# logger.info("Candidates: %s", candidates)
# elif pending_backup:
# logger.error("No candidates available.")
#
# return candidates
#
#def process_lost_backup(library: AnimeLib, a: PendingBackup) -> Collection[bool]:
# src, title = a
# yield library.archive_backup(src, title)
def write_index(library: AnimeLib, index_file: str = "index.md") -> None:
fn = os.path.join(library.library_path, index_file)
    with open(fn, "w") as f:
for title in library.titles():
f.write(f"* {title}\n")
def main(debug: bool = False) -> None:
"""
Parse arguments, sort, and print output.
"""
parser = argparse.ArgumentParser(description=("archiving anime to tape"))
parser.add_argument("--debug", action="store_true", default=debug)
    parser.add_argument(
        "--clowntown",
        action="store_true",
        default=False,
        help="actually perform the archive/backup actions (dry run otherwise)",
    )
args = parser.parse_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
library = AnimeLib()
logger.debug("Titles: %s", list(library.titles()))
for i in library.titles():
logger.debug("%s: %s", i, list(i.media()))
write_index(library)
storage_path = glob.glob("/mnt/anime_A0000*")
logger.debug("storage_path = %s", storage_path)
active_storages = list(filter(os.path.ismount, storage_path))
logger.debug("active_storages = %s", active_storages)
    backup_storage = active_storages[0] if active_storages else None
pending_archival = list(library.pending())
if pending_archival:
logger.warning("Pending %s", pending_archival)
pending_backup = library.collection() - library.backup()
if pending_backup:
logger.warning("Pending backup %s", pending_backup)
library_titles = map(str, library.titles())
actions = ichain(
*map(
partial(process_pending, library, backup_storage),
search_pending(pending_archival, library_titles, backup_storage),
),
*map(
partial(process_pending_backup, library, backup_storage),
resolve_pending_backup(pending_backup, backup_storage),
),
#*map(
# partial(process_pending_backup, library),
# search_pending_backup(pending_backup, storage_path),
#),
)
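    # `actions` is a lazy chain of generators: nothing has touched the
    # filesystem yet.  Consuming it below is what actually runs the actions.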
if args.clowntown:
result = all(actions)
logger.debug("result = %s", result)
if not result:
raise SystemError
if __name__ == "__main__":
main()