Created
June 8, 2025 19:27
-
-
Save jongan69/a4170b1884bee9810691989d16aff813 to your computer and use it in GitHub Desktop.
goose + parser api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
import re
import subprocess
import sys
import tempfile
from datetime import datetime
from typing import Dict, Any, Optional, List

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from app.services.pdf_parser import extract_invoice_data
from app.models.invoice import InvoiceData
from app.services.goose_integration import goose_client
# Configure logging.
# The root level defaults to INFO, which suppresses every logger.debug() call
# in this module. Set the LOG_LEVEL environment variable (e.g. "DEBUG") to see
# them; unknown level names fall back to INFO instead of failing at import.
_log_level = getattr(
    logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO
)
if not isinstance(_log_level, int):
    _log_level = logging.INFO

logging.basicConfig(
    level=_log_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger("goose-api")
# Initialize FastAPI app
app = FastAPI(
    title="Next Invoice Parser API",
    description="API for extracting structured data from invoice PDFs",
)

# Add CORS middleware.
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# FastAPI's CORS documentation says origins, methods and headers should be
# listed explicitly when credentials are allowed -- confirm and restrict this
# before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, restrict to specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class TaskGuidelines(BaseModel):
    """Guidelines for specific tasks.

    Each field holds prompt text describing the JSON structure the agent is
    asked to produce for that task. run_goose looks a field up by the
    request's ``task`` name and appends its text to the instructions.
    """

    # Prompt for scoring the credit risk of an invoice.
    credit_assessment: str = """
    Analyze the invoice and provide a credit assessment with the following structure:
    {
        "credit_score": number,  # 0-1000
        "risk_level": string,    # "low", "medium", "high"
        "confidence": number,    # 0-100
        "factors": [string],     # List of factors considered
        "recommendation": string # Brief recommendation
    }
    """

    # Prompt for ranking lenders against an invoice and its credit assessment.
    lender_matching: str = """
    Analyze the invoice and credit assessment to provide lender matches:
    {
        "matches": [
            {
                "lender_name": string,
                "match_score": number,  # 0-100
                "advance_rate": number, # 0-100
                "terms": string,
                "reasoning": string
            }
        ],
        "best_match": string,  # Name of best matching lender
        "confidence": number   # 0-100
    }
    """

    # Prompt for proposing funding terms.
    funding_recommendation: str = """
    Provide a funding recommendation based on the invoice and credit assessment:
    {
        "recommended_advance_rate": number,  # 0-100
        "estimated_funding_time": string,
        "terms": {
            "duration": string,
            "fees": string,
            "conditions": [string]
        },
        "confidence": number,  # 0-100
        "rationale": string
    }
    """
class GooseInput(BaseModel):
    """Request body for the /run-goose/ endpoint."""

    # Instruction text that is written to the goose input file.
    instructions: str
    # Session name passed to the goose CLI via --name.
    session_name: str = "api-session"
    # Optional task name; run_goose uses it to pick a TaskGuidelines prompt
    # and to decide whether to extract a JSON task response from the output.
    task: Optional[str] = None
    # Optional free-form payload; not read by run_goose in this file.
    data: Optional[Dict[str, Any]] = None
class GooseResponse(BaseModel):
    """Response body for the /run-goose/ endpoint."""

    # Cleaned goose stdout ("" when goose produced no output).
    raw_response: str
    # One {"action": ..., "parameters": ...} entry per parsed action.
    parsed_actions: List[Dict[str, Any]]
    # Human-readable problem description; None when parsing found actions.
    error: Optional[str] = None
    # Set only when the output contained a "command: ..." section.
    shell_command: Optional[str] = None
    shell_response: Optional[str] = None
    # JSON object extracted from the output when the request named a task.
    task_response: Optional[Dict[str, Any]] = None
def extract_shell_command(text: str) -> Optional[Dict[str, str]]:
    """Extract shell command and its output from the response.

    Looks for the first ``command: <cmd>`` section terminated by a blank line
    (or by a final newline).

    Args:
        text: Goose output to scan.

    Returns:
        ``{"command": <cmd>, "response": <text after the command>}`` with both
        values stripped, or ``None`` when no command section is present.
    """
    command_pattern = r'command: (.*?)(?:\n\n|\n$)'
    command_match = re.search(command_pattern, text, re.DOTALL)
    if command_match is None:
        return None

    command = command_match.group(1).strip()
    # Slice from the end of the matched command rather than searching for the
    # command text again: the same text may also appear earlier in the output,
    # which would make the "response" start in the wrong place.
    response_text = text[command_match.end(1):].strip()
    return {
        "command": command,
        "response": response_text,
    }
def clean_goose_output(output: str) -> str:
    """Strip terminal escape sequences and session banner lines from goose output.

    Args:
        output: Raw stdout captured from the goose CLI.

    Returns:
        The remaining lines, each stripped, joined with newlines; ``""`` when
        *output* is empty.
    """
    if not output:
        logger.warning("Received empty output from goose command")
        return ""

    logger.debug("Raw goose output: %s", output)

    # Remove ANSI escape sequences (colours, cursor movement).
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    clean_output = ansi_escape.sub('', output)

    # Drop any line that mentions one of these markers (case-insensitive).
    noise_markers = ("logging to", "working directory", "starting session")
    filtered_lines = [
        line.strip()
        for line in clean_output.splitlines()
        if not any(marker in line.lower() for marker in noise_markers)
    ]

    cleaned = "\n".join(filtered_lines).strip()
    logger.debug("Cleaned output: %s", cleaned)
    return cleaned
def parse_goose_response(response: str, task: Optional[str] = None) -> dict:
    """Turn cleaned goose output into a structured result.

    A ``command: ...`` section takes precedence: when one is found the result
    describes that single shell command. Otherwise every
    ``<function=NAME{PARAMS}</function>`` tag becomes one parsed action.

    Args:
        response: Cleaned goose output.
        task: Accepted for interface compatibility; not used by the parser.

    Returns:
        A dict with ``raw_response`` and ``parsed_actions``, plus
        ``shell_command``/``shell_response`` for the shell case, or ``error``
        when the response is empty or contains no actions.
    """
    if not response:
        logger.warning("Received empty response to parse")
        return {
            "raw_response": "",
            "parsed_actions": [],
            "error": "Empty response received from goose command",
        }

    logger.debug("Parsing response: %s", response)

    shell_info = extract_shell_command(response)
    if shell_info:
        return {
            "raw_response": response,
            "parsed_actions": [{
                "action": "shell_command",
                "parameters": shell_info,
            }],
            "shell_command": shell_info["command"],
            "shell_response": shell_info["response"],
        }

    function_pattern = r'<function=([^{]+){([^}]+)}</function>'
    parsed_actions = []
    for match in re.finditer(function_pattern, response):
        function_name = match.group(1)
        raw_params = match.group(2)

        # The pattern captures only the text *between* the braces, so it has
        # to be re-wrapped before it can decode as a JSON object. The bare
        # text is tried second for payloads that are valid JSON on their own.
        params = raw_params
        decode_error = None
        for candidate in ("{" + raw_params + "}", raw_params):
            try:
                params = json.loads(candidate)
            except json.JSONDecodeError as e:
                decode_error = e
            else:
                decode_error = None
                break

        if decode_error is None:
            logger.debug(
                "Parsed action: %s with params: %s", function_name, params
            )
        else:
            # Keep the undecodable text so the caller can still inspect it.
            logger.warning(
                "Failed to parse JSON for function %s: %s",
                function_name,
                decode_error,
            )

        parsed_actions.append({
            "action": function_name,
            "parameters": params,
        })

    result = {
        "raw_response": response,
        "parsed_actions": parsed_actions,
    }
    if not parsed_actions:
        logger.warning("No actions were parsed from the response")
        result["error"] = "No actions found in response"

    # Serialising the result is only worth doing when it will be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Final parsed result: %s", json.dumps(result))
    return result
def _build_instruction_text(instructions: str, task: Optional[str]) -> str:
    """Return *instructions*, extended with the guidelines for *task* if known."""
    if not task:
        return instructions
    # Pydantic does not expose field defaults as attributes of the model
    # class, so the guidelines are read from an instance. Looking the task up
    # in the instance's own field dict (instead of getattr) also keeps
    # client-supplied names such as "copy" or "__doc__" from resolving to
    # methods or dunder attributes.
    task_guidelines = vars(TaskGuidelines()).get(task)
    if not isinstance(task_guidelines, str):
        return instructions
    return f"{instructions}\n\nTask Guidelines:\n{task_guidelines}"


@app.post("/run-goose/", response_model=GooseResponse)
def run_goose(input: GooseInput):
    """Run the goose CLI with the given instructions and return its parsed output.

    The instructions (plus task guidelines, when ``input.task`` names a
    TaskGuidelines field) are written to a temporary file, the named session
    is created/resumed, and ``goose run`` is executed against that file.

    Args:
        input: Request body; see GooseInput.

    Returns:
        A dict matching GooseResponse. When ``input.task`` is set and the
        output contains a JSON object, it is returned as ``task_response``.

    Raises:
        HTTPException: 500 when a goose command exits non-zero or any other
            error occurs while processing the request.
    """
    request_id = datetime.now().strftime("%Y%m%d-%H%M%S-%f")
    logger.info(
        "[%s] Received request - Session: %s", request_id, input.session_name
    )
    logger.debug("[%s] Instructions: %s", request_id, input.instructions)

    instruction_file = None
    try:
        instruction_content = _build_instruction_text(
            input.instructions, input.task
        )

        # One temp file per request: a fixed path would let overlapping
        # requests overwrite each other's instructions before goose reads them.
        with tempfile.NamedTemporaryFile(
            "w",
            encoding="utf-8",
            prefix="goose_input_",
            suffix=".txt",
            delete=False,
        ) as handle:
            instruction_file = handle.name
            handle.write(instruction_content)
        logger.debug(
            "[%s] Wrote instructions to %s", request_id, instruction_file
        )

        env = os.environ.copy()
        env["GOOSE_NO_KEYRING"] = "1"
        env["DBUS_SESSION_BUS_ADDRESS"] = "unix:path=/run/user/0/bus"
        env["RUST_BACKTRACE"] = "1"

        # NOTE(review): neither subprocess call has a timeout, so a goose
        # process that waits for input would block this worker -- confirm the
        # CLI exits on its own in this mode.
        logger.info(
            "[%s] Creating/Resuming session: %s", request_id, input.session_name
        )
        session_result = subprocess.run(
            ["goose", "session", "--name", input.session_name],
            capture_output=True,
            text=True,
            env=env,
        )
        if session_result.returncode != 0:
            logger.error(
                "[%s] Failed to create/resume session: %s",
                request_id,
                session_result.stderr,
            )
            raise HTTPException(
                status_code=500,
                detail=f"Failed to create/resume session: {session_result.stderr}",
            )

        logger.info("[%s] Executing goose command in session", request_id)
        result = subprocess.run(
            [
                "goose", "run",
                "-i", instruction_file,
                "--name", input.session_name,
                "--debug",
            ],
            capture_output=True,
            text=True,
            env=env,
        )
        logger.debug("[%s] Command return code: %s", request_id, result.returncode)
        logger.debug("[%s] Command stdout: %s", request_id, result.stdout)
        if result.stderr:
            logger.debug("[%s] Command stderr: %s", request_id, result.stderr)

        if result.returncode != 0:
            error_msg = result.stderr or result.stdout
            logger.error("[%s] Goose command failed: %s", request_id, error_msg)
            raise HTTPException(status_code=500, detail=error_msg)

        if not result.stdout:
            logger.warning("[%s] Empty stdout from goose command", request_id)
            return {
                "raw_response": "",
                "parsed_actions": [],
                "error": "No response received from goose command",
            }

        cleaned_stdout = clean_goose_output(result.stdout)
        parsed_response = parse_goose_response(cleaned_stdout, input.task)

        if input.task:
            # Greedy match: from the first "{" to the last "}" in the output.
            json_match = re.search(r'({[\s\S]*})', cleaned_stdout)
            if json_match:
                try:
                    parsed_response["task_response"] = json.loads(
                        json_match.group(1)
                    )
                except json.JSONDecodeError:
                    logger.warning(
                        "[%s] Failed to parse task response as JSON", request_id
                    )

        logger.info("[%s] Successfully processed request", request_id)
        return parsed_response
    except HTTPException:
        # Already logged and carrying the intended detail; re-wrapping it
        # would replace the detail with the exception's string form.
        raise
    except Exception as e:
        logger.error(
            "[%s] Error processing request: %s", request_id, e, exc_info=True
        )
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if instruction_file is not None:
            try:
                os.remove(instruction_file)
            except OSError:
                logger.warning(
                    "[%s] Could not remove %s", request_id, instruction_file
                )
@app.post("/parse-invoice/", response_model=InvoiceData)
async def parse_invoice(file: UploadFile = File(...)):
    """
    Parse an invoice PDF and return structured data.

    This endpoint accepts a PDF file upload and extracts key invoice details
    including invoice number, dates, amounts, and parties involved.

    Raises:
        HTTPException: 400 when the upload's filename does not end in ".pdf"
            (case-insensitive, a missing filename counts as non-PDF); 500 when
            reading or extraction fails.
    """
    # The filename can be absent on an upload; treat that as "not a PDF"
    # instead of failing with an attribute error.
    filename = file.filename or ""
    if not filename.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")

    try:
        contents = await file.read()
        invoice_data = extract_invoice_data(contents)
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error processing invoice: {str(e)}"
        ) from e

    # Goose API integration test.
    # TODO(review): this smoke-test call runs on every upload; it is kept
    # best-effort so a goose failure cannot discard a successful parse.
    try:
        response = await goose_client.run_goose_agent(
            instructions="say hello",
            session_name="test-session",
        )
        logger.info("Goose integration test response: %s", response)
    except Exception:
        logger.warning("Goose integration test call failed", exc_info=True)

    return invoice_data
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns:
        A fixed payload naming the service and reporting it as healthy.
    """
    return {"status": "healthy", "service": "next-invoice-parser"}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment