Last active
October 17, 2023 18:50
-
-
Save juftin/847a424f33af8de13486969fea572bf9 to your computer and use it in GitHub Desktop.
Epic EHR Integration with an HTTPX Client - authlib==1.2.1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Epic OAuth2 Client | |
""" | |
from __future__ import annotations | |
import datetime | |
import logging | |
import os | |
import pathlib | |
import urllib.parse | |
import uuid | |
from typing import Any, ClassVar | |
from authlib.integrations.httpx_client import AsyncOAuth2Client, OAuth2Client | |
from authlib.jose import jwt | |
from authlib.oauth2.client import OAuth2Client as _OAuth2Client | |
from httpx import USE_CLIENT_DEFAULT | |
from httpx._types import AuthTypes | |
class EpicBaseOAuth2Client(_OAuth2Client):
    """
    Epic OAuth2 Client Base Class

    Holds the logic shared by the sync and async HTTPX clients: resolving the
    client id and private key, building the signed JWT client assertion that is
    posted to the token endpoint, and deciding when an initial token must be
    fetched.
    """

    sandbox_url: ClassVar[str] = "https://fhir.epic.com/interconnect-fhir-oauth"
    # Lifetime of the signed client assertion. Kept below five minutes, which is
    # the maximum Epic documents for backend-service assertions; subclasses may
    # override it with a shorter value.
    jwt_lifetime: ClassVar[datetime.timedelta] = datetime.timedelta(minutes=4.5)

    def __init__(
        self,
        client_id: str | None = None,
        epic_endpoint: str | None = None,
        cert: str | None = None,
        cert_file: str | pathlib.Path | None = None,
    ) -> None:
        """
        Epic OAuth2 Client

        Parameters
        ----------
        client_id: str | None
            Epic Client ID. Defaults to `EPIC_CLIENT_ID` environment variable if not
            provided.
        epic_endpoint: str | None
            Epic Endpoint. Defaults to sandbox. Trailing slashes are removed.
        cert: str | None
            Private key string used to sign the JWT assertion, by default None.
        cert_file: str | Path | None
            Path to a file containing the private key, by default None. If both
            `cert` and `cert_file` are provided, `cert` will be used. When neither
            is provided the `EPIC_PRIVATE_KEY` environment variable is used.

        Raises
        ------
        FileNotFoundError
            If `cert_file` has to be read but does not exist.
        ValueError
            If no private key or no client id can be resolved.
        """
        # Drop every trailing slash so the token endpoint below never contains `//`.
        self.epic_endpoint = (epic_endpoint or self.sandbox_url).rstrip("/")
        self._private_key: str | None = self._resolve_private_key(
            cert=cert, cert_file=cert_file
        )
        epic_client_id = client_id or os.getenv("EPIC_CLIENT_ID")
        if not epic_client_id:
            msg = "Either `client_id` or `EPIC_CLIENT_ID` environment variable must be provided"
            raise ValueError(msg)
        super().__init__(
            client_id=epic_client_id,
            client_secret=None,
            token_endpoint_auth_method="client_credentials",
            token_endpoint=f"{self.epic_endpoint}/oauth2/token",
            grant_type="client_credentials",
        )
        # The JWT assertion already lives in the request body, so the auth method
        # registered under "client_credentials" must leave the request untouched.
        self.register_client_auth_method(
            ("client_credentials", self._passthrough_client_auth)
        )

    @staticmethod
    def _resolve_private_key(
        cert: str | None,
        cert_file: str | pathlib.Path | None,
    ) -> str:
        """
        Return the private key text, preferring explicit arguments over the environment

        Resolution order: `cert`, then the contents of `cert_file`, then the
        `EPIC_PRIVATE_KEY` environment variable.
        """
        if cert:
            return cert
        if cert_file is not None:
            cert_path = pathlib.Path(cert_file)
            if not cert_path.exists():
                msg = f"Certificate file {cert_file} not found"
                raise FileNotFoundError(msg)
            return cert_path.read_text(encoding="utf-8")
        env_key = os.getenv("EPIC_PRIVATE_KEY")
        if env_key:
            return env_key
        msg = (
            "Either `cert`, `cert_file` or the `EPIC_PRIVATE_KEY` environment variable "
            "must be provided"
        )
        raise ValueError(msg)

    @staticmethod
    def _passthrough_client_auth(
        _client: Any, _method: Any, uri: Any, headers: Any, body: Any
    ) -> tuple[Any, Any, Any]:
        """
        Client auth hook that returns the URI, headers and body unchanged
        """
        return uri, headers, body

    def _prepare_token_endpoint_body(self, *args, **kwargs) -> str:
        """
        Custom JWT token handling for Epic

        Ignores whatever body the caller prepared and returns a form-encoded
        `client_credentials` request whose `client_assertion` is a freshly signed
        RS256 JWT with a unique `jti`.
        """
        _ = args, kwargs
        issued_at = datetime.datetime.now(tz=datetime.timezone.utc)
        claims = {
            "iss": self.client_id,
            "sub": self.client_id,
            "aud": self.metadata["token_endpoint"],
            "jti": uuid.uuid4().hex,
            "iat": issued_at,
            "exp": issued_at + self.jwt_lifetime,
        }
        signed_assertion = jwt.encode(
            header={"alg": "RS256", "typ": "JWT"},
            payload=claims,
            key=self._private_key,
        )
        return urllib.parse.urlencode(
            {
                "grant_type": "client_credentials",
                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
                "client_assertion": signed_assertion.decode(encoding="utf-8"),
            }
        )

    def should_fetch_token(
        self,
        url: str,
        withhold_token: bool = False,
        auth: AuthTypes | None = USE_CLIENT_DEFAULT,  # type: ignore[assignment]
    ) -> bool:
        """
        Determine if we should fetch a token.

        Authlib automatically _refreshes_ tokens, but it does not fetch the
        initial one. Therefore, we should fetch a token the first time a
        request is sent; i.e. when no token has been stored yet.

        https://github.com/lepture/authlib/issues/531

        Returns
        -------
        bool
            True only when the request wants the client's default token auth, no
            token is stored yet, and the request is not itself the token request.
        """
        return (
            not withhold_token
            and auth is USE_CLIENT_DEFAULT
            # Falsy check (not `is None`) so an empty token is also replaced.
            and not self.token
            # Avoid recursing when the outgoing request is the token request.
            and url != self.metadata["token_endpoint"]
        )
class EpicOAuth2Client(EpicBaseOAuth2Client, OAuth2Client):
    """
    Synchronous Epic OAuth2 client built on the HTTPX integration
    """

    def request(
        self,
        method: str,
        url: str,
        withhold_token: bool = False,
        auth: AuthTypes = USE_CLIENT_DEFAULT,  # type: ignore[assignment]
        **kwargs: Any,
    ) -> Any:
        """
        Send a request, fetching the initial access token first when one is needed

        The token is requested with the `client_credentials` grant whenever
        `should_fetch_token` reports that it is required; the call is then
        delegated to the parent `request` with the same arguments.
        """
        token_required = self.should_fetch_token(
            url=url,
            withhold_token=withhold_token,
            auth=auth,
        )
        if token_required:
            self.fetch_token(grant_type="client_credentials")
        response = super().request(
            method=method,
            url=url,
            withhold_token=withhold_token,
            auth=auth,
            **kwargs,
        )
        return response
class EpicAsyncOAuth2Client(EpicBaseOAuth2Client, AsyncOAuth2Client):
    """
    Asynchronous Epic OAuth2 client built on the HTTPX integration
    """

    async def request(
        self,
        method: str,
        url: str,
        withhold_token: bool = False,
        auth: AuthTypes = USE_CLIENT_DEFAULT,  # type: ignore[assignment]
        **kwargs: Any,
    ) -> Any:
        """
        Send a request, awaiting the initial access token first when one is needed

        The token is requested with the `client_credentials` grant whenever
        `should_fetch_token` reports that it is required; the call is then
        delegated to the parent `request` with the same arguments.
        """
        token_required = self.should_fetch_token(
            url=url,
            withhold_token=withhold_token,
            auth=auth,
        )
        if token_required:
            await self.fetch_token(grant_type="client_credentials")
        response = await super().request(
            method=method,
            url=url,
            withhold_token=withhold_token,
            auth=auth,
            **kwargs,
        )
        return response
if __name__ == "__main__":
    # Demo: read a sandbox test patient and then that patient's diagnostic reports.
    logging.basicConfig(level=logging.INFO)
    non_production_client_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    fhir_base_url = f"{EpicOAuth2Client.sandbox_url}/API/FHIR/R4"
    fhir_headers = {"Accept": "application/fhir+json"}
    # https://fhir.epic.com/Documentation?docId=testpatients
    patient_id = "e0w0LEDCYtfckT6N.CkJKCw3"
    # No `cert`/`cert_file` is passed, so the private key must come from the
    # `EPIC_PRIVATE_KEY` environment variable. The `with` block closes the
    # underlying HTTPX client (and its connection pool) even if a request fails.
    with EpicOAuth2Client(client_id=non_production_client_id) as client:
        patient_resp = client.get(
            url=f"{fhir_base_url}/Patient/{patient_id}",
            headers=fhir_headers,
        )
        patient_resp.raise_for_status()
        patient_dict: dict[str, Any] = patient_resp.json()
        reports_resp = client.get(
            url=f"{fhir_base_url}/DiagnosticReport",
            headers=fhir_headers,
            params={"patient": patient_dict["id"]},
        )
        reports_resp.raise_for_status()
        reports: dict[str, Any] = reports_resp.json()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment