Last active
February 9, 2026 13:41
-
-
Save raoulbia-ai/6bb4f930f60a475fecf6ef82b87afc6d to your computer and use it in GitHub Desktop.
Adds A2A (Agent-to-Agent) protocol support to vanilla Langflow. Required for using the A2A Server Agent component.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Langflow A2A Service Patch | |
| ========================== | |
| This patch adds A2A (Agent-to-Agent) protocol support to vanilla Langflow. | |
| Required for using the A2A Server Agent component. | |
| It provides two things: | |
| 1. SERVICE LAYER - A2A service, factory, schema, and dependency injection | |
| 2. API ROUTER - FastAPI endpoints for agent card discovery and JSON-RPC | |
| Without both, external A2A clients cannot discover or communicate with your agents. | |
| Installation: | |
| 1. Create directory: langflow/services/a2a/ | |
| 2. Add the service files below to that directory | |
| 3. Create the router file: langflow/api/v1/a2a.py | |
| 4. Update langflow/services/schema.py (add ServiceType) | |
| 5. Update langflow/services/deps.py (add get_a2a_service) | |
| 6. Update langflow/api/v1/__init__.py (add router import) | |
| 7. Update langflow/api/router.py (register the router) | |
| 8. Restart Langflow | |
| Requirements: pip install a2a-sdk | |
| Related gists: | |
| - A2A Server Agent: https://gist.github.com/raoulbia-ai/285152d7c789143dfe6b7a848f9dd204 | |
| - A2A Agent Connector: https://gist.github.com/raoulbia-ai/934f1fd83f09638dd2343ad152a96782 | |
| """ | |
| # ============================================================================= | |
| # FILE 1: langflow/services/a2a/__init__.py | |
| # ============================================================================= | |
| __init__py = ''' | |
| """A2A service module for Agent2Agent protocol support.""" | |
| from langflow.services.a2a.factory import A2AServiceFactory | |
| from langflow.services.a2a.service import A2AService, AgentEndpoint | |
| __all__ = ["A2AService", "A2AServiceFactory", "AgentEndpoint"] | |
| ''' | |
| # ============================================================================= | |
| # FILE 2: langflow/services/a2a/service.py | |
| # ============================================================================= | |
| service_py = ''' | |
| """A2A Service for exposing Langflow agents via Agent2Agent Protocol.""" | |
| from __future__ import annotations | |
| from datetime import datetime | |
| from typing import TYPE_CHECKING, Any | |
| from loguru import logger | |
| from langflow.services.base import Service | |
| if TYPE_CHECKING: | |
| from collections.abc import AsyncGenerator | |
| from a2a.types import AgentCard, Message as A2AMessage, Task | |
| try: | |
| from a2a.types import AgentCard, AgentSkill, Task, AgentCapabilities | |
| from a2a.types import Message as A2AMessage | |
| A2A_AVAILABLE = True | |
| except ImportError: | |
| A2A_AVAILABLE = False | |
| logger.warning("A2A SDK not available. Install with: pip install a2a-sdk") | |
| class AgentEndpoint: | |
| """Represents a registered A2A agent endpoint.""" | |
| def __init__( | |
| self, | |
| component_id: str, | |
| flow_id: str, | |
| agent_card: AgentCard, | |
| executor_callable: Any, | |
| ) -> None: | |
| self.component_id = component_id | |
| self.flow_id = flow_id | |
| self.agent_card = agent_card | |
| self.executor_callable = executor_callable | |
| self.created_at = datetime.now() | |
| class A2AService(Service): | |
| """Service for managing A2A protocol endpoints and agent registry.""" | |
| name = "a2a_service" | |
| def __init__(self) -> None: | |
| super().__init__() | |
| self.agent_registry: dict[str, AgentEndpoint] = {} | |
| self.task_store: dict[str, Task] = {} | |
| logger.info("A2A Service initialized") | |
| def register_agent( | |
| self, | |
| component_id: str, | |
| flow_id: str, | |
| agent_card: AgentCard, | |
| executor_callable: Any, | |
| ) -> str: | |
| endpoint = AgentEndpoint( | |
| component_id=component_id, | |
| flow_id=flow_id, | |
| agent_card=agent_card, | |
| executor_callable=executor_callable, | |
| ) | |
| self.agent_registry[component_id] = endpoint | |
| url_path = f"/a2a/{flow_id}/{component_id}" | |
| logger.info(f"Registered A2A agent: {agent_card.name} at {url_path}") | |
| return url_path | |
| def unregister_agent(self, component_id: str) -> None: | |
| if component_id in self.agent_registry: | |
| agent = self.agent_registry[component_id] | |
| logger.info(f"Unregistered A2A agent: {agent.agent_card.name}") | |
| del self.agent_registry[component_id] | |
| def get_agent(self, component_id: str) -> AgentEndpoint | None: | |
| return self.agent_registry.get(component_id) | |
| def get_agent_card(self, component_id: str) -> AgentCard | None: | |
| agent = self.get_agent(component_id) | |
| return agent.agent_card if agent else None | |
| def list_agents(self, flow_id: str | None = None) -> list[AgentEndpoint]: | |
| if flow_id: | |
| return [a for a in self.agent_registry.values() if a.flow_id == flow_id] | |
| return list(self.agent_registry.values()) | |
| async def handle_message_send( | |
| self, | |
| component_id: str, | |
| message: dict[str, Any], | |
| ) -> dict[str, Any]: | |
| agent = self.get_agent(component_id) | |
| if not agent: | |
| raise ValueError(f"Agent not found: {component_id}") | |
| try: | |
| return await agent.executor_callable(message) | |
| except Exception as e: | |
| logger.exception(f"Error executing agent {component_id}: {e}") | |
| raise | |
| def generate_agent_card( | |
| self, | |
| agent_name: str, | |
| description: str, | |
| component_id: str, | |
| flow_id: str, | |
| skills: list[AgentSkill] | None = None, | |
| base_url: str | None = None, | |
| ) -> AgentCard: | |
| if not A2A_AVAILABLE: | |
| raise ImportError("A2A SDK not available") | |
| url = base_url or f"http://localhost:7860/a2a/{flow_id}/{component_id}" | |
| return AgentCard( | |
| name=agent_name, | |
| description=description, | |
| url=url, | |
| version="1.0.0", | |
| defaultInputModes=["text/plain", "application/json"], | |
| defaultOutputModes=["text/plain", "application/json"], | |
| capabilities=AgentCapabilities(streaming=True, pushNotifications=False), | |
| skills=skills or [], | |
| ) | |
| async def teardown(self) -> None: | |
| logger.info("Shutting down A2A service") | |
| self.agent_registry.clear() | |
| self.task_store.clear() | |
| ''' | |
| # ============================================================================= | |
| # FILE 3: langflow/services/a2a/factory.py | |
| # ============================================================================= | |
| factory_py = ''' | |
| """Factory for creating A2A service instances.""" | |
| from typing import TYPE_CHECKING | |
| from typing_extensions import override | |
| from langflow.services.a2a.service import A2AService | |
| from langflow.services.factory import ServiceFactory | |
| if TYPE_CHECKING: | |
| from langflow.services.settings.service import SettingsService | |
| class A2AServiceFactory(ServiceFactory): | |
| """Factory for creating A2A service instances.""" | |
| def __init__(self) -> None: | |
| super().__init__(A2AService) | |
| @override | |
| def create(self, settings_service: "SettingsService") -> A2AService: | |
| return A2AService() | |
| ''' | |
| # ============================================================================= | |
| # PATCH 1: Add to langflow/services/schema.py (in ServiceType enum) | |
| # ============================================================================= | |
| schema_patch = ''' | |
| # Add this line to the ServiceType enum in langflow/services/schema.py: | |
| class ServiceType(str, Enum): | |
| # ... existing entries ... | |
| A2A_SERVICE = "a2a_service" # <-- ADD THIS LINE | |
| ''' | |
| # ============================================================================= | |
| # PATCH 2: Add to langflow/services/deps.py | |
| # ============================================================================= | |
| deps_patch = ''' | |
| # Add this import at the top of langflow/services/deps.py: | |
| from langflow.services.a2a.service import A2AService | |
| # Add this function to langflow/services/deps.py: | |
| def get_a2a_service() -> A2AService: | |
| """Retrieves the A2AService instance from the service manager. | |
| Returns: | |
| A2AService: The A2AService instance. | |
| """ | |
| from langflow.services.a2a.factory import A2AServiceFactory | |
| return get_service(ServiceType.A2A_SERVICE, A2AServiceFactory()) | |
| ''' | |
| # ============================================================================= | |
| # INSTALLATION INSTRUCTIONS | |
| # ============================================================================= | |
| # ============================================================================= | |
| # FILE 4: langflow/api/v1/a2a.py (A2A Protocol Router) | |
| # ============================================================================= | |
| a2a_router_py = ''' | |
| """A2A Protocol Router for Agent2Agent communication. | |
| This router implements the A2A protocol endpoints: | |
| - GET /.well-known/agent.json - Agent card discovery | |
| - POST /rpc - JSON-RPC 2.0 endpoint for protocol methods | |
| """ | |
| from __future__ import annotations | |
| from typing import TYPE_CHECKING, Any | |
| from fastapi import APIRouter, HTTPException, Path | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from loguru import logger | |
| from pydantic import BaseModel | |
| from langflow.services.deps import get_a2a_service | |
| if TYPE_CHECKING: | |
| from langflow.services.a2a.service import A2AService | |
| router = APIRouter(prefix="/a2a", tags=["A2A Protocol"]) | |
| class JsonRpcRequest(BaseModel): | |
| """JSON-RPC 2.0 request model.""" | |
| jsonrpc: str = "2.0" | |
| method: str | |
| params: dict[str, Any] | None = None | |
| id: str | int | None = None | |
| class JsonRpcResponse(BaseModel): | |
| """JSON-RPC 2.0 response model.""" | |
| jsonrpc: str = "2.0" | |
| result: Any | None = None | |
| error: dict[str, Any] | None = None | |
| id: str | int | None = None | |
| class JsonRpcError(BaseModel): | |
| """JSON-RPC 2.0 error model.""" | |
| code: int | |
| message: str | |
| data: Any | None = None | |
| @router.get("/{flow_id}/{component_id}/.well-known/agent.json") | |
| async def get_agent_card( | |
| flow_id: str = Path(..., description="Flow ID"), | |
| component_id: str = Path(..., description="Component ID"), | |
| ) -> JSONResponse: | |
| """Get the agent card for a specific agent component.""" | |
| a2a_service: A2AService = get_a2a_service() | |
| agent_card = a2a_service.get_agent_card(component_id) | |
| if not agent_card: | |
| raise HTTPException(status_code=404, detail=f"Agent not found: {component_id}") | |
| return JSONResponse(content=agent_card.model_dump(exclude_none=True)) | |
| @router.post("/{flow_id}/{component_id}/rpc", response_model=None) | |
| async def json_rpc_endpoint( | |
| request: JsonRpcRequest, | |
| flow_id: str = Path(..., description="Flow ID"), | |
| component_id: str = Path(..., description="Component ID"), | |
| ) -> JSONResponse | StreamingResponse: | |
| """JSON-RPC 2.0 endpoint for A2A protocol methods. | |
| Supported methods: | |
| - message/send: Send a message synchronously | |
| - message/stream: Send a message with streaming | |
| - tasks/get: Get task status | |
| - tasks/cancel: Cancel a running task | |
| - tasks/list: List tasks | |
| """ | |
| a2a_service: A2AService = get_a2a_service() | |
| try: | |
| if request.jsonrpc != "2.0": | |
| return _create_error_response(-32600, "Invalid Request", | |
| "JSON-RPC version must be 2.0", request.id) | |
| if request.method == "message/send": | |
| return await _handle_message_send(a2a_service, component_id, | |
| request.params or {}, request.id) | |
| if request.method == "message/stream": | |
| return await _handle_message_stream(a2a_service, component_id, | |
| request.params or {}, request.id) | |
| if request.method == "tasks/get": | |
| return await _handle_tasks_get(a2a_service, request.params or {}, request.id) | |
| if request.method == "tasks/list": | |
| return await _handle_tasks_list(a2a_service, request.params or {}, request.id) | |
| if request.method == "tasks/cancel": | |
| return await _handle_tasks_cancel(a2a_service, request.params or {}, request.id) | |
| return _create_error_response(-32601, "Method not found", | |
| f"Method \\'{request.method}\\' is not supported", request.id) | |
| except Exception as e: | |
| logger.exception(f"Error handling JSON-RPC request: {e}") | |
| return _create_error_response(-32603, "Internal error", str(e), request.id) | |
| async def _handle_message_send(a2a_service, component_id, params, request_id): | |
| """Handle message/send JSON-RPC method.""" | |
| message = params.get("message") | |
| if not message: | |
| return _create_error_response(-32602, "Invalid params", | |
| "Missing required parameter: message", request_id) | |
| try: | |
| result = await a2a_service.handle_message_send(component_id, message) | |
| return JSONResponse(content=JsonRpcResponse( | |
| jsonrpc="2.0", result=result, id=request_id | |
| ).model_dump(exclude_none=True)) | |
| except ValueError as e: | |
| return _create_error_response(-32602, "Invalid params", str(e), request_id) | |
| except Exception as e: | |
| logger.exception("Error in message/send") | |
| return _create_error_response(-32603, "Internal error", str(e), request_id) | |
| async def _handle_message_stream(a2a_service, component_id, params, request_id): | |
| """Handle message/stream JSON-RPC method with SSE.""" | |
| message = params.get("message") | |
| if not message: | |
| return JSONResponse( | |
| content=_create_error_response(-32602, "Invalid params", | |
| "Missing required parameter: message", | |
| request_id).model_dump(exclude_none=True), | |
| status_code=400, | |
| ) | |
| async def event_generator(): | |
| try: | |
| async for event in a2a_service.handle_message_stream(component_id, message): | |
| notification = { | |
| "jsonrpc": "2.0", | |
| "method": "message/stream", | |
| "params": {"event": event}, | |
| } | |
| yield f"data: {notification}\\n\\n" | |
| except Exception as e: | |
| logger.exception("Error in message/stream") | |
| error_event = { | |
| "jsonrpc": "2.0", | |
| "error": {"code": -32603, "message": "Internal error", "data": str(e)}, | |
| "id": request_id, | |
| } | |
| yield f"data: {error_event}\\n\\n" | |
| return StreamingResponse( | |
| event_generator(), | |
| media_type="text/event-stream", | |
| headers={"Cache-Control": "no-cache", "Connection": "keep-alive", | |
| "X-Accel-Buffering": "no"}, | |
| ) | |
| async def _handle_tasks_get(a2a_service, params, request_id): | |
| """Handle tasks/get JSON-RPC method.""" | |
| task_id = params.get("taskId") | |
| if not task_id: | |
| return _create_error_response(-32602, "Invalid params", | |
| "Missing required parameter: taskId", request_id) | |
| task = a2a_service.get_task(task_id) | |
| if not task: | |
| return _create_error_response(-32602, "Task not found", | |
| f"Task not found: {task_id}", request_id) | |
| return JSONResponse(content=JsonRpcResponse( | |
| jsonrpc="2.0", | |
| result=task.model_dump(exclude_none=True) if hasattr(task, "model_dump") else task, | |
| id=request_id, | |
| ).model_dump(exclude_none=True)) | |
| async def _handle_tasks_list(a2a_service, params, request_id): | |
| """Handle tasks/list JSON-RPC method.""" | |
| context_id = params.get("contextId") | |
| tasks = a2a_service.list_tasks(context_id=context_id) | |
| task_dicts = [t.model_dump(exclude_none=True) if hasattr(t, "model_dump") else t for t in tasks] | |
| return JSONResponse(content=JsonRpcResponse( | |
| jsonrpc="2.0", result={"tasks": task_dicts}, id=request_id, | |
| ).model_dump(exclude_none=True)) | |
| async def _handle_tasks_cancel(a2a_service, params, request_id): | |
| """Handle tasks/cancel JSON-RPC method.""" | |
| task_id = params.get("taskId") | |
| if not task_id: | |
| return _create_error_response(-32602, "Invalid params", | |
| "Missing required parameter: taskId", request_id) | |
| success = a2a_service.cancel_task(task_id) | |
| return JSONResponse(content=JsonRpcResponse( | |
| jsonrpc="2.0", result={"canceled": success}, id=request_id, | |
| ).model_dump(exclude_none=True)) | |
| def _create_error_response(code, message, data=None, request_id=None): | |
| """Create a JSON-RPC error response.""" | |
| return JsonRpcResponse( | |
| jsonrpc="2.0", | |
| error=JsonRpcError(code=code, message=message, data=data).model_dump(exclude_none=True), | |
| id=request_id, | |
| ) | |
| ''' | |
| # ============================================================================= | |
| # PATCH 3: Update langflow/api/v1/__init__.py (add router import) | |
| # ============================================================================= | |
| init_router_patch = ''' | |
| # Add this import line to langflow/api/v1/__init__.py: | |
| from langflow.api.v1.a2a import router as a2a_router # <-- ADD THIS LINE | |
| # And add "a2a_router" to the __all__ list | |
| ''' | |
| # ============================================================================= | |
| # PATCH 4: Update langflow/api/router.py (register the router) | |
| # ============================================================================= | |
| router_registration_patch = ''' | |
| # In langflow/api/router.py: | |
| # 1. Add a2a_router to the imports: | |
| from langflow.api.v1 import ( | |
| a2a_router, # <-- ADD THIS LINE | |
| # ... existing imports ... | |
| ) | |
| # 2. Register the router (add this line alongside the other router.include_router calls): | |
| router_v1.include_router(a2a_router) # <-- ADD THIS LINE | |
| ''' | |
| # ============================================================================= | |
| # INSTALLATION INSTRUCTIONS | |
| # ============================================================================= | |
| instructions = ''' | |
| INSTALLATION INSTRUCTIONS | |
| ========================= | |
| 1. Locate your Langflow installation's source code: | |
| - If installed via pip: find with `python -c "import langflow; print(langflow.__file__)"` | |
| - If running from source: navigate to src/backend/base/langflow/ | |
| 2. Create the A2A service directory and add service files: | |
| mkdir -p langflow/services/a2a/ | |
| - __init__.py (copy content from __init__py variable above) | |
| - service.py (copy content from service_py variable above) | |
| - factory.py (copy content from factory_py variable above) | |
| 3. Create the A2A router file: | |
| - Create langflow/api/v1/a2a.py (copy content from a2a_router_py variable above) | |
| 4. Edit langflow/services/schema.py: | |
| - Find the ServiceType enum | |
| - Add: A2A_SERVICE = "a2a_service" | |
| 5. Edit langflow/services/deps.py: | |
| - Add import: from langflow.services.a2a.service import A2AService | |
| - Add the get_a2a_service() function (see deps_patch above) | |
| 6. Edit langflow/api/v1/__init__.py: | |
| - Add: from langflow.api.v1.a2a import router as a2a_router | |
| - Add "a2a_router" to the __all__ list | |
| 7. Edit langflow/api/router.py: | |
| - Add a2a_router to imports from langflow.api.v1 | |
| - Add: router_v1.include_router(a2a_router) | |
| 8. Install the A2A SDK: | |
| pip install a2a-sdk | |
| 9. Restart Langflow | |
| VERIFICATION | |
| ============ | |
| After restarting: | |
| - The A2A Server Agent component should work without import errors | |
| - Agent card discovery: GET /api/v1/a2a/{flow_id}/{component_id}/.well-known/agent.json | |
| - JSON-RPC endpoint: POST /api/v1/a2a/{flow_id}/{component_id}/rpc | |
| Supported JSON-RPC methods: message/send, message/stream, tasks/get, tasks/list, tasks/cancel | |
| ''' | |
| if __name__ == "__main__": | |
| print(instructions) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment