Created
May 24, 2023 16:17
-
-
Save EtsuNDmA/55ec5f998d79510469d2fbe7e1c0d524 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import os | |
import time | |
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor | |
from datetime import timedelta | |
from functools import partial | |
from pathlib import Path | |
import pydicom | |
import warnings | |
# Silence UserWarning-category warnings for the whole process so they do not
# clutter the console output of the script.
warnings.filterwarnings(action="ignore", category=UserWarning)

# Dedicated logger used by every function in this script.
logger = logging.getLogger("anonymizer")
def parse_args(argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse the command-line arguments of the anonymizer script.

    Args:
        argv: Argument strings to parse, without the program name. When
            ``None`` (the default) argparse falls back to ``sys.argv[1:]``,
            which is what the script entry point relies on.

    Returns:
        A namespace with ``input_dicom_dir`` and ``output_dicom_dir`` (both
        ``Path``), ``verbose`` (bool) and ``concurrency`` (int or ``None``).
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        "input_dicom_dir",
        type=Path,
        help="path to the directory with dicom files",
    )
    parser.add_argument(
        "output_dicom_dir",
        type=Path,
        help="path to the directory for anonymized files",
    )
    parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
    # ``None`` is forwarded to the executor as its max_workers argument.
    parser.add_argument("-c", "--concurrency", help="number of processes to run in parallel", type=int, default=None)
    return parser.parse_args(argv)
def configure_logging(is_verbose: bool = False):
    """Attach a formatted stream handler to the module logger and set its level.

    Args:
        is_verbose: When true the logger level is DEBUG, otherwise INFO.
    """
    log_format = logging.Formatter("%(asctime)-15s %(levelname)-8s %(message)s")
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(log_format)
    logger.addHandler(stream_handler)

    if is_verbose:
        chosen_level = logging.DEBUG
    else:
        chosen_level = logging.INFO
    logger.setLevel(level=chosen_level)
def scan_input_dir_or_path(path_to_dicoms: Path) -> list[Path]:
    """Collect the ``.dcm`` files located at or below the given path.

    Args:
        path_to_dicoms: Either a directory, which is walked recursively with
            ``os.walk``, or a path to a single file.

    Returns:
        The paths whose name ends with ``.dcm``. For a non-directory input the
        list holds that path itself when its suffix is ``.dcm`` and is empty
        otherwise.
    """
    if not path_to_dicoms.is_dir():
        # Single-path input: keep it only if it carries the DICOM suffix.
        found = [path_to_dicoms] if path_to_dicoms.suffix == ".dcm" else []
    else:
        logger.info("Scanning dicoms in %s", path_to_dicoms)
        found = [
            Path(root) / name
            for root, _, names in os.walk(path_to_dicoms)
            for name in names
            if name.endswith(".dcm")
        ]

    logger.info("Found %s files", len(found))
    return found
def _anonymize(dataset):
    """Overwrite or remove the patient-identifying elements of a dataset in place.

    ``PatientID`` and ``PatientName`` are set to ``"ANON"``,
    ``PatientBirthDate`` to ``"19000101"``, and the ``OtherPatientIDs`` /
    ``OtherPatientIDsSequence`` elements are deleted when present.

    Args:
        dataset: The object to modify; ``anonymize`` passes the result of
            ``pydicom.dcmread`` here.
    """
    placeholder_values = (
        ("PatientID", "ANON"),
        ("PatientName", "ANON"),
        ("PatientBirthDate", "19000101"),
    )
    for keyword, placeholder in placeholder_values:
        setattr(dataset, keyword, placeholder)

    # These elements are dropped only when they exist in the dataset.
    for keyword in ("OtherPatientIDs", "OtherPatientIDsSequence"):
        if keyword in dataset:
            delattr(dataset, keyword)
def anonymize(filename: Path, input_dicom_dir: Path, output_dicom_dir: Path):
    """Read one DICOM file, anonymize it and save it under the output directory.

    The output file keeps the path it had relative to ``input_dicom_dir``;
    missing parent directories are created.

    Args:
        filename: Path of the DICOM file to read.
        input_dicom_dir: Root the input was scanned from. It may also be the
            file itself when a single file was given as input.
        output_dicom_dir: Directory that receives the anonymized file.

    Raises:
        ValueError: If ``filename`` is not located under ``input_dicom_dir``
            (raised by ``Path.relative_to``).
    """
    dataset = pydicom.dcmread(filename)
    _anonymize(dataset)

    relative_path = filename.relative_to(input_dicom_dir)
    if not relative_path.parts:
        # The input "directory" is the file itself, so the relative path is
        # empty ("."). Without this fallback the dataset would be written to
        # the output directory path itself instead of a file inside it.
        relative_path = Path(filename.name)

    output_filename = output_dicom_dir / relative_path
    output_filename.parent.mkdir(parents=True, exist_ok=True)
    dataset.save_as(output_filename)
def main(args: argparse.Namespace) -> None:
    """Anonymize every DICOM file found at the input path using a process pool.

    Args:
        args: Parsed command-line arguments; ``input_dicom_dir``,
            ``output_dicom_dir`` and ``concurrency`` are read from it.
    """
    tic = time.monotonic()
    input_dicom_dir = args.input_dicom_dir
    output_dicom_dir = args.output_dicom_dir
    dicom_paths = scan_input_dir_or_path(input_dicom_dir)

    worker = partial(anonymize, input_dicom_dir=input_dicom_dir, output_dicom_dir=output_dicom_dir)
    with ProcessPoolExecutor(args.concurrency) as executor:
        # Keep each future next to its path: the iterator returned by
        # ``executor.map`` only raises worker exceptions when it is consumed,
        # so an unconsumed ``map`` call would hide every failure.
        submitted = [(path, executor.submit(worker, path)) for path in dicom_paths]
    # Leaving the ``with`` block waits for all submitted tasks to finish.
    toc = time.monotonic()

    failed_count = 0
    for path, future in submitted:
        error = future.exception()
        if error is not None:
            failed_count += 1
            logger.error("Failed to anonymize %s: %s", path, error)

    logger.info("Anonymized dicoms saved to %s", output_dicom_dir)
    if failed_count:
        logger.warning("%s of %s files could not be anonymized", failed_count, len(dicom_paths))
    logger.debug("Anonymization of %s files finished at %s", len(dicom_paths), timedelta(seconds=toc - tic))
if __name__ == "__main__":
    # Parse the CLI first so the verbosity flag can drive the logging setup
    # before any work is logged.
    args = parse_args()
    configure_logging(is_verbose=args.verbose)
    main(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment