Vercel AI SDK Adapter for the Agno AI Framework
| """ | |
| Vercel AI SDK Adapter for Agno | |
| Converts Agno agent/team streams to Vercel AI SDK Data V5 Stream Protocol (SSE) | |
| Inspired by @https://github.com/gauravdhiman/vercel-agno-integration/blob/main/server/agno_adapter.py | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| from typing import AsyncGenerator, Any, Dict, List, Optional | |
| from uuid import uuid4 | |
| from agno.agent import ( | |
| Agent, | |
| RunContentEvent, | |
| RunStartedEvent, | |
| RunCompletedEvent, | |
| RunErrorEvent, | |
| ToolCallStartedEvent, | |
| ToolCallCompletedEvent, | |
| RunOutputEvent, | |
| ) | |
| from agno.team import Team | |
| from agno.run.team import TeamRunEvent | |
| from agno.models.message import Message as AgnoMessage | |
| logger = logging.getLogger(__name__) | |
| # Enable debug logging | |
| logging.basicConfig(level=logging.DEBUG) | |
class VercelAIAdapter:
    """
    Adapts Agno Agent/Team output streams to the Vercel AI SDK Data Stream Protocol.
    Supports SSE streaming with proper event formatting.
    """

    def __init__(self, agent_or_team):
        """
        Initialize the adapter with an Agno Agent or Team.

        Args:
            agent_or_team: Agno Agent or Team instance
        """
        if not isinstance(agent_or_team, (Agent, Team)):
            raise TypeError("Input must be an instance of agno.Agent or agno.Team")

        self.executor = agent_or_team
        self.name = agent_or_team.name
        self._processed_tool_calls = set()
    @staticmethod
    def _format_sse_event(event_type: str, data: Any) -> str:
        """
        Format data as a Server-Sent Event per the Vercel AI SDK Data Stream Protocol.

        Args:
            event_type: Type of event (text-delta, tool-input-start, etc.)
            data: Event data to serialize (normally a dict of event fields)

        Returns:
            Formatted SSE string
        """
        # Non-dict payloads are wrapped so they can still be merged into the event object.
        fields = data if isinstance(data, dict) else {"value": data}
        try:
            payload = json.dumps({"type": event_type, **fields}, default=str)
        except TypeError as e:
            logger.error(f"Serialization error for {event_type}: {e}")
            payload = json.dumps({"type": "error", "error": "Serialization failed"})

        # Vercel AI SDK Data Stream Protocol format
        return f"data: {payload}\n\n"
    async def _agno_to_vercel_stream(
        self,
        agno_response_stream: AsyncGenerator[RunOutputEvent, None]
    ) -> AsyncGenerator[bytes, None]:
        """
        Convert Agno stream events to Vercel AI SDK format.

        Args:
            agno_response_stream: Async generator from an Agno agent/team

        Yields:
            SSE-formatted bytes
        """
        message_id = f"msg_{uuid4().hex}"
        text_block_id = None

        try:
            # Send message start event
            yield self._format_sse_event("start", {"messageId": message_id}).encode("utf-8")
            await asyncio.sleep(0.01)

            async for agno_response in agno_response_stream:
                # Debug: log event type
                logger.debug(f"Received event: {type(agno_response).__name__}")

                # Check for content attribute (works for both Agent and Team events)
                content = getattr(agno_response, 'content', None)
                event_attr = getattr(agno_response, 'event', None)

                # Handle content events (Team or Agent)
                if (event_attr == TeamRunEvent.run_content or isinstance(agno_response, RunContentEvent)) and content:
                    # Start text block if not started
                    if not text_block_id:
                        text_block_id = f"text_{uuid4().hex}"
                        yield self._format_sse_event("text-start", {"id": text_block_id}).encode("utf-8")
                        await asyncio.sleep(0.01)

                    # Send text delta
                    yield self._format_sse_event("text-delta", {
                        "id": text_block_id,
                        "delta": content
                    }).encode("utf-8")
                    await asyncio.sleep(0.01)

                # Handle tool call started (Team or Agent)
                elif event_attr == TeamRunEvent.tool_call_started or isinstance(agno_response, ToolCallStartedEvent):
                    tool = getattr(agno_response, 'tool', None)
                    if tool:
                        tool_call_id = getattr(tool, 'tool_call_id', None) or f"tool_{uuid4().hex}"
                        if tool_call_id not in self._processed_tool_calls:
                            self._processed_tool_calls.add(tool_call_id)
                            tool_name = getattr(tool, 'tool_name', 'unknown')
                            tool_args = getattr(tool, 'tool_args', {})

                            # Send tool input start
                            yield self._format_sse_event("tool-input-start", {
                                "toolCallId": tool_call_id,
                                "toolName": tool_name
                            }).encode("utf-8")
                            await asyncio.sleep(0.01)

                            # Send tool input available
                            yield self._format_sse_event("tool-input-available", {
                                "toolCallId": tool_call_id,
                                "toolName": tool_name,
                                "input": tool_args if isinstance(tool_args, dict) else {}
                            }).encode("utf-8")
                            await asyncio.sleep(0.01)

                # Handle tool completion (Team or Agent)
                elif event_attr == TeamRunEvent.tool_call_completed or isinstance(agno_response, ToolCallCompletedEvent):
                    tool = getattr(agno_response, 'tool', None)
                    if tool:
                        tool_call_id = getattr(tool, 'tool_call_id', None) or f"tool_{uuid4().hex}"
                        tool_result = getattr(tool, 'result', None)
                        if tool_result:
                            yield self._format_sse_event("tool-output-available", {
                                "toolCallId": tool_call_id,
                                "output": str(tool_result)
                            }).encode("utf-8")
                            await asyncio.sleep(0.01)

                # Handle errors (Team or Agent)
                elif event_attr == TeamRunEvent.run_error or isinstance(agno_response, RunErrorEvent):
                    error_msg = getattr(agno_response, 'error', 'Unknown error')
                    yield self._format_sse_event("error", {
                        "errorText": str(error_msg)
                    }).encode("utf-8")
                    await asyncio.sleep(0.01)

                # Handle completion (Team or Agent)
                elif event_attr == TeamRunEvent.run_completed or isinstance(agno_response, RunCompletedEvent):
                    # End text block if started
                    if text_block_id:
                        yield self._format_sse_event("text-end", {"id": text_block_id}).encode("utf-8")
                        await asyncio.sleep(0.01)

                    # Send finish message with usage
                    finish_data = {"finishReason": "stop"}
                    metrics = getattr(agno_response, 'metrics', None)
                    if metrics:
                        finish_data["usage"] = {
                            "promptTokens": int(getattr(metrics, 'input_tokens', 0)),
                            "completionTokens": int(getattr(metrics, 'output_tokens', 0)),
                        }
                        finish_data["usage"]["totalTokens"] = (
                            finish_data["usage"]["promptTokens"] +
                            finish_data["usage"]["completionTokens"]
                        )
                    yield self._format_sse_event("finish", finish_data).encode("utf-8")
                    await asyncio.sleep(0.01)

            # Send stream termination
            yield b"data: [DONE]\n\n"

        except Exception as e:
            logger.error(f"Error in stream: {e}", exc_info=True)
            # Send error event
            yield self._format_sse_event("error", {
                "errorText": "An error occurred processing your request."
            }).encode("utf-8")
            # Send finish event
            yield self._format_sse_event("finish", {"finishReason": "error"}).encode("utf-8")
            yield b"data: [DONE]\n\n"
    def _prepare_agno_messages(self, messages: List[Dict[str, Any]]) -> List[AgnoMessage]:
        """
        Convert Vercel AI SDK message format to Agno messages.

        Args:
            messages: List of messages from the Vercel AI SDK

        Returns:
            List of Agno Message objects
        """
        agno_messages = []
        for msg in messages:
            role = msg.get("role")
            content = msg.get("content")

            if content:
                agno_messages.append(AgnoMessage(
                    role=role,
                    content=str(content)
                ))

            # Handle tool invocations (results from frontend)
            if "toolInvocations" in msg:
                for tool_inv in msg.get("toolInvocations", []):
                    if tool_inv.get("state") == "result":
                        tool_name = tool_inv.get("toolName")
                        tool_result = tool_inv.get("result", {})
                        agno_messages.append(AgnoMessage(
                            role="user",
                            content=f"Tool '{tool_name}' returned: {json.dumps(tool_result)}"
                        ))

        return agno_messages
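    # Illustrative conversion (example payload, not from the original gist):
    #   [{"role": "user", "content": "hi"},
    #    {"role": "assistant", "content": "", "toolInvocations":
    #        [{"state": "result", "toolName": "search", "result": {"hits": 3}}]}]
    # yields two Agno messages: the user message "hi", plus a synthetic user message
    # 'Tool \'search\' returned: {"hits": 3}' carrying the frontend tool result.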
    async def stream_response(
        self,
        messages: List[Dict[str, Any]],
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        **kwargs: Any
    ) -> AsyncGenerator[bytes, None]:
        """
        Main entry point for streaming responses.

        Args:
            messages: Full message history from the Vercel AI SDK
            session_id: Optional session ID for context
            user_id: Optional user ID
            **kwargs: Additional arguments for the agent/team run

        Yields:
            SSE-formatted bytes
        """
        # Reset tool tracking for a new conversation
        self._processed_tool_calls = set()

        # Convert messages
        agno_messages = self._prepare_agno_messages(messages)
        if not agno_messages:
            return

        # Latest message is the user input
        user_input = agno_messages[-1]

        # Previous messages are context
        if len(agno_messages) > 1:
            self.executor.add_messages = agno_messages[:-1]

        # Start streaming from Agno (returns an async generator directly in v2)
        agno_stream = self.executor.arun(
            input=user_input,  # Changed from 'message' to 'input' for v2 compatibility
            session_id=session_id,
            user_id=user_id,
            stream=True,
            stream_events=True,  # Changed from stream_intermediate_steps
            **kwargs
        )

        # Convert and yield
        async for chunk in self._agno_to_vercel_stream(agno_stream):
            yield chunk
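A minimal usage sketch (not part of the original gist) showing how the adapter could be mounted behind a FastAPI endpoint that the Vercel AI SDK's useChat hook posts to. The route, model id, and the x-vercel-ai-ui-message-stream header are illustrative assumptions; adjust them to your SDK version and Agno model setup.

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

from agno.agent import Agent
from agno.models.openai import OpenAIChat  # assumption: any Agno model config works here

app = FastAPI()
agent = Agent(name="demo-agent", model=OpenAIChat(id="gpt-4o-mini"))  # illustrative model id
adapter = VercelAIAdapter(agent)

@app.post("/api/chat")  # assumed route; match whatever your useChat endpoint posts to
async def chat(request: Request):
    body = await request.json()
    return StreamingResponse(
        adapter.stream_response(messages=body.get("messages", [])),
        media_type="text/event-stream",
        # Assumption: header the AI SDK checks to detect a UI message stream; verify for your version.
        headers={"x-vercel-ai-ui-message-stream": "v1"},
    )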