Skip to content

Instantly share code, notes, and snippets.

@romunov
Created February 24, 2025 07:05
Show Gist options
  • Save romunov/f25c1310b31fc3517227e70817a9f841 to your computer and use it in GitHub Desktop.
Save romunov/f25c1310b31fc3517227e70817a9f841 to your computer and use it in GitHub Desktop.
import argparse
import shelve
import os
import logging
from pathlib import Path
import tarfile
import gzip
import shutil
# Root logger setup: records at INFO and above go both to the file
# "history.log" and to a stream handler, sharing one timestamped format.
_log_handlers = [logging.FileHandler("history.log"), logging.StreamHandler()]
logging.basicConfig(
    handlers=_log_handlers,
    datefmt="%d-%b-%y %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Command-line interface. argparse re-wraps the description, so the
# indentation inside the triple-quoted string does not affect the help output.
parser = argparse.ArgumentParser(
    prog="extractArchives",
    description="""
    Extract vcf and fusion tables from archive bundles. The files will be
    extracted to subfolders within the current directory.
    """,
    epilog="(C) Good enough solutions, 2025",
)
# Folder that is searched recursively for bundles; the script falls back to
# the current directory when this option is omitted.
parser.add_argument(
    "-b", "--base", default=None, help="Absolute or relative path to folder that holds .tar.xz bundles."
)
# --dryrun / --no-dryrun: only log which members would be extracted.
parser.add_argument("-d", "--dryrun", action=argparse.BooleanOptionalAction, help="Only simulate extracting.")
args = parser.parse_args()
# File name handed to shelve.open() for the per-archive processing history.
HISTORY_DB = "history_db"

# Search root: the --base argument when given, otherwise the current directory.
BASE_FOLDER = Path(args.base) if args.base else Path(".")

logging.info("Searching for files")
# Filter out junk files: keep only paths that contain "Plan_" and do not
# contain "Intermediate_Results_Files".
tar_bundles = [
    bundle
    for bundle in BASE_FOLDER.rglob("*tar.xz")
    if "Plan_" in os.fspath(bundle) and "Intermediate_Results_Files" not in os.fspath(bundle)
]
logging.info(f"Found {len(tar_bundles)} archive files")

# Substrings looked for in archive member names to decide what gets extracted.
OK_MEMBERS = [".vcf", "Fusion.tsv"]
def _matches_wanted(member_name):
    """Return True when *member_name* contains any of the OK_MEMBERS substrings."""
    return any(pattern in member_name for pattern in OK_MEMBERS)


def _gzip_in_place(extracted_path):
    """Compress *extracted_path* to ``<extracted_path>.gz`` and delete the original."""
    with open(extracted_path, mode="rb") as f_in, gzip.open(f"{extracted_path}.gz", mode="wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    # Remove only after both handles are closed.
    Path(extracted_path).unlink()


def _record_status(archive_name, status):
    """Store *status* ("OK" or "ERROR") for *archive_name* in the history shelf."""
    with shelve.open(HISTORY_DB) as db:
        db[archive_name] = status


# Process every discovered bundle: skip the ones already recorded as "OK",
# extract the wanted members into the current directory, gzip each extracted
# file and record the outcome. A dry run only logs what would be extracted and
# never writes to the history, so it cannot hide bundles from a later real run.
for tar in tar_bundles:
    logging.info(f"Processing: {tar}")

    # History entries are keyed by the archive's file name, not its full path.
    with shelve.open(HISTORY_DB) as db:
        previous_status = db.get(tar.name)
    if previous_status == "OK":
        logging.info(f"Skipping {tar} tar as it already exists")
        continue

    try:
        with tarfile.open(tar, mode="r:xz") as tar_archive:
            # One entry per member even when several patterns match, and no
            # directory entries: directories cannot be gzipped, and matching
            # files below them are selected by their own names anyway.
            wanted = [
                info
                for info in tar_archive.getmembers()
                if not info.isdir() and _matches_wanted(info.name)
            ]
            if args.dryrun:
                logging.info(f"Extracting files: {[info.name for info in wanted]}")
                continue
            tar_archive.extractall(members=wanted, filter="data")
        for info in wanted:
            _gzip_in_place(info.name)
    except (OSError, tarfile.TarError) as err:
        # A missing, unreadable or corrupt bundle must not abort the batch.
        # "ERROR" entries are not skipped, so the bundle is retried next run.
        logging.error(err)
        if not args.dryrun:
            _record_status(tar.name, "ERROR")
        continue

    _record_status(tar.name, "OK")

logging.info("Script done OK")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment