Created
November 9, 2025 13:00
-
-
Save samirsaci/6d0befd09d2d3d69f9d7bbfcd0d20ba6 to your computer and use it in GitHub Desktop.
AI Agents Budget Planning - Budget Interpreter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import requests | |
| from langchain.chat_models import init_chat_model | |
| from app.utils.functions.budagent_runner import run_budget_api | |
| from app.models.budagent_models import LaunchParamsBudget | |
| from app.utils.config_loader import load_config | |
| logger = logging.getLogger(__name__) | |
| config = load_config() | |
| class BudgetPlanInterpreter: | |
| def __init__(self, | |
| model_name: str = "anthropic:claude-3-7-sonnet-latest", | |
| session_id: str = "test_agent"): | |
| self.llm = init_chat_model(model_name) | |
| self.session_id = session_id | |
| self.api_result: dict | None = None | |
| self.html_summary: str | None = None | |
| async def run_plan(self, params: dict) -> dict: | |
| """Run budget planning using FastAPI Microservice API""" | |
| try: | |
| launch_params = LaunchParamsBudget(**params) | |
| self.api_result = await run_budget_api(launch_params, self.session_id) | |
| return self.api_result | |
| except Exception as e: | |
| logger.error(f"[BudgetAgent] Direct API call failed: {e}") | |
| return {"error": str(e)} | |
| async def interpret(self, params: dict, api_output: dict | None = None) -> str: | |
| """Interpret API budget planning results into HTML summary for the Director""" | |
| if api_output is None: | |
| if not self.api_result: | |
| raise ValueError("No API result available.") | |
| api_output = self.api_result | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": config["budget_agent"]["system_prompt_tool"] | |
| }, | |
| { | |
| "role": "user", | |
| "content": f"Input parameters: {params}\n | |
| \nModel results: {api_output}" | |
| } | |
| ] | |
| reply = self.llm.invoke(messages) | |
| self.html_summary = reply.content | |
| logger.info("[BudgetPlanAgent] Generated HTML summary") | |
| return self.html_summary | |
| def get_summary(self) -> str: | |
| if not self.html_summary: | |
| raise ValueError("No summary available.") | |
| return self.html_summary |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment