import argparse

from pathling import PathlingContext, property_of, PropertyType
from pyspark.sql.functions import array_contains
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType

# Set up the command-line argument parser.
parser = argparse.ArgumentParser(description='Process coding data.')
parser.add_argument('--input', type=str, help='Input NDJSON file', required=True)
parser.add_argument('--output', type=str, help='Output CSV directory', required=True)

# Parse the command-line arguments.
args = parser.parse_args()
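
# Example invocation (script and file names are hypothetical):
#   python process_codings.py --input codings.ndjson --output inactive_codings_csv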

# Create a Pathling context for processing FHIR data.
pc = PathlingContext.create()
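
# create() starts (or reuses) a SparkSession with the Pathling encoders
# registered; it is accessible below as pc.spark.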

# Define the schema for a FHIR 'Coding' element.
coding_schema = StructType([
    StructField("id", StringType(), True),
    StructField("system", StringType(), True),
    StructField("version", StringType(), True),
    StructField("code", StringType(), True),
    StructField("display", StringType(), True),
    StructField("userSelected", BooleanType(), True)
])

# Define the schema for the input rows, which nest a 'Coding' under the
# 'coding' field.
schema = StructType([
    StructField("file", StringType(), True),
    StructField("line", IntegerType(), True),
    StructField("coding", coding_schema, True)
])
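
# A sample input line matching this schema (hypothetical values; SNOMED CT code
# shown for illustration):
#   {"file": "conditions.ndjson", "line": 42,
#    "coding": {"system": "http://snomed.info/sct", "code": "368009",
#               "display": "Heart valve disorder"}}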

# Read the NDJSON data into a DataFrame with the specified schema.
codings = pc.spark.read.json(args.input, schema=schema)

# Add an 'inactive' column: property_of returns an array of values for the
# coding's 'inactive' terminology property (properties can repeat), and
# array_contains flags the row when any of those values is true.
with_inactive = codings.withColumn("inactive", array_contains(
    property_of(codings.coding, "inactive", PropertyType.BOOLEAN), True))
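
# Note: property_of is evaluated against the terminology server configured on
# the PathlingContext; create() falls back to Pathling's default server when no
# URL is supplied, so this step needs access to a terminology server.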

# Select the fields of interest, flattening the nested coding fields into
# top-level 'system' and 'code' columns.
result = with_inactive.select(
    with_inactive.file,
    with_inactive.line,
    with_inactive.coding.getField("system").alias("system"),
    with_inactive.coding.getField("code").alias("code"))

# Keep only the rows flagged as inactive and write the result to CSV.
result.filter(with_inactive.inactive).repartition(1).write.csv(args.output, header=True)
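
# Spark writes the CSV output as a directory of part files; repartition(1)
# ensures a single part file with the columns: file, line, system, code.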