Skip to content

Instantly share code, notes, and snippets.

@juftin
Last active October 17, 2023 18:50
Show Gist options
  • Save juftin/847a424f33af8de13486969fea572bf9 to your computer and use it in GitHub Desktop.
Save juftin/847a424f33af8de13486969fea572bf9 to your computer and use it in GitHub Desktop.
Epic EHR Integration with an HTTPX Client - authlib==1.2.1
"""
Epic OAuth2 Client
"""
from __future__ import annotations
import datetime
import logging
import os
import pathlib
import urllib.parse
import uuid
from typing import Any, ClassVar
from authlib.integrations.httpx_client import AsyncOAuth2Client, OAuth2Client
from authlib.jose import jwt
from authlib.oauth2.client import OAuth2Client as _OAuth2Client
from httpx import USE_CLIENT_DEFAULT
from httpx._types import AuthTypes
class EpicBaseOAuth2Client(_OAuth2Client):
"""
Epic OAuth2 Client Base Class
"""
sandbox_url: ClassVar[str] = "https://fhir.epic.com/interconnect-fhir-oauth"
def __init__(
self,
client_id: str | None = None,
epic_endpoint: str | None = None,
cert: str | None = None,
cert_file: str | pathlib.Path | None = None,
) -> None:
"""
Epic OAuth2 Client
Parameters
----------
client_id: str | None
Epic Client ID. Defaults to `EPIC_CLIENT_ID` environment variable if not
provided.
epic_endpoint: str | None
Epic Endpoint. Defaults to sandbox
cert: str | None
Certificate string, by default None.
cert_file: str | Path | None
Certificate file path, by default None. If both `cert` and `cert_file` are
provided, `cert` will be used.
"""
self.epic_endpoint = epic_endpoint or self.sandbox_url
if self.epic_endpoint.endswith("/"):
self.epic_endpoint = str(epic_endpoint)[:-1]
self._private_key: str | None = cert or os.getenv("EPIC_PRIVATE_KEY")
if cert_file is not None and self._private_key is None:
cert_path = pathlib.Path(cert_file)
if cert_path.exists():
self._private_key = cert_path.read_text(encoding="utf-8")
else:
msg = f"Certificate file {cert_file} not found"
raise FileNotFoundError(msg)
elif self._private_key is None and cert_file is None:
msg = "Either `cert` or `cert_file` must be provided"
raise ValueError(msg)
epic_client_id = client_id or os.getenv("EPIC_CLIENT_ID")
if epic_client_id is None:
msg = "Either `client_id` or `EPIC_CLIENT_ID` environment variable must be provided"
raise ValueError(msg)
super().__init__(
client_id=epic_client_id,
client_secret=None,
token_endpoint_auth_method="client_credentials",
token_endpoint=f"{self.epic_endpoint}/oauth2/token",
grant_type="client_credentials",
)
self.register_client_auth_method(("client_credentials", lambda _a, _b, c, d, e: (c, d, e)))
def _prepare_token_endpoint_body(self, *args, **kwargs) -> str:
"""
Custom JWT token handling for Epic
"""
_ = args, kwargs
current_time = datetime.datetime.now(tz=datetime.timezone.utc)
expiration_time = current_time + datetime.timedelta(minutes=4.5)
jti = uuid.uuid4().hex
jwt_data = {
"iss": self.client_id,
"sub": self.client_id,
"aud": self.metadata["token_endpoint"],
"jti": jti,
"iat": current_time,
"exp": expiration_time,
}
header = {"alg": "RS256", "typ": "JWT"}
jwt_token = jwt.encode(
header=header,
payload=jwt_data,
key=self._private_key,
)
data_dict = {
"grant_type": "client_credentials",
"client_assertion_type": ("urn:ietf:params:oauth:client-assertion-type:jwt-bearer"),
"client_assertion": jwt_token.decode(encoding="utf-8"),
}
form_data = urllib.parse.urlencode(data_dict)
return form_data
def should_fetch_token(
self,
url: str,
withhold_token: bool = False,
auth: AuthTypes | None = USE_CLIENT_DEFAULT, # type: ignore[assignment]
) -> bool:
"""
Determine if we should fetch a token.
Authlib automatically _refreshes_ tokens, but it does not fetch the
initial one. Therefore, we should fetch a token the first time a
request is sent; i.e. when self.token is None.
https://github.com/lepture/authlib/issues/531
"""
return (
not withhold_token
and auth is USE_CLIENT_DEFAULT
and self.token is None
and url != self.metadata["token_endpoint"]
)
class EpicOAuth2Client(EpicBaseOAuth2Client, OAuth2Client):
"""
Epic OAuth2 Client: HTTPX
"""
def request(
self,
method: str,
url: str,
withhold_token: bool = False,
auth: AuthTypes = USE_CLIENT_DEFAULT, # type: ignore[assignment]
**kwargs: Any,
) -> Any:
"""
Alter the parent `request` method to automatically fetch a token the first time
"""
if self.should_fetch_token(url=url, withhold_token=withhold_token, auth=auth):
self.fetch_token(grant_type="client_credentials")
return super().request(
method=method, url=url, withhold_token=withhold_token, auth=auth, **kwargs
)
class EpicAsyncOAuth2Client(EpicBaseOAuth2Client, AsyncOAuth2Client):
"""
Async Epic OAuth2 Client: HTTPX
"""
async def request(
self,
method: str,
url: str,
withhold_token: bool = False,
auth: AuthTypes = USE_CLIENT_DEFAULT, # type: ignore[assignment]
**kwargs: Any,
) -> Any:
"""
Alter the parent `request` method to automatically fetch a token the first time
"""
if self.should_fetch_token(url=url, withhold_token=withhold_token, auth=auth):
await self.fetch_token(grant_type="client_credentials")
return await super().request(
method=method, url=url, withhold_token=withhold_token, auth=auth, **kwargs
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
non_production_client_id = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
client = EpicOAuth2Client(client_id=non_production_client_id)
# https://fhir.epic.com/Documentation?docId=testpatients
patient_id = "e0w0LEDCYtfckT6N.CkJKCw3"
patient_resp = client.get(
url=f"{EpicOAuth2Client.sandbox_url}/API/FHIR/R4/Patient/{patient_id}",
headers={"Accept": "application/fhir+json"},
)
patient_resp.raise_for_status()
patient_dict: dict[str, Any] = patient_resp.json()
reports_resp = client.get(
url=f"{EpicOAuth2Client.sandbox_url}/API/FHIR/R4/DiagnosticReport",
headers={"Accept": "application/fhir+json"},
params={"patient": patient_dict["id"]},
)
reports_resp.raise_for_status()
reports: dict[str, Any] = reports_resp.json()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment