Created
August 21, 2020 19:14
-
-
Save fiendish/d3410c3e868952039d21cbaae425b6eb to your computer and use it in GitHub Desktop.
Converts a NAACCR XML file to CSV
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import csv | |
import argparse | |
from collections import defaultdict | |
from lxml import etree | |
def gather_node_items(parent):
    """Recursively convert the children of a NAACCR XML element to a dict.

    ``Item`` children are stored as ``naaccrId -> text``. Every other child
    element is converted recursively and appended to a list kept under the
    child's tag name (for example ``"Tumor"``).

    Args:
        parent: An element whose children expose ``tag``, ``get`` and
            ``text`` (lxml or ``xml.etree.ElementTree`` elements).

    Returns:
        A ``defaultdict(list)`` describing the element's children.
    """
    ret = defaultdict(list)
    for node in parent:
        # Comments and processing instructions carry a non-string tag; they
        # hold no record data and must not become CSV columns.
        if not isinstance(node.tag, str):
            continue
        if node.tag == "Item":
            ret[node.get("naaccrId")] = node.text
        else:
            ret[node.tag].append(gather_node_items(node))
    return ret


def extract_tumors(patients):
    """Detach the ``Tumor`` records from each patient and return them flat.

    Each tumor is tagged with its owner's ``patientIdNumber`` so the two CSV
    files can be joined again. The ``"Tumor"`` key is removed from every
    patient dict in place.

    Args:
        patients: Iterable of dicts as produced by ``gather_node_items``.

    Returns:
        A list of tumor dicts, in document order.
    """
    tumors = []
    for patient in patients:
        # .get() avoids the defaultdict side effect of inserting an empty
        # list when a patient has no patientIdNumber item.
        patient_id = patient.get("patientIdNumber")
        # Default to an empty list: a patient without tumors is valid and
        # must not abort the conversion.
        for tumor in patient.pop("Tumor", []):
            tumor["patientIdNumber"] = patient_id
            tumors.append(tumor)
    return tumors


def write_csv(filename, things):
    """Write a list of dict records to ``filename`` as a CSV file.

    The header is the union of all record keys, in first-seen order, so the
    column layout is the same on every run. Keys missing from a record are
    written as empty cells.

    Args:
        filename: Path of the CSV file to create or overwrite.
        things: List of dict records to write.
    """
    print("Writing file", filename)
    # A dict keeps insertion order, unlike a set, which gave a different
    # column order from run to run.
    fieldnames = {}
    for record in things:
        fieldnames.update(dict.fromkeys(record))
    # newline="" lets the csv module control line endings; without it,
    # Windows output contains a blank line after every row.
    with open(filename, "w", newline="") as f:
        csvwriter = csv.DictWriter(f, delimiter=",", fieldnames=list(fieldnames))
        csvwriter.writeheader()
        csvwriter.writerows(things)


def main():
    """Parse the XML file named on the command line and write the CSV files.

    Writes ``patients.csv`` and ``tumors.csv`` to the current directory when
    there is at least one record of the respective kind. Exits with status 1
    if the root element is not in the NAACCR XML namespace.
    """
    argparser = argparse.ArgumentParser(description="Convert a NAACCR XML file to CSV")
    argparser.add_argument("xml_file", help="The XML file path")
    args = argparser.parse_args()

    xparser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(args.xml_file, xparser)
    root = tree.getroot()

    if etree.QName(root).namespace != "http://naaccr.org/naaccrxml":
        print("Not a NAACCR file. Missing http://naaccr.org/naaccrxml namespace.")
        sys.exit(1)

    # Remove namespace prefixes. Restricting iteration to real elements skips
    # comments and processing instructions, whose tags cannot be rewritten.
    for elem in root.iter(etree.Element):
        elem.tag = etree.QName(elem).localname
    etree.cleanup_namespaces(root)

    # list patients
    all_patients = [gather_node_items(node) for node in root if node.tag == "Patient"]

    # list tumors
    all_tumors = extract_tumors(all_patients)

    if all_patients:
        write_csv("patients.csv", all_patients)
    if all_tumors:
        write_csv("tumors.csv", all_tumors)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment