Streaming SQL on FHIR example with Pathling and Kafka
#!/usr/bin/env python3

import click
import os

from pathling._version import (
    __java_version__,
    __scala_version__,
    __delta_version__,
    __hadoop_version__,
)
from pyspark import __version__ as __spark_version__
from pyspark.sql import SparkSession, DataFrame
from pathling import PathlingContext
from pyspark.sql.functions import explode, from_json

# These constants define the base directory structure for the application.
# The BASE_DIR is calculated relative to this file's location, while the
# SPARK_CHECKPOINT_DIR is used to persist a checkpoint so that the stream
# can be resumed in the event of a failure.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
TARGET_DIR = os.path.join(BASE_DIR, "target")
SPARK_CHECKPOINT_DIR = os.path.join(TARGET_DIR, "checkpoints")


def _get_or_create_spark():
    """Creates or retrieves a Spark session configured for FHIR data processing.
    This function sets up the Spark environment with necessary dependencies including
    Kafka connectivity, Pathling, and PostgreSQL support."""
    os.environ["SPARK_CONF_DIR"] = os.path.join(BASE_DIR, "conf", "spark-conf")
    spark_builder = SparkSession.builder.config(
        "spark.jars.packages",
        f"org.apache.spark:spark-sql-kafka-0-10_{__scala_version__}:{__spark_version__},"
        f"au.csiro.pathling:library-runtime:{__java_version__},"
        f"org.postgresql:postgresql:42.2.18",
    ).config("spark.sql.streaming.checkpointLocation", SPARK_CHECKPOINT_DIR)
    return spark_builder.getOrCreate()


def view_patient(data):
    """Creates a view of Patient resources with essential demographic information.
    This view extracts the patient identifier and gender from the FHIR Patient resource.
    """
    return data.view(
        "Patient",
        select=[
            {
                "column": [
                    {
                        "description": "Patient ID",
                        "path": "getResourceKey()",
                        "name": "id",
                        "type": "string",
                        "collection": "false",
                    },
                    {
                        "description": "Gender",
                        "path": "gender",
                        "name": "gender",
                        "type": "code",
                        "collection": "false",
                    },
                ]
            },
        ],
    )
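

# Illustrative note (not part of the original gist): the view above produces a
# two-column result, e.g. a row like ("<patient-resource-key>", "female"), which
# later becomes the "id" and "gender" columns of the view_patient table downstream.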


def view_diagnosis(data):
    """Creates a view of Condition resources that contain SNOMED CT diagnosis codes.
    This view captures condition identifiers, patient references, SNOMED CT codes,
    and specifically flags viral infections using a query to the terminology server."""
    return data.view(
        "Condition",
        select=[
            {
                "column": [
                    {
                        "description": "Condition ID",
                        "path": "getResourceKey()",
                        "name": "id",
                        "type": "string",
                        "collection": "false",
                    },
                    {
                        "description": "Patient ID",
                        "path": "subject.getReferenceKey()",
                        "name": "patient_id",
                        "type": "string",
                        "collection": "false",
                    },
                    {
                        "description": "SNOMED CT diagnosis code",
                        "path": "code.coding.where(system = 'http://snomed.info/sct').code",
                        "name": "sct_id",
                        "type": "code",
                        "collection": "false",
                    },
                    {
                        "description": "Viral infection",
                        "path": "code.memberOf('http://snomed.info/sct?fhir_vs=ecl/<34014006')",
                        "name": "viral_infection",
                        "type": "boolean",
                        "collection": "false",
                    },
                ],
            }
        ],
    )


def view_encounter(data):
    """Creates a comprehensive view of Encounter resources with detailed visit information.
    This view includes encounter timing, service provider details, and multiple coded values
    for visit type, arrival mode, departure status, and other clinical attributes."""
    return data.view(
        "Encounter",
        select=[
            {
                "column": [
                    {
                        "description": "Encounter ID",
                        "path": "getResourceKey()",
                        "name": "id",
                        "type": "string",
                        "collection": "false",
                    },
                    {
                        "description": "Patient ID",
                        "path": "subject.getReferenceKey()",
                        "name": "patient_id",
                        "type": "string",
                        "collection": "false",
                    },
                    {
                        "description": "Encounter start date",
                        "path": "period.start",
                        "name": "start_time",
                        "type": "dateTime",
                        "collection": "false",
                    },
                    {
                        "description": "Encounter end date",
                        "path": "period.end",
                        "name": "end_time",
                        "type": "dateTime",
                        "collection": "false",
                    },
                    {
                        "description": "Encounter service provider",
                        "path": "serviceProvider.getReferenceKey()",
                        "name": "service_provider",
                        "type": "string",
                        "collection": "false",
                    },
                ],
                "select": [
                    {
                        "forEachOrNull": "type.coding.where(system = 'http://occio.qh/data/typeofvisit')",
                        "column": [
                            {
                                "description": "Type of visit code",
                                "path": "code",
                                "name": "type_of_visit_code",
                                "type": "code",
                                "collection": "false",
                            },
                            {
                                "description": "Type of visit description",
                                "path": "display",
                                "name": "type_of_visit_desc",
                                "type": "string",
                                "collection": "false",
                            },
                        ],
                    },
                    {
                        "forEachOrNull": "type.coding.where(system = 'http://occio.qh/data/modeofarrival')",
                        "column": [
                            {
                                "description": "Mode of arrival code",
                                "path": "code",
                                "name": "mode_of_arrival_code",
                                "type": "code",
                                "collection": "false",
                            },
                            {
                                "description": "Mode of arrival description",
                                "path": "display",
                                "name": "mode_of_arrival_desc",
                                "type": "string",
                                "collection": "false",
                            },
                        ],
                    },
                    {
                        "forEachOrNull": "type.coding.where(system = 'http://occio.qh/data/departurestatus')",
                        "column": [
                            {
                                "description": "Departure status code",
                                "path": "code",
                                "name": "departure_status_code",
                                "type": "code",
                                "collection": "false",
                            },
                            {
                                "description": "Departure status description",
                                "path": "display",
                                "name": "departure_status_desc",
                                "type": "string",
                                "collection": "false",
                            },
                        ],
                    },
                    {
                        "forEachOrNull": "priority.coding.where(system = 'http://occio.qh/data/ats')",
                        "column": [
                            {
                                "description": "ATS code",
                                "path": "code",
                                "name": "ats_code",
                                "type": "code",
                                "collection": "false",
                            },
                        ],
                    },
                    {
                        "forEachOrNull": "reasonCode.coding.where(system = 'http://occio.qh/data/presentingproblem')",
                        "column": [
                            {
                                "description": "Presenting problem code",
                                "path": "code",
                                "name": "presenting_problem_code",
                                "type": "code",
                                "collection": "false",
                            },
                            {
                                "description": "Presenting problem description",
                                "path": "display",
                                "name": "presenting_problem_desc",
                                "type": "string",
                                "collection": "false",
                            },
                        ],
                    },
                    {
                        "forEachOrNull": "diagnosis.where(use.coding.exists(system = 'Admission diagnosis'))",
                        "column": [
                            {
                                "description": "Admission diagnosis ID",
                                "path": "condition.getReferenceKey()",
                                "name": "admission_diagnosis_id",
                                "type": "string",
                                "collection": "false",
                            },
                        ],
                    },
                ],
            }
        ],
    )
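

# Illustrative note (not part of the original gist): each nested "forEachOrNull"
# selection above still contributes its columns when no coding matches the given
# system; those columns are simply null for that encounter. With each where()
# clause expected to match at most one coding, the view yields roughly one row
# per Encounter resource.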


@click.command()
@click.option("--kafka-topic", help="Kafka topic to subscribe to")
@click.option("--kafka-bootstrap-servers", help="Kafka bootstrap servers")
@click.option("--db-name", help="Database name to write to")
@click.option("--schema", help="Database schema to use")
@click.option("--host", help="Database host")
@click.option("--user", help="Database user")
@click.option("--password", help="Database password")
def start_consumer(
    kafka_topic, kafka_bootstrap_servers, db_name, schema, host, user, password
):
    """Initiates the main consumer process for processing FHIR resources from Kafka.
    This function orchestrates the entire ETL pipeline, from reading Kafka messages
    to transforming FHIR resources and persisting them in PostgreSQL."""

    def _subscribe_to_kafka_topic():
        """Establishes a streaming connection to the specified Kafka topic.
        This function configures the Spark streaming reader to consume messages
        from the earliest offset available."""
        return (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
            .option("subscribe", kafka_topic)
            .option("startingOffsets", "earliest")
            .load()
        )

    def _to_resource_stream(kafka_stream, resource_type):
        """Transforms the raw Kafka stream into a typed FHIR resource stream.
        This function extracts FHIR resources from the bundle structure and filters
        for specific resource types before encoding them with Pathling."""
        json_stream = (
            kafka_stream.selectExpr("CAST(value AS STRING) AS bundle")
            .select(
                explode(
                    from_json(
                        "bundle", "STRUCT<entry:ARRAY<STRUCT<resource:STRING>>>"
                    ).entry.resource
                ).alias("resource")
            )
            .filter(
                from_json("resource", "STRUCT<resourceType:STRING>").resourceType
                == resource_type
            )
        )
        return pc.encode(json_stream, resource_type)
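
    # Illustrative only (not part of the original gist): the from_json schema
    # above assumes each Kafka message value is a FHIR Bundle serialised as
    # JSON, shaped roughly like:
    #
    #   {
    #     "resourceType": "Bundle",
    #     "entry": [
    #       {"resource": {"resourceType": "Patient", "id": "pat-1", "gender": "female"}},
    #       {"resource": {"resourceType": "Condition", "id": "con-1"}}
    #     ]
    #   }
    #
    # Only the entry[].resource payloads are used; each resource is kept as a
    # JSON string and later encoded by Pathling into a typed DataFrame.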

    def write_postgresql(df: DataFrame, db_name, schema, view_name):
        """Handles the persistence of processed data frames to PostgreSQL.
        This function implements an upsert strategy, updating existing records
        while inserting new ones, maintaining data consistency through primary keys."""
        import psycopg2

        columns = df.columns
        insert_columns = ", ".join(columns)
        insert_values = ", ".join(["%s"] * len(columns))
        # Exclude 'id' from the update set to avoid updating the primary key.
        update_set = ", ".join(
            [f"{col} = EXCLUDED.{col}" for col in columns if col != "id"]
        )
        sql = f"""
            INSERT INTO {schema}.{view_name} ({insert_columns})
            VALUES ({insert_values})
            ON CONFLICT (id) DO UPDATE SET {update_set}
        """

        def upsert_partition(partition):
            # Establish connection per partition.
            conn = psycopg2.connect(
                host=host,
                database=db_name,
                user=user,
                password=password,
            )
            cursor = conn.cursor()
            data = list(partition)
            if data:
                cursor.executemany(sql, data)
                conn.commit()
            cursor.close()
            conn.close()

        # Apply the upsert function to each partition.
        df.foreachPartition(upsert_partition)
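
    # Note (assumption, not from the original gist): ON CONFLICT (id) requires
    # each target table to carry a primary key or unique constraint on "id".
    # For example, a table backing view_patient might be created roughly as:
    #
    #   CREATE TABLE my_schema.view_patient (
    #       id TEXT PRIMARY KEY,
    #       gender TEXT
    #   );
    #
    # "my_schema" is a placeholder; the actual columns depend on each view.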

    click.echo(
        f"Starting Kafka listener on topic: {kafka_topic} at: {kafka_bootstrap_servers}"
    )
    click.echo(f"Writing to database: {db_name}")

    spark = _get_or_create_spark()
    pc = PathlingContext.create(
        spark, terminology_server_url="http://velonto-ontoserver-service/fhir"
    )
    spark.sparkContext.setLogLevel("ERROR")

    update_stream = _subscribe_to_kafka_topic()
    data = pc.read.datasets(
        {
            resource_type: _to_resource_stream(update_stream, resource_type)
            for resource_type in ["Patient", "Encounter", "Condition"]
        }
    )

    all_views = [view_patient, view_encounter, view_diagnosis]
    console_sinks = []
    postgresql_sinks = []
    for view_f in all_views:
        view_name = view_f.__name__
        view_data = view_f(data)
        # Console sink.
        console_sink = (
            view_data.writeStream.outputMode("append")
            .format("console")
            .start(f"console_{view_name}")
        )
        console_sinks.append(console_sink)
        # PostgreSQL sink.
        postgresql_sink = (
            # This will start a streaming query in the background.
            view_data.writeStream.foreachBatch(
                lambda df, epoch_id, view_name=view_name: write_postgresql(
                    df, db_name, schema, view_name
                )
            )
            .outputMode("append")
            .start()
        )
        postgresql_sinks.append(postgresql_sink)

    for sink in console_sinks + postgresql_sinks:
        # This will block until each of the streaming queries has terminated.
        sink.awaitTermination()


if __name__ == "__main__":
    start_consumer()
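

# Example invocation (all values below are illustrative placeholders, not taken
# from the original gist; substitute your own script name, topic, brokers and
# database credentials):
#
#   python streaming_sql_on_fhir.py \
#       --kafka-topic fhir-bundles \
#       --kafka-bootstrap-servers localhost:9092 \
#       --db-name fhir \
#       --schema public \
#       --host localhost \
#       --user postgres \
#       --password postgres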