Created
April 10, 2025 04:32
-
-
Save lordlinus/2845f3a90251b0ebb04d6ade0eb004a1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from enum import Enum | |
import streamlit as st | |
import os | |
import json | |
import asyncio | |
import concurrent.futures | |
from typing import Optional, List, Dict, Any | |
from datetime import datetime, date | |
from dotenv import load_dotenv, find_dotenv | |
from langchain_core.messages import AIMessage, HumanMessage | |
from pydantic import BaseModel | |
from streamlit import session_state, progress | |
import base64 | |
import time | |
from openai import AzureOpenAI, AsyncAzureOpenAI | |
# Import functions from the existing codebase | |
from aoai_assistant_setup import ClaimResponse | |
from functions import ( | |
get_eligible_policies, | |
get_currencies, | |
analyze_document, | |
get_claim_schema, | |
get_payout_methods, | |
run_content_analysis, | |
raise_jira_ticket, | |
) | |
from models.common import Currency, CurrencyResponse | |
from models.eligible_policies import EligiblePoliciesResponse | |
from models.claim import ( | |
Claim, | |
ClaimTypeEnum, | |
ClaimReceipt, | |
ClaimDocument, | |
ClaimDetails, | |
ClaimPayout, | |
) | |
from models.exceptions import ReceiptProcessingError | |
from models.policy import PayoutMethodModeEnum, BankAccount | |
from models.receipt import ( | |
AnalyzeDocumentResponse, | |
ReceiptExtractionResult, | |
BillItem, | |
OcrAnalysisResponse, | |
) | |
from models.payout_methods import PayoutMethodsResponse | |
# Import the OpenAI assistant API functions | |
from aoai_assistant_run import run_conversation, create_thread | |
# Load environment variables from the .env file located by find_dotenv().
# NOTE(review): override=True presumably lets .env values replace variables
# already set in the process environment - confirm against python-dotenv.
load_dotenv(find_dotenv(), override=True)
# Create event loop for async operations and register it as the current loop.
# This runs at import time, i.e. every time this module is executed.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Synchronous Azure OpenAI client for the reasoning model. Endpoint and key
# come from the environment; os.getenv returns None when a variable is unset.
reasoning_client = AzureOpenAI(
    azure_endpoint=os.getenv("AZURE_REASONING_MODEL_ENDPOINT"),
    api_key=os.getenv("AZURE_REASONING_MODEL_API_KEY"),
    api_version="2024-12-01-preview",
)
# Async client with the same configuration; reasoning_model_call awaits it.
reasoning_client_async = AsyncAzureOpenAI(
    azure_endpoint=os.getenv("AZURE_REASONING_MODEL_ENDPOINT"),
    api_key=os.getenv("AZURE_REASONING_MODEL_API_KEY"),
    api_version="2024-12-01-preview",
)
# Load assets
# Paths are relative to the current working directory of the process.
path_to_user_image = "./assets/images/user.svg"  # avatar for user chat messages
path_to_assistant_image = "./assets/images/pru-logo-icon.svg"  # avatar for assistant chat messages
path_to_bg_image = "./assets/images/bg-image.png"  # page background, embedded as base64
path_to_font_file = "./assets/fonts/OpenSans-VariableFont_wdth.ttf"  # page font, embedded as base64
path_to_animated_loader = "./assets/images/animated-loader.svg"  # loader shown on claim submission
def encode_file(file):
    """Return the contents of the file at path ``file`` as a base64 text string.

    Args:
        file: Path of the file to read (opened in binary mode).

    Returns:
        str: The base64 encoding of the file's bytes, decoded to ``str``.
    """
    with open(file, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()
# Base64-encode the background image and the font once at import time; main()
# embeds both as data: URLs in the page CSS.
bg_image_base64 = encode_file(path_to_bg_image)
font_econded = encode_file(path_to_font_file)  # spelling (sic) is what main() references
# Read the animated loader SVG as text; claim_submit_display embeds it in an iframe.
with open(path_to_animated_loader, "r") as file:
    svg_content = file.read()
# When True, main() writes session state to the sidebar and process_receipts
# records each receipt table in the chat history.
debug = False
# Get the assistant ID from environment variables
ASSISTANT_ID = os.environ.get("ASSISTANT_ID")
# Fail fast at import time: a missing or empty ASSISTANT_ID aborts start-up.
if not ASSISTANT_ID:
    raise ValueError(
        "ASSISTANT_ID not found in environment variables. Run aoai_assistant_setup.py first."
    )
class ChatStateEnum(str, Enum):
    """Screens of the chat workflow; main() dispatches on the current value.

    Members are also ``str`` instances whose value equals their name. The
    current state is kept in ``st.session_state.chat_state``.
    """

    # Initial state set by init_state(); main() logs the client in from here.
    LOGIN = "LOGIN"
    # State that refresh_state() returns to after a claim is submitted.
    DECIDER = "DECIDER"
    CLAIM_RECEIPT_UPLOAD = "CLAIM_RECEIPT_UPLOAD"
    CLAIM_AMOUNT_DISPLAY = "CLAIM_AMOUNT_DISPLAY"
    CLAIM_AMOUNT_CHANGE_DISPLAY = "CLAIM_AMOUNT_CHANGE_DISPLAY"
    CLAIM_SUMMARY_DISPLAY = "CLAIM_SUMMARY_DISPLAY"
    CLAIM_SUMMARY_CHANGE_DISPLAY = "CLAIM_SUMMARY_CHANGE_DISPLAY"
    CLAIM_DETAILS_DISPLAY = "CLAIM_DETAILS_DISPLAY"
    CLAIM_DETAILS_CHANGE_DISPLAY = "CLAIM_DETAILS_CHANGE_DISPLAY"
    CLAIM_PAYOUT_DISPLAY = "CLAIM_PAYOUT_DISPLAY"
    CLAIM_SUBMIT_DISPLAY = "CLAIM_SUBMIT_DISPLAY"
class MessageTable(BaseModel):
    """Chat-history entry holding a pre-rendered table.

    main() renders ``table`` through ``st.markdown(..., unsafe_allow_html=True)``
    when replaying the message list, so it may contain markdown or HTML.
    """

    # Label for the table; the replay loop in main() does not display it.
    title: str
    # Markdown/HTML markup that is rendered as-is.
    table: str
def init_state():
    """Seed ``st.session_state`` with every key the app reads.

    ``reasoning_output`` is reset to ``None`` on every call. All other keys
    are written only when absent, so values already stored in the session
    survive Streamlit reruns.
    """
    state = st.session_state
    state.reasoning_output = None

    # Factories rather than shared values, so list defaults are fresh objects.
    default_factories = {
        "client_profile": lambda: None,
        "currency_response": lambda: None,
        "claim": lambda: None,
        "thread_id": lambda: None,
        "receipts_processed": lambda: False,
        "assistant_response": lambda: None,
        "structured_data": lambda: None,
        "receipt_results": list,
        "claim_receipts": list,
        "claim_summary": lambda: None,
        "chat_state": lambda: ChatStateEnum.LOGIN,
        # Chat history starts empty; messages are appended as the flow runs.
        "messages": list,
    }
    for key, make_default in default_factories.items():
        if key not in state:
            state[key] = make_default()
def refresh_state():
    """Reset the claim workflow for the current client and go back to DECIDER.

    The client ID is read from the existing claim, all per-claim session
    values are cleared, and then a new claim, the client's eligible policies
    and the currency list are loaded again. The chat history is not touched.
    """
    state = st.session_state
    # Capture the client before the claim is wiped below.
    current_client = state.claim.clientId

    for cleared_key in (
        "client_profile",
        "currency_response",
        "claim",
        "thread_id",
        "assistant_response",
        "structured_data",
        "claim_summary",
    ):
        state[cleared_key] = None
    state.receipts_processed = False
    state.receipt_results = []
    state.claim_receipts = []
    state.chat_state = ChatStateEnum.DECIDER

    # Rebuild the per-client data in the same order as at login.
    state.claim = initialize_claim(current_client)
    state.client_profile = get_eligible_policies(current_client)
    state.currency_response = get_currencies()
def display_message(content: str, author: str = "System"):
    """Render ``content`` with a Streamlit widget chosen by ``author``.

    "System" uses ``st.info``, "Assistant" uses ``st.success`` and any other
    author falls back to ``st.write``.
    """
    renderers = {"System": st.info, "Assistant": st.success}
    render = renderers.get(author, st.write)
    render(content)
def format_policies_info(policies_rs: EligiblePoliciesResponse) -> str:
    """Build a markdown section listing the client's eligible policies.

    Args:
        policies_rs: Response whose ``policies`` entries expose ``policy``
            (with ``id``, ``name``, ``status.is_active``, ``lives_assured``)
            and ``claim_types``.

    Returns:
        str: A heading followed by a markdown table with one row per policy,
        or a "no eligible policies" line when the list is empty.
    """
    pieces = ["### Eligible Policies\n\n"]
    eligible = policies_rs.policies

    if len(eligible) == 0:
        pieces.append("No eligible policies found for this client.\n")
        return "".join(pieces)

    pieces.append(
        "| Policy ID | Policy Name | Status | Lives Assured | Claim Types |\n"
    )
    pieces.append(
        "|-----------|-------------|--------|--------------|-------------|\n"
    )
    for entry in eligible:
        details = entry.policy
        state_label = "Active" if details.status.is_active else "Inactive"
        insured_names = [life.name for life in details.lives_assured]
        insured = ", ".join(insured_names) if insured_names else "N/A"
        kinds = ", ".join(entry.claim_types) if entry.claim_types else "N/A"
        pieces.append(
            f"| {details.id} | {details.name} | {state_label} | {insured} | {kinds} |\n"
        )
    return "".join(pieces)
def format_currencies_info(currency_response: CurrencyResponse) -> str:
    """Build a markdown table of the available currencies.

    Args:
        currency_response: Response exposing ``currencies``, each with
            ``code``, ``name`` and ``symbol``. May be falsy.

    Returns:
        str: The markdown table, or "No currencies available." when the
        response or its currency list is empty.
    """
    available = currency_response.currencies if currency_response else None
    if not available:
        return "No currencies available."

    heading = (
        "### Available Currencies\n\n"
        "| Code | Name | Symbol |\n"
        "|------|------|--------|\n"
    )
    rows = [f"| {cur.code} | {cur.name} | {cur.symbol} |\n" for cur in available]
    return heading + "".join(rows)
def initialize_claim(client_id: str) -> Claim:
    """Create a placeholder claim for ``client_id``.

    Every field except the client ID is a default: empty life assured and
    policy ID, HOSPITALISATION claim type, zero final amount, no receipts or
    documents, and a direct-credit SGD payout to an account whose holder is
    the client ID and whose number is empty.
    """
    blank_details = ClaimDetails(
        hospitalName="", claimingFromOtherInsurers=False, finalAmount=0.0
    )
    default_currency = Currency(code="SGD", name="Singapore Dollar", symbol="$")
    placeholder_account = BankAccount(
        name="Default Bank",
        holder=client_id,  # client ID stands in for the account holder name
        account_no="",
        branch_code=None,
    )
    default_payout = ClaimPayout(
        mode=PayoutMethodModeEnum.DIRECT_CREDIT,
        currency=default_currency,
        account=placeholder_account,
    )
    return Claim(
        clientId=client_id,
        lifeAssured="",
        claimType=ClaimTypeEnum.HOSPITALISATION,
        policyId="",
        details=blank_details,
        receipts=[],
        documents=[],
        payout=default_payout,
    )
def _parse_bill_amount(raw_amount) -> float:
    """Convert an extracted amount such as "$1,234.50" to a float, else 0.0."""
    if raw_amount is None:
        return 0.0
    # str() tolerates amounts that arrive as numbers rather than text.
    cleaned_amount = str(raw_amount).replace("$", "").replace(",", "").strip()
    try:
        return float(cleaned_amount)
    except ValueError:
        return 0.0


def extract_receipt(file_path: str, file_name: str) -> ReceiptExtractionResult:
    """Extract receipt fields from a file and build a markdown summary.

    The file is analysed with ``run_content_analysis`` and the first entry of
    the validated response's ``results`` is used.

    Args:
        file_path: The path to the uploaded file.
        file_name: The name of the uploaded file.

    Returns:
        ReceiptExtractionResult: ``success=True`` with the extracted fields,
        the markdown summary and the bill items; or ``success=False`` with
        ``error`` set to the exception text when anything in the extraction
        raises. A missing or unparseable amount does not fail the extraction;
        it is reported as ``"0.0"``.
    """
    try:
        extracted_data = run_content_analysis(file_path)
        response = OcrAnalysisResponse.model_validate(extracted_data)
        content = response.results[0]
        receipt_number = content.receipt_number
        receipt_date_str = content.receipt_date
        admission_date_str = content.admission_date
        discharge_date_str = content.discharge_date
        hospital = content.hospital
        currency_code = content.currency
        bill_amount_str = content.amount
        md_content = content.md_content
        invoice_addressee = content.extracted_data["invoiceAddressee"]["valueString"]

        # Previously only ValueError was handled, so a None/numeric amount
        # raised AttributeError and aborted the whole extraction.
        bill_amount = _parse_bill_amount(bill_amount_str)

        # Create markdown content for display
        md_content += "\n### Receipt Details\n\n"
        md_content += "| Field | Value |\n"
        md_content += "|-------|-------|\n"
        # The raw amount text (not the parsed float) is shown in the table.
        simplified_data = {
            "Receipt Number": receipt_number,
            "Receipt Date": receipt_date_str,
            "Admission Date": admission_date_str,
            "Discharge Date": discharge_date_str,
            "Hospital": hospital,
            "Currency": currency_code,
            "Bill Amount": bill_amount_str,
            "GST": "",
        }
        for key, value in simplified_data.items():
            # Labels are shown without spaces, e.g. "ReceiptNumber".
            field_key = key.replace(" ", "")
            md_content += f"| {field_key} | {value} |\n"

        bill_items = []
        if "valueArray" in content.extracted_data["billDetails"]:
            # Keep only entries that carry both a description and a numeric amount.
            bill_items = [
                BillItem(
                    service=item["valueObject"]["description"]["valueString"],
                    detail="",
                    amount=str(item["valueObject"]["amt"]["valueNumber"]),
                )
                for item in content.extracted_data["billDetails"]["valueArray"]
                if "valueObject" in item
                and "description" in item["valueObject"]
                and "amt" in item["valueObject"]
                and "valueNumber" in item["valueObject"]["amt"]
            ]
            md_content += "\n### Bill Items\n\n"
            md_content += "| Service | Amount |\n"
            md_content += "|--------|--------|\n"
            for item in bill_items:
                md_content += f"| {item.service} | {item.amount} |\n"

        # Return the extracted data and receipt information
        return ReceiptExtractionResult(
            success=True,
            file_name=file_name,
            file_path=file_path,
            receipt_number=receipt_number,
            receipt_date=receipt_date_str,
            admission_date=admission_date_str,
            discharge_date=discharge_date_str,
            hospital=hospital,
            currency=currency_code,
            amount=str(bill_amount),
            md_content=md_content,
            extracted_data=extracted_data,
            bill_items=bill_items,
            invoice_addressee=invoice_addressee,
        )
    except Exception as e:
        # Boundary handler: any failure is reported to the caller as a result
        # object rather than an exception.
        return ReceiptExtractionResult(success=False, file_name=file_name, error=str(e))
async def reasoning_model_call(md_content: str) -> str:
    """Ask the reasoning model for an analysis of a claim.

    Sends ``md_content`` to the "o3-mini" deployment through
    ``reasoning_client_async``, writes the full response to
    ``reasoning_output.json`` in the current working directory, and returns
    the text of the first choice.

    Args:
        md_content: The claim content (markdown) to analyse.

    Returns:
        str: The model's message content, or an empty string when the
        response has no choices or no content.
    """
    reasoning_output = await reasoning_client_async.chat.completions.create(
        model="o3-mini",
        messages=[
            {
                "role": "developer",
                "content": "You are an experienced health insurance claim processing analyst with deep expertise in medical billing, claims adjudication, and anomaly detection. Your role is to deliver comprehensive and evidence-based analyses of health insurance claims, emphasizing clarity, precision, and domain-specific insights.",
            },
            {
                "role": "user",
                "content": f"Based on the claim details provided, please produce a detailed analysis that includes the following points:\n\n1. **Type of Medical Service Provided:** Clearly identify the category of service (e.g., inpatient, outpatient, emergency care, elective procedure) along with any distinguishing details about the treatment.\n\n2. **General Observation About the Medical Claim:** Summarize the claim with an overview of the service rendered and its alignment with standard practices. Highlight any patterns or key context that might normally be expected.\n\n3. **Anomalies Detected:** Point out any irregularities, discrepancies, or red flags in the claim. Explain why these anomalies are significant, discussing potential issues such as coding errors, inconsistencies in documentation, or deviations from typical claim patterns.\n\n4. **Additional Notes/Comments:** Provide any further insights or recommendations for further review. This may include suggestions for additional verification, potential areas for deeper investigation, or contextual observations that support a more informed decision-making process.\n\nYour analysis should be structured, detailed, and reflect your deep expertise in health insurance claim processing. Use bullet points or numbered lists where relevant and support your conclusions with clear, evidence-based reasoning to ensure that the claim provider gains a comprehensive understanding of the claim's validity and any potential risks.\n Claim content {md_content}.\n\n",
            },
        ],
        max_completion_tokens=5000,
    )
    # Persist the raw response; the file is overwritten on every call.
    with open("reasoning_output.json", "w") as f:
        json.dump(reasoning_output.to_dict(), f, indent=4)
    # The function is annotated ``-> str`` but used to fall off the end and
    # return None; return the analysis text so callers can actually use it.
    choices = reasoning_output.choices
    if not choices:
        return ""
    return choices[0].message.content or ""
# Entry point meant to be handed to a worker thread.
def call_reasoning_model_in_background(md_content):
    """Run ``reasoning_model_call`` to completion on a private event loop.

    A new event loop is created, registered as the calling thread's current
    loop, used to run the coroutine, and always closed afterwards. Errors are
    printed rather than raised, and nothing is returned.

    Args:
        md_content: The markdown content to analyze
    """
    thread_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(thread_loop)
    try:
        thread_loop.run_until_complete(reasoning_model_call(md_content))
    except Exception as exc:
        print(f"Error in background reasoning model call: {exc}")
    else:
        print("Reasoning model call completed in background")
    finally:
        thread_loop.close()
def process_receipts(uploaded_files, progress_bar):
    """Extract every uploaded receipt and turn the results into claim receipts.

    Each file is saved under ``/tmp``, passed to ``extract_receipt``, and the
    progress bar is advanced. Successful extractions become ``ClaimReceipt``
    objects and are shown in an expander; the claim's ``finalAmount`` in
    session state is set to the sum of the receipt amounts.

    Args:
        uploaded_files: List of uploaded files from Streamlit
        progress_bar: Streamlit progress element; this step advances it up to
            0.5 in total.

    Returns:
        tuple[str, List[ClaimReceipt], List[ReceiptExtractionResult]]: The
        summary markdown, the claim receipts and the raw extraction results.

    Raises:
        ReceiptProcessingError: On the first receipt that failed extraction
            or has no currency, after the error has been displayed.
    """
    results = []
    total_files = len(uploaded_files)
    for i, file in enumerate(uploaded_files):
        # The upload name is user-controlled: keep only its final component so
        # a crafted name cannot write outside the temp directory.
        file_path = os.path.join("/tmp", os.path.basename(file.name))
        with open(file_path, "wb") as f:
            f.write(file.getbuffer())
        # Extract receipt information
        result = extract_receipt(file_path, file.name)
        print("********************************")
        results.append(result)
        print("********************************")
        # The old expression `(i + 1) / 2 * len(uploaded_files)` multiplied by
        # the file count and exceeded 1.0 as soon as two files were uploaded,
        # which st.progress rejects. Dividing keeps the value within 0..0.5.
        progress_bar.progress((i + 1) / (2 * total_files))

    # Process the results
    successful_receipts = 0
    failed_receipts = 0
    claim_receipts = []
    for result in results:
        if result.success and result.currency is not None:
            # Create a ClaimDocument for the receipt
            document_id = f"receipt_{successful_receipts + 1}"
            receipt_document = ClaimDocument(type="RECEIPT", id=document_id)
            currency_obj = Currency(
                code=result.currency, name=f"{result.currency} Currency", symbol=""
            )
            # Missing fields fall back to placeholders so the receipt is
            # always constructible.
            receipt = ClaimReceipt(
                number=result.receipt_number or document_id,
                receiptDate=(
                    parse_date(result.receipt_date)
                    if result.receipt_date
                    else date.today()
                ),
                admissionDate=(
                    parse_date(result.admission_date) if result.admission_date else None
                ),
                dischargeDate=(
                    parse_date(result.discharge_date) if result.discharge_date else None
                ),
                hospitalName=result.hospital or "Unknown Hospital",
                currency=currency_obj,
                amount=result.amount or 0.0,
                documents=[receipt_document],
                patientName=result.invoice_addressee,
            )
            claim_receipts.append(receipt)
            # Display receipt information
            with st.expander(f"Receipt {successful_receipts + 1}"):
                st.markdown(result.md_content, unsafe_allow_html=True)
            if debug:
                st.session_state.messages.append(
                    MessageTable(
                        title=f"Receipt {successful_receipts + 1}",
                        table=result.md_content,
                    )
                )
            successful_receipts += 1
        else:
            if result.success:
                # Extraction worked but no currency was found.
                error_message = "Failed to extract currency information from receipt"
            else:
                # Keep the real extraction error; it used to be overwritten by
                # the currency message because failed results carry no currency.
                error_message = (
                    result.error if result.error else "Failed to extract receipt data"
                )
            # Display error
            with st.expander(f"Error processing receipt {failed_receipts + 1}"):
                st.error(
                    f"⚠️ Error processing receipt '{result.file_name}': {error_message}"
                )
            # Count before raising; the increment used to sit after the raise
            # and could never run.
            failed_receipts += 1
            raise ReceiptProcessingError("Error processing receipt")

    # Update claim details with final amount
    if st.session_state.claim:
        st.session_state.claim.details.finalAmount = sum(
            receipt.amount for receipt in claim_receipts
        )

    # Show summary
    summary = "#### Receipt Processing Summary\n\n"
    summary += f"Successfully processed: {successful_receipts} receipt(s)\n"
    # Only reachable if the raise above is ever removed.
    if failed_receipts > 0:
        summary += f"- Failed to process: {failed_receipts} receipt(s)\n"
    if st.session_state.claim:
        summary += f"\nFrom the information we are able to obtain from the receipts, we can see that you are looking to claim the following amount: {st.session_state.claim.details.finalAmount} {st.session_state.claim.payout.currency.code}"
    # Return the summary markdown and the list of claim receipts
    return summary, claim_receipts, results
def format_message(structured_data: ClaimResponse) -> str:
    """Render the assistant's analysis message as a "Claim Analysis" card.

    Args:
        structured_data: The ClaimResponse object from the assistant

    Returns:
        An error string when ``structured_data`` is falsy (nothing is
        rendered in that case); otherwise whatever ``st.markdown`` returns
        after rendering the card.
    """
    if not structured_data:
        return "Error: Could not parse structured data from assistant response"

    message = structured_data.message
    # The message is interpolated into raw HTML and rendered unescaped.
    card_html = f"""
<div class="info-card">
<div class="card-title">Claim Analysis</div>
<div class="analysis-section">
<div class="key">
<strong>Analysis</strong>
</div>
<div class="value">{message}</div>
</div>
</div>
"""
    rendered = st.markdown(card_html, unsafe_allow_html=True)
    return rendered
def parse_date(date_str: str) -> Optional[date]:
    """Convert a date string in one of several known layouts to a ``date``.

    Surrounding whitespace is ignored. The layouts are tried in order and the
    first one that matches wins.

    Returns:
        The parsed date, or ``None`` when the input is empty or matches none
        of the supported layouts.
    """
    if not date_str:
        return None

    candidate = date_str.strip()
    known_layouts = (
        "%d/%m/%Y",  # 31/12/2023
        "%d-%m-%Y",  # 31-12-2023
        "%Y-%m-%d",  # 2023-12-31
        "%Y/%m/%d",  # 2023/12/31
        "%d %b %Y",  # 31 Dec 2023
        "%d %B %Y",  # 31 December 2023
        "%b %d, %Y",  # Dec 31, 2023
        "%B %d, %Y",  # December 31, 2023
    )
    for layout in known_layouts:
        try:
            parsed = datetime.strptime(candidate, layout)
        except ValueError:
            continue
        return parsed.date()
    return None
async def run_assistant(user_message):
    """Send ``user_message`` to the assistant and cache the reply in session state.

    Stores the thread ID, the assistant response and the structured data
    returned by ``run_conversation`` in ``st.session_state``.

    Returns:
        The structured data returned by ``run_conversation``.
    """
    conversation = await run_conversation(
        assistant_id=ASSISTANT_ID, user_input=user_message
    )
    thread_id, assistant_response, structured_data = conversation

    state = st.session_state
    state.thread_id = thread_id
    state.assistant_response = assistant_response
    state.structured_data = structured_data
    return structured_data
# This could be an LLM call; a regular expression is used to save time.
def contains_claim(text: str) -> bool:
    """Tell whether ``text`` mentions "claim" or "claims" as a whole word.

    The match is case-insensitive; words that merely contain "claim"
    (e.g. "reclaimed") do not count.

    :param text: The input text to check
    :return: True if the text contains 'claim' or 'claims', otherwise False
    """
    found = re.search(r"\bclaims?\b", text, flags=re.IGNORECASE)
    return found is not None
############################################################################################################################################################# | |
def main():
    """Render the page: configure it, inject CSS, replay chat, run the current state.

    The state handlers are checked one after another on every run, so a
    handler that changes ``chat_state`` without triggering a rerun lets a
    later handler run in the same pass.
    """
    st.set_page_config(
        page_title="PruClaim AI Assistant", page_icon="🏥", layout="wide"
    )

    # Page styling; the font and background image are embedded as base64 data URLs.
    page_css = f"""
<style>
@font-face {{
font-family: 'OpenSansPru';
src: url('data:font/woff2;base64,{font_econded}') format('woff2');
font-weight: 100 300 500 700 800; /* Range of weights */
font-stretch: 75% 125%; /* Range of widths */
}}
body, body > *, h1, h2, h3, h4, h5, p, a, div {{
font-family: 'OpenSansPru'!important;
font-weight: 500;
}}
.stApp {{
background-image: url("data:image/png;base64,{bg_image_base64}");
background-size: cover;
background-position: center;
}}
.info_button {{
text-align: right;
display: none;
}}
.info-card {{
background-color: white;
padding: 16px;
border-radius: 16px;
width: -webkit-fill-available;
margin-bottom: 16px;
}}
.card-title {{
font-size: 24px;
font-weight: 700;
margin-bottom: 24px;
border-bottom: 1px solid #dcdcdc;
padding-bottom: 16px;
}}
.stSidebar {{
background-color: white;
}}
.flex-grow {{
flex-grow: 1;
}}
.flex-row {{
display: flex;
flex-direction: row;
gap: 16px;
align-items: center;
justify-content: space-between;
}}
.card-key-value {{
margin-bottom: 16px;
}}
.card-key-value .key {{
font-weight: 700;
}}
.card-key-value .value {{
font-weight: 500;
}}
[data-testid="stFileUploaderDropzone"] {{
width: -webkit-fill-available;
background-color: white;
border-radius: 16px;
}}
[data-testid="stFileUploaderDropzone"] button {{
margin-left: 32px;
}}
[data-testid="stFileUploaderDropzoneInstructions"] span svg path {{
color: black;
}}
[data-testid="stWidgetLabel"] [data-testid="stMarkdownContainer"] p {{
font-size: 16px;
font-weight: 700;
}}
[data-testid="stFileUploaderDropzone"] + div {{
margin-top: 8px;
border-radius: 16px;
background: white;
padding-bottom: 8px;
}}
[data-testid="stFileUploaderDropzone"] + div path {{
color: black;
}}
.stChatMessage ~ .stElementContainer, .stChatMessage ~ .stExpander, .stChatMessage ~ .stForm {{
margin-left: 58px!important;
}}
.key-metric {{
font-size: 32px;
font-weight: 500;
margin-bottom: 16px;
}}
.stProgress {{
margin-top: 16px;
}}
.stProgress p {{
font-size: 16px;
font-weight: 700;
margin-bottom: 8px;
}}
.stForm {{
margin-top: 16px;
background-color: white;
border: 1px solid #dcdcdc;
}}
.stChatMessage {{
padding-bottom: 0px!important;
}}
</style>
"""
    st.markdown(page_css, unsafe_allow_html=True)

    # Debug aid: expose the current state and structured data in the sidebar.
    if debug:
        with st.sidebar:
            st.write(st.session_state.chat_state)
            st.write(st.session_state.structured_data)

    # Replay the stored conversation on every run.
    for entry in st.session_state.messages:
        if isinstance(entry, AIMessage):
            assistant_bubble = st.chat_message(
                "assistant", avatar=path_to_assistant_image
            )
            assistant_bubble.write(entry.content)
        elif isinstance(entry, HumanMessage):
            user_bubble = st.chat_message("user", avatar=path_to_user_image)
            user_bubble.write(entry.content)
        # isinstance was reported as unreliable for MessageTable, so the
        # type name is matched instead.
        elif "MessageTable" in str(type(entry)):
            st.markdown(entry.table, unsafe_allow_html=True)

    def in_state(expected):
        # Re-read the session on every check: handlers may change the state.
        return session_state.chat_state == expected

    if in_state(ChatStateEnum.LOGIN):
        # Login is hard-coded to a single demo client.
        client = "C111"
        st.session_state.claim = initialize_claim(client)
        st.session_state.client_profile = get_eligible_policies(client)
        st.session_state.currency_response = get_currencies()
        st.success(f"Logged in as {client}")
        st.session_state.messages = []
        st.session_state.chat_state = ChatStateEnum.CLAIM_RECEIPT_UPLOAD
        st.rerun()

    if in_state(ChatStateEnum.DECIDER):
        decider()
    if in_state(ChatStateEnum.CLAIM_RECEIPT_UPLOAD):
        claim_receipt_upload()
    if in_state(ChatStateEnum.CLAIM_AMOUNT_DISPLAY):
        claim_amount_display()
    if in_state(ChatStateEnum.CLAIM_AMOUNT_CHANGE_DISPLAY):
        claim_amount_change_display()
    if in_state(ChatStateEnum.CLAIM_SUMMARY_DISPLAY):
        claim_summary_display()
    if in_state(ChatStateEnum.CLAIM_SUMMARY_CHANGE_DISPLAY):
        claim_summary_change_display()
    if in_state(ChatStateEnum.CLAIM_DETAILS_DISPLAY):
        claim_details_display()
    if in_state(ChatStateEnum.CLAIM_DETAILS_CHANGE_DISPLAY):
        claim_details_change_display()
    if in_state(ChatStateEnum.CLAIM_PAYOUT_DISPLAY):
        claim_payout_display()
    if in_state(ChatStateEnum.CLAIM_SUBMIT_DISPLAY):
        claim_submit_display()
def claim_submit_display():
    """Show the final confirmation step and submit the claim on "Yes".

    When structured data is available the analysis card is rendered first.
    "Yes" raises a Jira ticket from the structured data, shows the animated
    loader for a few seconds, records a success message, resets the workflow
    and reruns. "No" is not supported yet: it records a notice and reruns.
    """
    if st.session_state.structured_data:
        format_message(st.session_state.structured_data)
    st.write("Would you like to submit the claim?")

    if st.button("Yes"):
        raise_jira_ticket(st.session_state.structured_data)
        # Enable auto-scroll
        st_autoscroll = """
<script>
const iframe = document.querySelector('iframe');
if (iframe) {
iframe.scrollIntoView({ behavior: 'smooth', block: 'end' });
}
</script>
"""
        # Render the loader only. Its return value used to be appended to the
        # chat history, which is meant to hold message objects that main()
        # can replay, not rendering handles.
        st.components.v1.html(
            f"""
<iframe srcdoc='{svg_content}' style="width:100%; max-width:600px; height:400px; border:none; display:block; margin:0 auto;"></iframe>
{st_autoscroll}
""",
            height=400,
        )
        # Keep the loader on screen before the page is rerun.
        time.sleep(4.5)
        st.session_state.messages.append(
            AIMessage("Claim is submitted successfully ✅")
        )
        refresh_state()
        st.session_state.chat_state = ChatStateEnum.DECIDER
        st.rerun()

    if st.button("No"):
        st.session_state.messages.append(AIMessage("Currently not supported"))
        st.write("Currently not supported")
        st.rerun()
def claim_payout_display():
    """Show the payout details on file and ask the user to confirm them.

    The details come from the ``payout`` entry of the structured claim data;
    missing values are shown as "UNKNOWN". "Yes" records the confirmation and
    the details card in the chat history and moves on to the submit step.
    "No" is not supported yet: it records a notice and reruns.
    """
    st.write("We have the following payout method on file, is this correct?")

    response = ClaimResponse.model_validate(st.session_state.structured_data)
    payout = response.claim_data.get("payout", {})
    account = payout.get("account", {})

    payout_method = payout.get("mode", "UNKNOWN")
    bank_name = account.get("name", "UNKNOWN")
    account_holder = account.get("holder", "UNKNOWN")
    account_number = account.get("account_no", "UNKNOWN")

    html_table = f"""
<div class="info-card">
<div class="card-title">Payout Details</div>
<div class="flex-row card-key-value">
<div class="key">Method</div>
<div class="value">{payout_method}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Bank Name</div>
<div class="value">{bank_name}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Account Holder</div>
<div class="value">{account_holder}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Account Number</div>
<div class="value">{account_number}</div>
</div>
</div>
"""
    st.markdown(html_table, unsafe_allow_html=True)

    history = st.session_state.messages
    if st.button("Yes"):
        history.append(AIMessage("Payout details confirmed ✅"))
        # Keep the card in the history so it is replayed on later runs.
        history.append(MessageTable(table=html_table, title="Payout methods"))
        st.session_state.chat_state = ChatStateEnum.CLAIM_SUBMIT_DISPLAY
        st.rerun()

    if st.button("No"):
        history.append(AIMessage("Currently not supported"))
        st.write("Currently not supported")
        st.rerun()
def claim_details_display():
    """Show the extracted claim details and ask the user to confirm them.

    Reads the claim type, hospital name and the dates of the first receipt
    from ``st.session_state.structured_data`` and renders a card whose rows
    depend on the claim type (outpatient: visit date; hospitalisation:
    admission and discharge dates).

    "Yes, that's correct" records the details in the chat history via
    ``record_claim_details`` and moves the chat to ``CLAIM_PAYOUT_DISPLAY``.
    "I need to change" moves the chat to ``CLAIM_DETAILS_CHANGE_DISPLAY``.
    """
    st.write("And the claim details are as follows?")
    data = ClaimResponse.model_validate(st.session_state.structured_data)
    claim_data = data.claim_data
    receipts = claim_data.get("receipts", [])
    # Only the first receipt is displayed. Fall back to an empty mapping so
    # a claim without receipts shows "Unknown" instead of raising IndexError.
    first_receipt = receipts[0] if receipts else {}
    claim_type = claim_data.get("claimType", "UNKNOWN")
    hospital_name = claim_data.get("details", {}).get("hospitalName", "Unknown")
    receipt_date = first_receipt.get("receiptDate", "Unknown")
    admission_date = first_receipt.get("admissionDate", "Unknown")
    discharge_date = first_receipt.get("dischargeDate", "Unknown")

    if claim_type == ClaimTypeEnum.OUTPATIENT.name:
        st.markdown(
            f"""
<div class="info-card">
<div class="card-title">Claim Details</div>
<div class="flex-row card-key-value">
<div class="key">Hospital name</div>
<div class="value">{hospital_name}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Visit date</div>
<div class="value">{receipt_date}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Diagnosis</div>
<div class="value">XXXXXXXXXXXXX</div>
</div>
</div>
""",
            unsafe_allow_html=True,
        )
    elif claim_type == ClaimTypeEnum.HOSPITALISATION.name:
        st.markdown(
            f"""
<div class="info-card">
<div class="card-title">Claim Details</div>
<div class="flex-row card-key-value">
<div class="key">Hospital name</div>
<div class="value">{hospital_name}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Admission date</div>
<div class="value">{admission_date}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Discharge date</div>
<div class="value">{discharge_date}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Diagnosis</div>
<div class="value">XXXXXXXXXXXXX</div>
</div>
</div>
""",
            unsafe_allow_html=True,
        )

    if st.button("Yes, that's correct"):
        record_claim_details(
            claim_type=claim_type,
            hospital_name=hospital_name,
            receipt_date=receipt_date,
            admission_date=admission_date,
            discharge_date=discharge_date,
        )
        st.session_state.chat_state = ChatStateEnum.CLAIM_PAYOUT_DISPLAY
        st.rerun()

    if st.button("I need to change"):
        # The former "Currently not supported" notice was dropped here: it
        # contradicted the switch to the change form made on the next line.
        st.session_state.chat_state = ChatStateEnum.CLAIM_DETAILS_CHANGE_DISPLAY
        st.rerun()
def claim_details_change_display():
    """Let the user correct the hospital name and the claim dates.

    The current values are read from ``st.session_state.structured_data``
    (dates come from the first receipt) and offered in a form. For an
    outpatient claim the receipt date is editable; for a hospitalisation
    claim the admission and discharge dates are. On submit the corrected
    values are written back to the structured data, recorded in the chat
    history via ``record_claim_details``, and the chat moves on to
    ``CLAIM_PAYOUT_DISPLAY``.
    """
    st.write(
        "Thanks! We have also established the following details after reviewing the documents"
    )
    data = ClaimResponse.model_validate(st.session_state.structured_data)
    claim_data = data.claim_data
    receipts = claim_data.get("receipts", [])
    # Guard against a claim without receipts: the form then starts with
    # empty dates instead of raising IndexError.
    first_receipt = receipts[0] if receipts else {}
    claim_type = claim_data.get("claimType", "UNKNOWN")
    hospital_name = claim_data.get("details", {}).get("hospitalName", "Unknown")
    receipt_date = first_receipt.get("receiptDate", None)
    admission_date = first_receipt.get("admissionDate", None)
    discharge_date = first_receipt.get("dischargeDate", None)

    def _to_date(value: str | None):
        """Parse a ``YYYY-MM-DD`` string into a date; ``None`` stays ``None``."""
        if value is None:
            return None
        return datetime.strptime(value, "%Y-%m-%d").date()

    def _to_iso(value) -> str | None:
        """Format a picked date as ``YYYY-MM-DD``; ``None`` stays ``None``."""
        return value.strftime("%Y-%m-%d") if value is not None else None

    def update_claim_details(
        changed_hospital_name: str,
        changed_receipt_date: str | None,
        changed_admission_date: str | None,
        changed_discharge_date: str | None,
    ):
        """Write the corrected values back into the structured claim data."""
        if "structured_data" in st.session_state and st.session_state.structured_data:
            structured_data = st.session_state.structured_data
            stored_claim = structured_data.claim_data
            stored_claim.setdefault("details", {})["hospitalName"] = (
                changed_hospital_name
            )
            stored_receipts = stored_claim.get("receipts") or []
            if stored_receipts:
                # Dates live on the first receipt only.
                stored_receipts[0]["receiptDate"] = changed_receipt_date
                stored_receipts[0]["admissionDate"] = changed_admission_date
                stored_receipts[0]["dischargeDate"] = changed_discharge_date
            st.session_state.structured_data = structured_data
        else:
            st.error("Structured data not found in session state.")

    is_outpatient = claim_type == ClaimTypeEnum.OUTPATIENT.name
    is_hospitalisation = claim_type == ClaimTypeEnum.HOSPITALISATION.name

    st.write("Please enter the correct information")
    with st.form("claim summary"):
        changed_hospital_name = st.text_input(
            label="Hospital Name", value=hospital_name
        )
        # Dates that do not apply to the claim type stay None.
        changed_receipt_date = None
        changed_admission_date = None
        changed_discharge_date = None
        if is_outpatient:
            changed_receipt_date = _to_iso(
                st.date_input(label="Receipt Date", value=_to_date(receipt_date))
            )
        elif is_hospitalisation:
            changed_admission_date = _to_iso(
                st.date_input(label="Admission Date", value=_to_date(admission_date))
            )
            # Each date is formatted based on its own value. The previous
            # code tested the admission date here, which crashed when only
            # the discharge date was empty.
            changed_discharge_date = _to_iso(
                st.date_input(label="Discharge date", value=_to_date(discharge_date))
            )

        if st.form_submit_button("Submit"):
            if is_outpatient or is_hospitalisation:
                update_claim_details(
                    changed_hospital_name,
                    changed_receipt_date,
                    changed_admission_date,
                    changed_discharge_date,
                )
                record_claim_details(
                    claim_type=claim_type,
                    hospital_name=changed_hospital_name,
                    receipt_date=changed_receipt_date,
                    admission_date=changed_admission_date,
                    discharge_date=changed_discharge_date,
                )
            st.session_state.chat_state = ChatStateEnum.CLAIM_PAYOUT_DISPLAY
            st.rerun()
def claim_summary_change_display():
    """Let the user correct the life assured name and the claim type.

    Shows the current summary card, then a form with the life assured name
    and a claim-type selector. On submit the new values are written back to
    ``st.session_state.structured_data``, recorded in the chat history via
    ``record_claim_summary``, and the chat moves to
    ``CLAIM_DETAILS_DISPLAY``.
    """
    st.write(
        "Thanks! We have also established the following details after reviewing the documents"
    )
    data = ClaimResponse.model_validate(st.session_state.structured_data)
    claim_data = data.claim_data
    life_assured = claim_data.get("lifeAssured", "Unknown")
    claim_type = claim_data.get("claimType", "Unknown")
    st.markdown(
        f"""
<div class="info-card">
<div class="card-title">Summary</div>
<div class="flex-row card-key-value">
<div class="key">Life Assured</div>
<div class="value">{life_assured}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Claim Type</div>
<div class="value">{claim_type}</div>
</div>
</div>
""",
        unsafe_allow_html=True,
    )

    def update_claim_summary(life_assured: str, claim_type: str):
        """Write the corrected summary back into the structured claim data."""
        if "structured_data" in st.session_state and st.session_state.structured_data:
            structured_data = st.session_state.structured_data
            stored_claim = structured_data.claim_data
            stored_claim["lifeAssured"] = life_assured
            stored_claim["claimType"] = claim_type
            st.session_state.structured_data = structured_data
        else:
            st.error("Structured data not found in session state.")

    claim_type_options = (
        ClaimTypeEnum.HOSPITALISATION.name,
        ClaimTypeEnum.OUTPATIENT.name,
    )
    # Preselect the current claim type. Without this the selector always
    # started on the first option, so a user who only fixed the name would
    # silently turn an outpatient claim into a hospitalisation claim.
    current_index = (
        claim_type_options.index(claim_type)
        if claim_type in claim_type_options
        else 0
    )

    st.write("Please enter the correct information")
    with st.form("claim summary"):
        changed_life_assured = st.text_input(label="Life assured", value=life_assured)
        changed_claim_type = st.selectbox(
            "Choose the correct claim type",
            claim_type_options,
            index=current_index,
        )
        if st.form_submit_button("Submit"):
            update_claim_summary(changed_life_assured, changed_claim_type)
            record_claim_summary(changed_life_assured, changed_claim_type)
            st.session_state.chat_state = ChatStateEnum.CLAIM_DETAILS_DISPLAY
            st.rerun()
def claim_summary_display():
    """Show the claim summary (life assured, claim type) for confirmation.

    Values are read from ``st.session_state.structured_data``. Confirming
    records the summary in the chat history and moves the chat to
    ``CLAIM_DETAILS_DISPLAY``; asking for a change moves it to
    ``CLAIM_SUMMARY_CHANGE_DISPLAY``. Either button triggers a rerun.
    """
    st.write(
        "Thanks! We have also established the following details after reviewing the documents"
    )

    summary_source = ClaimResponse.model_validate(
        st.session_state.structured_data
    ).claim_data
    life_assured = summary_source.get("lifeAssured", "Unknown")
    claim_type = summary_source.get("claimType", "Unknown")

    summary_card = f"""
<div class="info-card">
<div class="card-title">Summary</div>
<div class="flex-row card-key-value">
<div class="key">Life Assured</div>
<div class="value">{life_assured}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Claim Type</div>
<div class="value">{claim_type}</div>
</div>
</div>
"""
    st.markdown(summary_card, unsafe_allow_html=True)

    if st.button("Yes, that's correct"):
        record_claim_summary(life_assured, claim_type)
        st.session_state.chat_state = ChatStateEnum.CLAIM_DETAILS_DISPLAY
        st.rerun()

    if st.button("I need to change"):
        st.session_state.chat_state = ChatStateEnum.CLAIM_SUMMARY_CHANGE_DISPLAY
        st.rerun()
def claim_amount_change_display():
    """Let the user correct the claim amount extracted from the receipts.

    Shows the amount and currency currently held in
    ``st.session_state.structured_data`` and a form to enter the correct
    amount. On submit the new amount is stored on
    ``st.session_state.claim.details.finalAmount`` and in the structured
    data, recorded in the chat history via ``record_claim_amount``, and the
    chat moves to ``CLAIM_SUMMARY_DISPLAY``.
    """
    data = ClaimResponse.model_validate(st.session_state.structured_data)
    claim_data = data.claim_data
    final_amount = claim_data.get("details", {}).get("finalAmount", 0.0)
    receipts = claim_data.get("receipts", [])
    # The currency is taken from the first receipt. Tolerate a claim with no
    # receipts or without a currency entry and fall back to "SGD".
    first_receipt = receipts[0] if receipts else {}
    currency = (first_receipt.get("currency") or {}).get("code", "SGD")

    summary = (
        "\nFrom the information we are able to obtain from the receipts, we can see that you are looking "
        "to claim the following amount:"
    )
    st.markdown(summary)
    st.markdown(
        f""" <div class="key-metric">{final_amount} {currency}</div>""",
        unsafe_allow_html=True,
    )

    def update_final_amount(new_amount: float):
        """Write the corrected amount back into the structured claim data."""
        if "structured_data" in st.session_state and st.session_state.structured_data:
            structured_data = st.session_state.structured_data
            stored_claim = structured_data.claim_data
            if "details" in stored_claim:
                stored_claim["details"]["finalAmount"] = new_amount
                st.session_state.structured_data = structured_data
            else:
                st.error("Details not found in claim data.")
        else:
            st.error("Structured data not found in session state.")

    # Prefill the input with the amount shown above. The previous code passed
    # the amount as `placeholder`, which expects a string and is only shown
    # while the input is empty, so the field was never prefilled.
    try:
        current_amount = float(final_amount)
    except (TypeError, ValueError):
        current_amount = 0.0

    st.write("Please enter the correct amount")
    with st.form("claim amount"):
        amount = st.number_input(
            label="Correct claim amount",
            value=current_amount,
        )
        if st.form_submit_button("Submit"):
            st.session_state.claim.details.finalAmount = amount
            update_final_amount(amount)
            record_claim_amount(amount, currency)
            st.session_state.chat_state = ChatStateEnum.CLAIM_SUMMARY_DISPLAY
            st.rerun()
def claim_amount_display():
    """Show the claim amount extracted from the receipts for confirmation.

    The amount and currency are read from
    ``st.session_state.structured_data``. Confirming records the amount in
    the chat history and moves the chat to ``CLAIM_SUMMARY_DISPLAY``;
    rejecting moves it to ``CLAIM_AMOUNT_CHANGE_DISPLAY``.
    """
    data = ClaimResponse.model_validate(st.session_state.structured_data)
    claim_data = data.claim_data
    final_amount = claim_data.get("details", {}).get("finalAmount", 0.0)
    receipts = claim_data.get("receipts", [])
    # The currency is taken from the first receipt. Tolerate a claim with no
    # receipts or without a currency entry and fall back to "SGD" instead of
    # raising IndexError / AttributeError.
    first_receipt = receipts[0] if receipts else {}
    currency = (first_receipt.get("currency") or {}).get("code", "SGD")

    summary = (
        "\nFrom the information we are able to obtain from the receipts, we can see that you are looking "
        "to claim the following amount: "
    )
    st.markdown(summary)
    st.markdown(
        f""" <div class="key-metric">{final_amount} {currency}</div>""",
        unsafe_allow_html=True,
    )

    if st.button("Yes, that's correct"):
        record_claim_amount(final_amount, currency)
        st.session_state.chat_state = ChatStateEnum.CLAIM_SUMMARY_DISPLAY
        st.rerun()

    if st.button("No, it's a different amount"):
        st.write("Please input the correct amount")
        st.session_state.chat_state = ChatStateEnum.CLAIM_AMOUNT_CHANGE_DISPLAY
        st.rerun()
def claim_receipt_upload():
    """Collect receipt uploads, process them and start the claim analysis.

    Does nothing once ``st.session_state.receipts_processed`` is set.
    Otherwise shows a file uploader; when files are uploaded and "Process
    Receipts" is pressed, the receipts are processed, the results are
    stored in the session state and on the claim, ``run_claim_analysis`` is
    run, and the chat moves to ``CLAIM_AMOUNT_DISPLAY``.

    On ``ReceiptProcessingError`` the state is reset via ``refresh_state``,
    an error message is added to the chat, and the upload step is shown
    again. If receipts were extracted, a daemon thread is started with the
    markdown content of the first receipt for
    ``call_reasoning_model_in_background``.
    """
    import threading

    if st.session_state.receipts_processed:
        return

    st.write(
        "Please upload a receipt as proof of claim. This will help us process your claim faster."
    )
    uploaded_files = st.file_uploader(
        "Upload receipts",
        accept_multiple_files=True,
        type=["jpg", "jpeg", "png", "pdf"],
    )
    if not (uploaded_files and st.button("Process Receipts")):
        return

    progress_bar = st.progress(0.1, text="Uploading and analyzing documents")

    # Bound before the try so the finally clause can tell whether processing
    # got far enough to produce results. Previously a failure inside
    # process_receipts left this name unbound and the finally clause raised
    # UnboundLocalError, masking both the rerun and the original error.
    receipt_results = None
    try:
        summary, claim_receipts, receipt_results = process_receipts(
            uploaded_files, progress_bar
        )
        # Update session state
        st.session_state.claim_receipts = claim_receipts
        st.session_state.receipt_results = receipt_results
        st.session_state.receipts_processed = True
        # Update claim with receipts
        st.session_state.claim.receipts.extend(claim_receipts)
        # Update claim details with hospital name if available
        if claim_receipts and claim_receipts[0].hospitalName:
            st.session_state.claim.details.hospitalName = claim_receipts[
                0
            ].hospitalName
        st.session_state.claim_summary = summary
        progress_bar.progress(0.5, "Analysing your claim")
        run_claim_analysis()
        progress_bar.progress(1)
        st.session_state.messages.append(
            AIMessage("Documents are successfully processed ✅")
        )
        st.session_state.chat_state = ChatStateEnum.CLAIM_AMOUNT_DISPLAY
        st.write("Documents are successfully processed ✅")
        st.rerun()
    except ReceiptProcessingError:
        refresh_state()
        st.session_state.messages.append(
            AIMessage("There was an error processing receipt, please try again")
        )
        st.session_state.chat_state = ChatStateEnum.CLAIM_RECEIPT_UPLOAD
        st.rerun()
    finally:
        # Runs on the way out of both branches above. Only fire up the
        # reasoning model when there is at least one extracted receipt.
        if receipt_results:
            background_thread = threading.Thread(
                target=call_reasoning_model_in_background,
                args=(receipt_results[0].md_content,),
                daemon=True,  # Thread will exit when main thread exits
            )
            background_thread.start()
def decider():
    """Route a free-text chat prompt to the claim flow or decline it.

    In debug mode the client id header and, when a client profile is
    loaded, the formatted policy information are shown first. A submitted
    prompt is added to the chat history and echoed; if ``contains_claim``
    accepts it the chat moves to ``CLAIM_RECEIPT_UPLOAD``, otherwise a
    decline message is shown and the chat stays in ``DECIDER``.
    """
    if debug:
        # Display client profile
        st.header(f"Client Profile: {st.session_state.claim.clientId}")
        # Display eligible policies
        if st.session_state.client_profile:
            st.markdown(format_policies_info(st.session_state.client_profile))

    user_text = st.chat_input()
    if not user_text:
        return

    # Append the user's message to session state and display it
    st.session_state.messages.append(HumanMessage(content=user_text))
    st.chat_message("user", avatar=path_to_user_image).write(user_text)

    with st.chat_message("assistant", avatar=path_to_assistant_image):
        if contains_claim(user_text):
            reply = "Certainly I can help you, please upload your receipts."
            next_state = ChatStateEnum.CLAIM_RECEIPT_UPLOAD
        else:
            reply = "Currently I am not able to help with this request, I can help you with claims submission!"
            next_state = ChatStateEnum.DECIDER

        # The same text goes into the history and onto the screen.
        st.session_state.messages.append(AIMessage(reply))
        st.write(reply)
        st.session_state.chat_state = next_state
def run_claim_analysis():
    """Build the analysis prompt from the session data and run the assistant.

    Does nothing unless ``st.session_state.receipts_processed`` is set.
    The prompt embeds the client's policies, the supported currencies, the
    claim receipts (with date fields turned into ISO strings) and the
    markdown extractions of the receipts. In debug mode the prompt is also
    added to the chat history.
    """
    if not st.session_state.receipts_processed:
        return

    # Convert Pydantic models to dict
    policies_data = [
        policy.model_dump() for policy in st.session_state.client_profile.policies
    ]
    currencies_data = st.session_state.currency_response.model_dump()

    # Date objects are not JSON serializable, so turn them into ISO strings.
    date_fields = ("receiptDate", "admissionDate", "dischargeDate")
    receipts_data = []
    for receipt in st.session_state.claim.receipts:
        receipt_dict = receipt.model_dump()
        for field_name in date_fields:
            field_value = receipt_dict.get(field_name)
            if isinstance(field_value, date):
                receipt_dict[field_name] = field_value.isoformat()
        receipts_data.append(receipt_dict)

    # Keep only the extractions that actually carry markdown content.
    claim_extractions_data = [
        extraction.md_content
        for extraction in st.session_state.receipt_results
        if extraction.md_content
    ]

    user_message = f"""
# Task
Analyze the provided insurance claim data to determine the correct claim type (in-patient or out-patient), select the proper currency, and complete the provided schema using information from various sources. Additionally, explain your reasoning for claim type determination and payout method selection.
## Input Data
1. Client Profile:
{json.dumps(policies_data, indent=2)}
3. Receipt Processing Summary:
Total claim amount: {st.session_state.claim.details.finalAmount}
4. Receipts Provided in the Claim:
{json.dumps(receipts_data, indent=2)}
5. Receipt Extractions (Markdown Format):
{json.dumps(claim_extractions_data, indent=2)}
6. Patient name: {receipts_data[0]['patientName']}
## Instructions
1. Claim Type Determination:
a. Analyze the receipt data to decide whether the claim should be classified as hospitalization or outpatient.
b. If the receipt does not explicitly indicate the claim type, determine the type based on additional cues (such as the hospital location and associated country).
2. Currency Selection:
a.Examine the receipt data to identify the currency used. Output the currency code from the given schema.
b. Always refer to the hospital location and country to determine the correct currency from this list: {json.dumps(currencies_data, indent=2)}
c. Do not associate currency with payout currency
3. Schema Completion:
a.Populate the schema using:
b.Data extracted from the receipts,
c.Information from the client profile, and
d.Details of the payout method.
e.Include any necessary data transformations and validations.
5. Explanation and Discrepancies:
a.Provide a detailed explanation of how you determined the claim type and selected the payout method.
b. ALWAYS highlight and comment on any discrepancies between the Document Extractions and the Client Profile specifically noting any differences in names (patient name vs. policy holder name or any other discrepancies of names).
Provide details of those discrepancies. If there are do discrepancies found, do not include them.
c.Include a brief analysis (Delta) describing what information was missing or not captured when comparing the "Receipts Provided in the Claim" and "Receipt Extractions".
d. Pass the above in the "message" field in the response
e. Include a short summary of the claim for user to verify before the submission
"""
    if debug:
        st.session_state.messages.append(HumanMessage(user_message))

    # Run the assistant
    asyncio.run(run_assistant(user_message))
# Display the formatted message | |
# if st.session_state.structured_data: | |
# formatted_message = format_message(st.session_state.structured_data) | |
# st.markdown(formatted_message) | |
# | |
# # Keep receipt extractions visible | |
# st.header("Receipt Extractions") | |
# for i, result in enumerate(st.session_state.receipt_results): | |
# with st.expander(f"Receipt {i + 1}"): | |
# st.markdown(result.md_content, unsafe_allow_html=True) | |
def record_claim_amount(amount: float, currency: str):
    """Add the confirmed claim amount to the chat history.

    Appends a confirmation message followed by a ``MessageTable`` holding a
    key-metric block with the amount and its currency.
    """
    history = st.session_state.messages
    history.append(AIMessage("Claim amount confirmed ✅"))

    amount_block = f"""
<div class="key-metric">
{amount} {currency}
</div>
"""
    history.append(MessageTable(table=amount_block, title="Final Amount"))
def record_claim_summary(life_assured: str, claim_type: str):
    """Add the confirmed claim summary to the chat history.

    Appends a confirmation message followed by a ``MessageTable`` holding a
    summary card with the life assured name and the claim type.

    Args:
        life_assured: Name of the life assured (may be user-entered text).
        claim_type: Claim type name shown on the card.
    """
    from html import escape

    st.session_state.messages.append(AIMessage("Claim summary confirmed ✅"))
    # Both values can come from a free-text form, so escape them before
    # embedding them in the HTML card.
    safe_life_assured = escape(str(life_assured))
    safe_claim_type = escape(str(claim_type))
    html_table = f"""
<div class="info-card">
<div class="card-title">Summary</div>
<div class="flex-row card-key-value">
<div class="key">Life Assured</div>
<div class="value">{safe_life_assured}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Claim Type</div>
<div class="value">{safe_claim_type}</div>
</div>
</div>
"""
    st.session_state.messages.append(
        MessageTable(table=html_table, title="Claim Summary")
    )
def record_claim_details(
    claim_type: str,
    hospital_name: str,
    receipt_date: Optional[str],
    admission_date: Optional[str],
    discharge_date: Optional[str],
):
    """Add the confirmed claim details to the chat history.

    Appends a confirmation message, then a ``MessageTable`` with a details
    card whose rows depend on the claim type: hospital name and visit date
    for outpatient claims; hospital name, admission date and discharge date
    for hospitalisation claims. For any other claim type an error is shown
    and no card is added.

    Args:
        claim_type: Claim type name selecting the card layout.
        hospital_name: Hospital name (may be user-entered text).
        receipt_date: Visit date, used for outpatient claims.
        admission_date: Admission date, used for hospitalisation claims.
        discharge_date: Discharge date, used for hospitalisation claims.
    """
    from html import escape

    st.session_state.messages.append(AIMessage("Claim details confirmed ✅"))

    # The hospital name can be typed by the user and the dates originate
    # from extracted documents, so escape everything embedded in the HTML.
    safe_hospital_name = escape(str(hospital_name))
    if claim_type == ClaimTypeEnum.OUTPATIENT.name:
        safe_receipt_date = escape(str(receipt_date))
        html_table = f"""
<div class="info-card">
<div class="card-title">Claim Details</div>
<div class="flex-row card-key-value">
<div class="key">Hospital Name</div>
<div class="value">{safe_hospital_name}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Visit Date</div>
<div class="value">{safe_receipt_date}</div>
</div>
</div>
"""
    elif claim_type == ClaimTypeEnum.HOSPITALISATION.name:
        safe_admission_date = escape(str(admission_date))
        safe_discharge_date = escape(str(discharge_date))
        html_table = f"""
<div class="info-card">
<div class="card-title">Details</div>
<div class="flex-row card-key-value">
<div class="key">Hospital Name</div>
<div class="value">{safe_hospital_name}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Admission Date</div>
<div class="value">{safe_admission_date}</div>
</div>
<div class="flex-row card-key-value">
<div class="key">Discharge Date</div>
<div class="value">{safe_discharge_date}</div>
</div>
</div>
"""
    else:
        st.error("Invalid claim type")
        return
    st.session_state.messages.append(
        MessageTable(table=html_table, title="Claim Details")
    )
# Entry point when the file is executed as a script: call init_state()
# first, then main().
if __name__ == "__main__":
    init_state()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment