Created
February 24, 2025 07:05
-
-
Save romunov/f25c1310b31fc3517227e70817a9f841 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import gzip
import logging
import lzma
import os
import shelve
import shutil
import tarfile
from pathlib import Path
# Root logger setup: records at INFO and above are written both to the
# "history.log" file and to the console stream.
_log_handlers = [
    logging.FileHandler("history.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_handlers,
    level=logging.INFO,
    datefmt="%d-%b-%y %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# Command-line interface.
#   -b/--base    folder that is searched recursively for archive bundles
#                (None when omitted).
#   -d/--dryrun  boolean flag (also accepts --no-dryrun); None when omitted.
parser = argparse.ArgumentParser(
    prog="extractArchives",
    description="""
        Extract vcf and fusion tables from archive bundles. The files will be
        extracted to subfolders within the current directory.
        """,
    epilog="(C) Good enough solutions, 2025",
)
parser.add_argument(
    # Help text fixed: the bundles this script looks for are .tar.xz, not .tar.zx.
    "-b", "--base", default=None, help="Absolute or relative path to folder that holds .tar.xz bundles."
)
parser.add_argument("-d", "--dryrun", action=argparse.BooleanOptionalAction, help="Only simulate extracting.")
args = parser.parse_args()
# Name of the shelve database that remembers the outcome per archive.
HISTORY_DB = "history_db"

# Substrings a member name must contain to be worth extracting.
OK_MEMBERS = [".vcf", "Fusion.tsv"]

# Search root: the --base argument when given, otherwise the current directory.
BASE_FOLDER = Path(args.base) if args.base else Path(".")

logging.info("Searching for files")

# Keep only bundles whose path mentions "Plan_" and does not mention
# "Intermediate_Results_Files"; everything else is treated as junk.
tar_bundles = [
    candidate
    for candidate in BASE_FOLDER.rglob("*tar.xz")
    if "Plan_" in os.fspath(candidate)
    and "Intermediate_Results_Files" not in os.fspath(candidate)
]

logging.info(f"Found {len(tar_bundles)} archive files")
# Failures that mean one archive could not be read, unpacked or re-compressed.
# They are logged and recorded as "ERROR" so the remaining archives are still
# processed. FileNotFoundError (the only case handled before) is an OSError.
_ARCHIVE_ERRORS = (OSError, EOFError, tarfile.TarError, lzma.LZMAError)

for tar in tar_bundles:
    logging.info(f"Processing: {tar}")

    # The history is keyed by the archive's file name (not its full path).
    # Only an "OK" entry causes a skip; "ERROR" entries are retried.
    with shelve.open(HISTORY_DB) as db:
        already_extracted = db.get(tar.name) == "OK"
    if already_extracted:
        logging.info(f"Skipping {tar} tar as it already exists")
        continue

    try:
        with tarfile.open(tar, mode="r:xz") as tar_archive:
            # Select TarInfo objects (what extractall() documents for
            # `members`), each at most once even if several patterns match,
            # and only regular files, since every selected member is opened
            # and gzipped below.
            members_to_extract = [
                member
                for member in tar_archive.getmembers()
                if member.isfile() and any(tag in member.name for tag in OK_MEMBERS)
            ]
            if args.dryrun:
                logging.info(f"Extracting files: {[member.name for member in members_to_extract]}")
            else:
                tar_archive.extractall(members=members_to_extract, filter="data")

        if not args.dryrun:
            # Replace each extracted file with a gzip-compressed copy.
            for member in members_to_extract:
                with open(member.name, mode="rb") as f_in, gzip.open(f"{member.name}.gz", mode="wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                Path(member.name).unlink()
    except _ARCHIVE_ERRORS as err:
        logging.error(err)
        status = "ERROR"
    else:
        status = "OK"

    # A dry run extracts nothing, so it must not touch the history; recording
    # "OK" here would make the next real run skip this archive.
    if not args.dryrun:
        with shelve.open(HISTORY_DB) as db:
            db[tar.name] = status

logging.info("Script done OK")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment