Skip to content

Instantly share code, notes, and snippets.

@raoulbia-ai
Last active February 9, 2026 13:41
Show Gist options
  • Select an option

  • Save raoulbia-ai/6bb4f930f60a475fecf6ef82b87afc6d to your computer and use it in GitHub Desktop.

Select an option

Save raoulbia-ai/6bb4f930f60a475fecf6ef82b87afc6d to your computer and use it in GitHub Desktop.
Adds A2A (Agent-to-Agent) protocol support to vanilla Langflow. Required for using the A2A Server Agent component.
"""
Langflow A2A Service Patch
==========================
This patch adds A2A (Agent-to-Agent) protocol support to vanilla Langflow.
Required for using the A2A Server Agent component.
It provides two things:
1. SERVICE LAYER - A2A service, factory, schema, and dependency injection
2. API ROUTER - FastAPI endpoints for agent card discovery and JSON-RPC
Without both, external A2A clients cannot discover or communicate with your agents.
Installation:
1. Create directory: langflow/services/a2a/
2. Add the service files below to that directory
3. Create the router file: langflow/api/v1/a2a.py
4. Update langflow/services/schema.py (add ServiceType)
5. Update langflow/services/deps.py (add get_a2a_service)
6. Update langflow/api/v1/__init__.py (add router import)
7. Update langflow/api/router.py (register the router)
8. Restart Langflow
Requirements: pip install a2a-sdk
Related gists:
- A2A Server Agent: https://gist.github.com/raoulbia-ai/285152d7c789143dfe6b7a848f9dd204
- A2A Agent Connector: https://gist.github.com/raoulbia-ai/934f1fd83f09638dd2343ad152a96782
"""
# =============================================================================
# FILE 1: langflow/services/a2a/__init__.py
# =============================================================================
# Source text for the package initializer. It re-exports A2AService,
# A2AServiceFactory and AgentEndpoint so they can be imported from
# langflow.services.a2a directly.
__init__py = '''
"""A2A service module for Agent2Agent protocol support."""
from langflow.services.a2a.factory import A2AServiceFactory
from langflow.services.a2a.service import A2AService, AgentEndpoint
__all__ = ["A2AService", "A2AServiceFactory", "AgentEndpoint"]
'''
# =============================================================================
# FILE 2: langflow/services/a2a/service.py
# =============================================================================
# Source text for langflow/services/a2a/service.py: the in-memory agent
# registry plus the streaming and task accessors that the A2A router calls.
service_py = '''
"""A2A Service for exposing Langflow agents via Agent2Agent Protocol."""
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any

from loguru import logger

from langflow.services.base import Service

if TYPE_CHECKING:
    from collections.abc import AsyncGenerator

    from a2a.types import AgentCard, Message as A2AMessage, Task

try:
    from a2a.types import AgentCard, AgentSkill, Task, AgentCapabilities
    from a2a.types import Message as A2AMessage

    A2A_AVAILABLE = True
except ImportError:
    A2A_AVAILABLE = False
    logger.warning("A2A SDK not available. Install with: pip install a2a-sdk")


def _task_context_id(task: Any) -> Any:
    """Return the context id of a stored task, whichever spelling it carries."""
    if isinstance(task, dict):
        return task.get("contextId", task.get("context_id"))
    return getattr(task, "context_id", None) or getattr(task, "contextId", None)


class AgentEndpoint:
    """Represents a registered A2A agent endpoint."""

    def __init__(
        self,
        component_id: str,
        flow_id: str,
        agent_card: AgentCard,
        executor_callable: Any,
    ) -> None:
        self.component_id = component_id
        self.flow_id = flow_id
        self.agent_card = agent_card
        self.executor_callable = executor_callable
        self.created_at = datetime.now()


class A2AService(Service):
    """Service for managing A2A protocol endpoints and agent registry."""

    name = "a2a_service"

    def __init__(self) -> None:
        super().__init__()
        self.agent_registry: dict[str, AgentEndpoint] = {}
        self.task_store: dict[str, Task] = {}
        logger.info("A2A Service initialized")

    def register_agent(
        self,
        component_id: str,
        flow_id: str,
        agent_card: AgentCard,
        executor_callable: Any,
    ) -> str:
        """Register an agent under ``component_id`` and return its URL path."""
        endpoint = AgentEndpoint(
            component_id=component_id,
            flow_id=flow_id,
            agent_card=agent_card,
            executor_callable=executor_callable,
        )
        self.agent_registry[component_id] = endpoint
        url_path = f"/a2a/{flow_id}/{component_id}"
        logger.info(f"Registered A2A agent: {agent_card.name} at {url_path}")
        return url_path

    def unregister_agent(self, component_id: str) -> None:
        """Remove an agent from the registry; unknown ids are ignored."""
        if component_id in self.agent_registry:
            agent = self.agent_registry[component_id]
            logger.info(f"Unregistered A2A agent: {agent.agent_card.name}")
            del self.agent_registry[component_id]

    def get_agent(self, component_id: str) -> AgentEndpoint | None:
        """Return the registered endpoint, or None when it is unknown."""
        return self.agent_registry.get(component_id)

    def get_agent_card(self, component_id: str) -> AgentCard | None:
        """Return the agent card for ``component_id``, or None when unknown."""
        agent = self.get_agent(component_id)
        return agent.agent_card if agent else None

    def list_agents(self, flow_id: str | None = None) -> list[AgentEndpoint]:
        """List registered agents, optionally restricted to one flow."""
        if flow_id:
            return [a for a in self.agent_registry.values() if a.flow_id == flow_id]
        return list(self.agent_registry.values())

    async def handle_message_send(
        self,
        component_id: str,
        message: dict[str, Any],
    ) -> dict[str, Any]:
        """Run the agent once for ``message`` and return its result.

        Raises:
            ValueError: If no agent is registered under ``component_id``.
        """
        agent = self.get_agent(component_id)
        if not agent:
            raise ValueError(f"Agent not found: {component_id}")
        try:
            return await agent.executor_callable(message)
        except Exception as e:
            logger.exception(f"Error executing agent {component_id}: {e}")
            raise

    async def handle_message_stream(
        self,
        component_id: str,
        message: dict[str, Any],
    ) -> AsyncGenerator[Any, None]:
        """Run the agent for ``message`` and yield its events as they arrive.

        An executor that returns an async iterator is relayed event by event.
        A plain awaitable is awaited and its result is yielded as the only event.

        Raises:
            ValueError: If no agent is registered under ``component_id``.
        """
        agent = self.get_agent(component_id)
        if not agent:
            raise ValueError(f"Agent not found: {component_id}")
        try:
            outcome = agent.executor_callable(message)
            if hasattr(outcome, "__aiter__"):
                async for event in outcome:
                    yield event
            else:
                yield await outcome
        except Exception as e:
            logger.exception(f"Error streaming from agent {component_id}: {e}")
            raise

    def get_task(self, task_id: str) -> Task | None:
        """Return the stored task with ``task_id``, or None when unknown."""
        return self.task_store.get(task_id)

    def list_tasks(self, context_id: str | None = None) -> list[Task]:
        """List stored tasks, optionally restricted to one context id."""
        tasks = list(self.task_store.values())
        if not context_id:
            return tasks
        return [t for t in tasks if _task_context_id(t) == context_id]

    def cancel_task(self, task_id: str) -> bool:
        """Drop the stored record for ``task_id``.

        The service does not track in-flight executions, so cancelling only
        forgets the stored task. Returns True when a record existed.
        """
        return self.task_store.pop(task_id, None) is not None

    def generate_agent_card(
        self,
        agent_name: str,
        description: str,
        component_id: str,
        flow_id: str,
        skills: list[AgentSkill] | None = None,
        base_url: str | None = None,
    ) -> AgentCard:
        """Build an AgentCard for the given component.

        Raises:
            ImportError: If the A2A SDK is not installed.
        """
        if not A2A_AVAILABLE:
            raise ImportError("A2A SDK not available")
        url = base_url or f"http://localhost:7860/a2a/{flow_id}/{component_id}"
        return AgentCard(
            name=agent_name,
            description=description,
            url=url,
            version="1.0.0",
            defaultInputModes=["text/plain", "application/json"],
            defaultOutputModes=["text/plain", "application/json"],
            capabilities=AgentCapabilities(streaming=True, pushNotifications=False),
            skills=skills or [],
        )

    async def teardown(self) -> None:
        """Clear the registry and task store on shutdown."""
        logger.info("Shutting down A2A service")
        self.agent_registry.clear()
        self.task_store.clear()
'''
# =============================================================================
# FILE 3: langflow/services/a2a/factory.py
# =============================================================================
# Source text for langflow/services/a2a/factory.py: the factory that builds
# A2AService instances.
factory_py = '''
"""Factory for creating A2A service instances."""
from typing import TYPE_CHECKING

from typing_extensions import override

from langflow.services.a2a.service import A2AService
from langflow.services.factory import ServiceFactory

if TYPE_CHECKING:
    from langflow.services.settings.service import SettingsService


class A2AServiceFactory(ServiceFactory):
    """Factory for creating A2A service instances."""

    def __init__(self) -> None:
        super().__init__(A2AService)

    @override
    def create(self, settings_service: "SettingsService") -> A2AService:
        return A2AService()
'''
# =============================================================================
# PATCH 1: Add to langflow/services/schema.py (in ServiceType enum)
# =============================================================================
# Snippet showing the enum member to add to ServiceType. The class body is
# indented so the example reads as valid Python.
schema_patch = '''
# Add this line to the ServiceType enum in langflow/services/schema.py:
class ServiceType(str, Enum):
    # ... existing entries ...
    A2A_SERVICE = "a2a_service" # <-- ADD THIS LINE
'''
# =============================================================================
# PATCH 2: Add to langflow/services/deps.py
# =============================================================================
# Snippet with the import and accessor function to add to
# langflow/services/deps.py. The function body is indented so it can be
# pasted as-is.
deps_patch = '''
# Add this import at the top of langflow/services/deps.py:
from langflow.services.a2a.service import A2AService
# Add this function to langflow/services/deps.py:
def get_a2a_service() -> A2AService:
    """Retrieves the A2AService instance from the service manager.

    Returns:
        A2AService: The A2AService instance.
    """
    from langflow.services.a2a.factory import A2AServiceFactory

    return get_service(ServiceType.A2A_SERVICE, A2AServiceFactory())
'''
# =============================================================================
# API ROUTER (router file and registration patches follow)
# =============================================================================
# =============================================================================
# FILE 4: langflow/api/v1/a2a.py (A2A Protocol Router)
# =============================================================================
# Source text for langflow/api/v1/a2a.py: the FastAPI router that serves agent
# cards and the JSON-RPC 2.0 endpoint. Escaped sequences (\\' and \\n) are
# written doubled so they survive into the generated file as \' and \n.
a2a_router_py = '''
"""A2A Protocol Router for Agent2Agent communication.

This router implements the A2A protocol endpoints:
- GET /.well-known/agent.json - Agent card discovery
- POST /rpc - JSON-RPC 2.0 endpoint for protocol methods
"""
from __future__ import annotations

import json
from typing import TYPE_CHECKING, Any

from fastapi import APIRouter, HTTPException, Path
from fastapi.responses import JSONResponse, StreamingResponse
from loguru import logger
from pydantic import BaseModel

from langflow.services.deps import get_a2a_service

if TYPE_CHECKING:
    from langflow.services.a2a.service import A2AService

router = APIRouter(prefix="/a2a", tags=["A2A Protocol"])


class JsonRpcRequest(BaseModel):
    """JSON-RPC 2.0 request model."""

    jsonrpc: str = "2.0"
    method: str
    params: dict[str, Any] | None = None
    id: str | int | None = None


class JsonRpcResponse(BaseModel):
    """JSON-RPC 2.0 response model."""

    jsonrpc: str = "2.0"
    result: Any | None = None
    error: dict[str, Any] | None = None
    id: str | int | None = None


class JsonRpcError(BaseModel):
    """JSON-RPC 2.0 error model."""

    code: int
    message: str
    data: Any | None = None


@router.get("/{flow_id}/{component_id}/.well-known/agent.json")
async def get_agent_card(
    flow_id: str = Path(..., description="Flow ID"),
    component_id: str = Path(..., description="Component ID"),
) -> JSONResponse:
    """Get the agent card for a specific agent component."""
    # The lookup is keyed by component_id only; flow_id is part of the URL shape.
    a2a_service: A2AService = get_a2a_service()
    agent_card = a2a_service.get_agent_card(component_id)
    if not agent_card:
        raise HTTPException(status_code=404, detail=f"Agent not found: {component_id}")
    return JSONResponse(content=agent_card.model_dump(exclude_none=True))


@router.post("/{flow_id}/{component_id}/rpc", response_model=None)
async def json_rpc_endpoint(
    request: JsonRpcRequest,
    flow_id: str = Path(..., description="Flow ID"),
    component_id: str = Path(..., description="Component ID"),
) -> JSONResponse | StreamingResponse:
    """JSON-RPC 2.0 endpoint for A2A protocol methods.

    Supported methods:
    - message/send: Send a message synchronously
    - message/stream: Send a message with streaming
    - tasks/get: Get task status
    - tasks/cancel: Cancel a running task
    - tasks/list: List tasks
    """
    a2a_service: A2AService = get_a2a_service()
    try:
        if request.jsonrpc != "2.0":
            return _create_error_response(-32600, "Invalid Request",
                                          "JSON-RPC version must be 2.0", request.id)
        if request.method == "message/send":
            return await _handle_message_send(a2a_service, component_id,
                                              request.params or {}, request.id)
        if request.method == "message/stream":
            return await _handle_message_stream(a2a_service, component_id,
                                                request.params or {}, request.id)
        if request.method == "tasks/get":
            return await _handle_tasks_get(a2a_service, request.params or {}, request.id)
        if request.method == "tasks/list":
            return await _handle_tasks_list(a2a_service, request.params or {}, request.id)
        if request.method == "tasks/cancel":
            return await _handle_tasks_cancel(a2a_service, request.params or {}, request.id)
        return _create_error_response(-32601, "Method not found",
                                      f"Method \\'{request.method}\\' is not supported", request.id)
    except Exception as e:
        logger.exception(f"Error handling JSON-RPC request: {e}")
        return _create_error_response(-32603, "Internal error", str(e), request.id)


def _to_jsonable(value: Any) -> Any:
    """Dump pydantic models to plain data; pass everything else through."""
    return value.model_dump(exclude_none=True) if hasattr(value, "model_dump") else value


def _success_response(result: Any, request_id: str | int | None) -> JSONResponse:
    """Wrap ``result`` in a JSON-RPC 2.0 success envelope."""
    return JSONResponse(content=JsonRpcResponse(
        jsonrpc="2.0", result=result, id=request_id,
    ).model_dump(exclude_none=True))


async def _handle_message_send(a2a_service, component_id, params, request_id):
    """Handle message/send JSON-RPC method."""
    message = params.get("message")
    if not message:
        return _create_error_response(-32602, "Invalid params",
                                      "Missing required parameter: message", request_id)
    try:
        result = await a2a_service.handle_message_send(component_id, message)
        return _success_response(result, request_id)
    except ValueError as e:
        # The service raises ValueError for an unknown component id.
        return _create_error_response(-32602, "Invalid params", str(e), request_id)
    except Exception as e:
        logger.exception("Error in message/send")
        return _create_error_response(-32603, "Internal error", str(e), request_id)


async def _handle_message_stream(a2a_service, component_id, params, request_id):
    """Handle message/stream JSON-RPC method with SSE."""
    message = params.get("message")
    if not message:
        return _create_error_response(-32602, "Invalid params",
                                      "Missing required parameter: message",
                                      request_id, status_code=400)

    async def event_generator():
        try:
            async for event in a2a_service.handle_message_stream(component_id, message):
                notification = {
                    "jsonrpc": "2.0",
                    "method": "message/stream",
                    "params": {"event": _to_jsonable(event)},
                }
                # SSE clients parse the data field as JSON, so serialize it
                # rather than interpolating the dict repr.
                yield f"data: {json.dumps(notification)}\\n\\n"
        except Exception as e:
            logger.exception("Error in message/stream")
            error_event = {
                "jsonrpc": "2.0",
                "error": {"code": -32603, "message": "Internal error", "data": str(e)},
                "id": request_id,
            }
            yield f"data: {json.dumps(error_event)}\\n\\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive",
                 "X-Accel-Buffering": "no"},
    )


async def _handle_tasks_get(a2a_service, params, request_id):
    """Handle tasks/get JSON-RPC method."""
    task_id = params.get("taskId")
    if not task_id:
        return _create_error_response(-32602, "Invalid params",
                                      "Missing required parameter: taskId", request_id)
    task = a2a_service.get_task(task_id)
    if not task:
        return _create_error_response(-32602, "Task not found",
                                      f"Task not found: {task_id}", request_id)
    return _success_response(_to_jsonable(task), request_id)


async def _handle_tasks_list(a2a_service, params, request_id):
    """Handle tasks/list JSON-RPC method."""
    context_id = params.get("contextId")
    tasks = a2a_service.list_tasks(context_id=context_id)
    task_dicts = [_to_jsonable(t) for t in tasks]
    return _success_response({"tasks": task_dicts}, request_id)


async def _handle_tasks_cancel(a2a_service, params, request_id):
    """Handle tasks/cancel JSON-RPC method."""
    task_id = params.get("taskId")
    if not task_id:
        return _create_error_response(-32602, "Invalid params",
                                      "Missing required parameter: taskId", request_id)
    success = a2a_service.cancel_task(task_id)
    return _success_response({"canceled": success}, request_id)


def _create_error_response(code, message, data=None, request_id=None, status_code=200):
    """Create a JSON-RPC error response ready to return from a route.

    The envelope carries only the error member and always includes the id
    (null when the request had none), with no result member alongside it.
    """
    error = JsonRpcError(code=code, message=message, data=data).model_dump(exclude_none=True)
    return JSONResponse(
        content={"jsonrpc": "2.0", "error": error, "id": request_id},
        status_code=status_code,
    )
'''
# =============================================================================
# PATCH 3: Update langflow/api/v1/__init__.py (add router import)
# =============================================================================
# Snippet text only: the import line to add so the A2A router is exposed from
# the langflow.api.v1 package. Nothing in this file executes it.
init_router_patch = '''
# Add this import line to langflow/api/v1/__init__.py:
from langflow.api.v1.a2a import router as a2a_router # <-- ADD THIS LINE
# And add "a2a_router" to the __all__ list
'''
# =============================================================================
# PATCH 4: Update langflow/api/router.py (register the router)
# =============================================================================
# Snippet text only: shows the import to extend and the include_router call
# that mounts the A2A router on router_v1.
router_registration_patch = '''
# In langflow/api/router.py:
# 1. Add a2a_router to the imports:
from langflow.api.v1 import (
a2a_router, # <-- ADD THIS LINE
# ... existing imports ...
)
# 2. Register the router (add this line alongside the other router.include_router calls):
router_v1.include_router(a2a_router) # <-- ADD THIS LINE
'''
# =============================================================================
# INSTALLATION INSTRUCTIONS
# =============================================================================
# Human-readable install and verification guide. This is the text printed when
# the file is run as a script; it refers to the string variables defined above
# by name.
instructions = '''
INSTALLATION INSTRUCTIONS
=========================
1. Locate your Langflow installation's source code:
- If installed via pip: find with `python -c "import langflow; print(langflow.__file__)"`
- If running from source: navigate to src/backend/base/langflow/
2. Create the A2A service directory and add service files:
mkdir -p langflow/services/a2a/
- __init__.py (copy content from __init__py variable above)
- service.py (copy content from service_py variable above)
- factory.py (copy content from factory_py variable above)
3. Create the A2A router file:
- Create langflow/api/v1/a2a.py (copy content from a2a_router_py variable above)
4. Edit langflow/services/schema.py:
- Find the ServiceType enum
- Add: A2A_SERVICE = "a2a_service"
5. Edit langflow/services/deps.py:
- Add import: from langflow.services.a2a.service import A2AService
- Add the get_a2a_service() function (see deps_patch above)
6. Edit langflow/api/v1/__init__.py:
- Add: from langflow.api.v1.a2a import router as a2a_router
- Add "a2a_router" to the __all__ list
7. Edit langflow/api/router.py:
- Add a2a_router to imports from langflow.api.v1
- Add: router_v1.include_router(a2a_router)
8. Install the A2A SDK:
pip install a2a-sdk
9. Restart Langflow
VERIFICATION
============
After restarting:
- The A2A Server Agent component should work without import errors
- Agent card discovery: GET /api/v1/a2a/{flow_id}/{component_id}/.well-known/agent.json
- JSON-RPC endpoint: POST /api/v1/a2a/{flow_id}/{component_id}/rpc
Supported JSON-RPC methods: message/send, message/stream, tasks/get, tasks/list, tasks/cancel
'''
# Running the file as a script prints the installation guide; importing it
# only defines the string variables.
if __name__ == "__main__":
    print(instructions)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment