A proof-of-concept for the flexible schema approach described in the draft "Parquet on FHIR" specification
1.Condition.json — a Condition resource whose subject is a literal reference, carrying clinicalStatus and onsetDateTime:
{
  "resourceType": "Condition",
  "id": "1",
  "clinicalStatus": {
    "coding": [
      {
        "system": "http://terminology.hl7.org/CodeSystem/condition-clinical",
        "code": "active"
      }
    ]
  },
  "subject": {
    "reference": "Patient/13866099-28d7-1249-3e44-ecb490d40fef"
  },
  "onsetDateTime": "1998-11-08T01:26:08+10:00"
}
2.Condition.json — a Condition resource whose subject is identified by an identifier rather than a literal reference, carrying verificationStatus and recordedDate instead of clinicalStatus and onsetDateTime:
{
  "resourceType": "Condition",
  "id": "2",
  "verificationStatus": {
    "coding": [
      {
        "system": "http://terminology.hl7.org/CodeSystem/condition-ver-status",
        "code": "confirmed"
      }
    ]
  },
  "subject": {
    "identifier": {
      "system": "http://example.org",
      "value": "123456"
    },
    "display": "Mr. Test Patient"
  },
  "recordedDate": "1998-11-08T01:26:08+10:00"
}
The merge script: it loads both Delta tables, prints their differing schemas, then merges the second into the first with Delta schema auto-merge enabled so the target schema widens to the union of the two:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Delta Lake session with schema auto-merge enabled, so that MERGE can add
# columns that exist in the source table but not in the target.
spark = (
    SparkSession.builder.config(
        "spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0"
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .getOrCreate()
)

# Load the two Delta tables and print their (differing) schemas.
df1 = DeltaTable.forPath(spark, "data/delta/1.Condition.parquet")
df1.toDF().printSchema()
df2 = DeltaTable.forPath(spark, "data/delta/2.Condition.parquet")
df2.toDF().printSchema()

# Merge the second table into the first, matching on resource id. The two
# resources have different ids, so no rows are updated; the point is that
# auto-merge still evolves the target schema to the union of the two.
df1.alias("old").merge(
    df2.toDF().alias("new"), "new.id = old.id"
).whenMatchedUpdateAll().execute()

# Re-read the target table to show the merged schema.
df3 = DeltaTable.forPath(spark, "data/delta/1.Condition.parquet")
df3.toDF().printSchema()
Output of the merge script — the schemas of the two input tables, followed by the merged schema of the target table:
root
 |-- clinicalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |-- id: string (nullable = true)
 |-- onsetDateTime: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- subject: struct (nullable = true)
 |    |-- reference: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- recordedDate: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- subject: struct (nullable = true)
 |    |-- display: string (nullable = true)
 |    |-- identifier: struct (nullable = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- verificationStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)

24/07/09 11:45:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

root
 |-- clinicalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |-- id: string (nullable = true)
 |-- onsetDateTime: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- subject: struct (nullable = true)
 |    |-- reference: string (nullable = true)
 |    |-- display: string (nullable = true)
 |    |-- identifier: struct (nullable = true)
 |    |    |-- system: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- recordedDate: string (nullable = true)
 |-- verificationStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
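For illustration (not part of the original gist), a minimal sketch of how the merged table could then be queried, assuming the same session and paths as above — both styles of subject become addressable through one schema, and columns absent from a given resource simply come back as NULL:

# Read the merged Delta table and select fields from both "shapes" of subject.
merged = spark.read.format("delta").load("data/delta/1.Condition.parquet")
merged.select(
    "id",
    "subject.reference",         # populated for literal references
    "subject.identifier.value",  # populated for identifier-based subjects
    "subject.display",
).show(truncate=False)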
A script demonstrating how a consumer can select columns that may or may not be present in a given Parquet file, substituting NULL for any that are missing:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()

df1 = spark.read.parquet("data/parquet/1.Condition.parquet")
df1.printSchema()


def safe_column(df, col_name):
    # Return the column if the DataFrame has it, otherwise a NULL literal
    # under the same name, so that selects never fail on missing columns.
    if col_name in df.schema.names:
        return df[col_name]
    else:
        return lit(None).alias(col_name)


result = df1.select(
    safe_column(df1, "id"), safe_column(df1, "foo"), safe_column(df1, "onsetDateTime")
)
result.show(truncate=False)
Output — the missing foo column comes back as NULL:
+---+----+-------------------------+
|id |foo |onsetDateTime            |
+---+----+-------------------------+
|1  |NULL|1998-11-08T01:26:08+10:00|
+---+----+-------------------------+
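The helper above only checks top-level column names. As a hedged extension (safe_nested_column is illustrative, not part of the gist), the same idea can be applied to dotted paths by walking the schema one segment at a time:

from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType


def safe_nested_column(df: DataFrame, path: str) -> Column:
    # Walk the schema one path segment at a time; if any segment is
    # missing (or its parent is not a struct), fall back to NULL.
    schema = df.schema
    for segment in path.split("."):
        if not isinstance(schema, StructType) or segment not in schema.names:
            return lit(None).alias(path.replace(".", "_"))
        schema = schema[segment].dataType
    return df[path].alias(path.replace(".", "_"))


# e.g. safe_nested_column(df1, "subject.reference") resolves normally, while
# safe_nested_column(df1, "subject.identifier.value") yields NULL when the
# file was written without an identifier-based subject.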
The ingest script that produced the Delta tables used by the merge script above — it reads each FHIR JSON resource with Spark's schema inference and writes it out as a Delta table:
from pyspark.sql import SparkSession

# Delta Lake session (no schema auto-merge needed here; each resource is
# written to its own table).
spark = (
    SparkSession.builder.config(
        "spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0"
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Read each JSON resource, letting Spark infer a schema containing only the
# elements actually present, then write it out as a Delta table.
df1 = spark.read.json("data/json/1.Condition.json", multiLine=True)
df1.printSchema()
df1.write.format("delta").mode("overwrite").save("data/delta/1.Condition.parquet")

df2 = spark.read.json("data/json/2.Condition.json", multiLine=True)
df2.printSchema()
df2.write.format("delta").mode("overwrite").save("data/delta/2.Condition.parquet")
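As an aside, Delta's standard mergeSchema write option offers an alternative to the MERGE-based approach above. A sketch only (the data/delta/Condition path is hypothetical and not used elsewhere in this gist): appending resources with differing schemas into a single table widens its schema to the union in much the same way.

# Hypothetical single-table layout: append both resources into one Delta
# table, letting the mergeSchema option evolve its schema on each write.
for source in ["data/json/1.Condition.json", "data/json/2.Condition.json"]:
    df = spark.read.json(source, multiLine=True)
    (
        df.write.format("delta")
        .option("mergeSchema", "true")
        .mode("append")
        .save("data/delta/Condition")
    )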