Run A2A's latest Python SDK: a self-contained agent server and client in one script.
# e.g. to run with ollama, do this:
# OPENAI_BASE_URL=http://localhost:11434/v1 OPENAI_API_KEY=unused CHAT_MODEL=qwen3:1.7b uv run a2a.py
#
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "a2a-sdk",
#     "uvicorn",
#     "openai",
#     "httpx"
# ]
# ///
import asyncio
import os
from contextlib import asynccontextmanager
from typing import Any, Callable
from uuid import uuid4

import httpx
import uvicorn
from a2a.client import A2AClient
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    AgentCapabilities,
    AgentCard,
    Message,
    MessageSendParams,
    Role,
    SendMessageRequest,
    TextPart,
    UnsupportedOperationError,
)
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from openai import AsyncOpenAI
from typing_extensions import AsyncIterator, override

# TODO: remove after https://github.com/google/a2a-python/issues/4
default_port = 10000
class ElasticsearchVersionAgent(AgentExecutor):
    @staticmethod
    def card() -> AgentCard:
        return AgentCard(
            name="ElasticsearchVersionAgent",
            description="Answers questions about Elasticsearch versions.",
            version="1.0.0",
            url=f"http://localhost:{default_port}/",
            defaultInputModes=["text"],
            defaultOutputModes=["text"],
            capabilities=AgentCapabilities(),
            skills=[],
        )

    def __init__(self, model_name: str):
        self.model_name = model_name

    @override
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        query = context.get_user_input()

        # Fetch the Elastic stack release list so the LLM answers from real data.
        async with httpx.AsyncClient() as client:
            response = await client.get("https://artifacts.elastic.co/releases/stack.json")
            response.raise_for_status()
            releases_json = response.text

        system_prompt = f"""
You are an Elasticsearch version agent. Your sole purpose is to answer questions about Elasticsearch versions.

Rules:
- Only use knowledge in the version_list.
- The 'version' field is defined as a semantic version, so 8.18.1 means major=8, minor=18, patch=1.
- Exclude pre-release versions, which are those with "-" in their version string (e.g., "8.0.0-alpha1").
- When filtering on version number, choose the latest patch that matches, even if it was released before a lower one.
- Be concise in your response.

<version_list>
{releases_json}
</version_list>

/no_think
"""

        client = AsyncOpenAI()
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
        chat_completion = await client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            temperature=0,
        )
        result = chat_completion.choices[0].message.content
        event_queue.enqueue_event(new_agent_text_message(result))

    @override
    async def cancel(self, context: RequestContext, event_queue: EventQueue):
        raise ServerError(error=UnsupportedOperationError())
async def main() -> None:
    model_name = os.getenv("CHAT_MODEL", "gpt-4o-mini")
    agent = ElasticsearchVersionAgent(model_name)
    request_handler = DefaultRequestHandler(agent_executor=agent, task_store=InMemoryTaskStore())
    app = A2AStarletteApplication(agent_card=agent.card(), http_handler=request_handler).build()

    # For convenience, launch the server in the same process as the client.
    async with serve_on_default_port(app):
        # Run the A2A client to ask the agent a question.
        async with httpx.AsyncClient(timeout=30.0) as httpx_client:
            client = await A2AClient.get_client_from_agent_card_url(httpx_client, f"http://localhost:{default_port}")
            request = SendMessageRequest(
                params=MessageSendParams(
                    message=Message(
                        role=Role.user,
                        parts=[TextPart(text="What is the latest version of Elasticsearch 8?")],
                        messageId=uuid4().hex,
                    )
                )
            )
            response = await client.send_message(request)
            print(response.root.result.parts[0].root.text)
@asynccontextmanager
async def serve_on_default_port(app: Callable[..., Any]) -> AsyncIterator[None]:
    """Registers the app and serves it on the default port."""
    config = uvicorn.Config(app, host="127.0.0.1", port=default_port, log_level="error")
    server = uvicorn.Server(config)
    task = asyncio.create_task(server.serve())
    # Poll until uvicorn reports the socket is listening before letting the client connect.
    while not (server and server.started):
        await asyncio.sleep(0.01)
    try:
        yield
    finally:
        server.should_exit = True
        await server.shutdown()
        task.cancel()


if __name__ == "__main__":
    asyncio.run(main())
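If you want to query the agent from a separate process instead of the in-process client in main(), a client-only sketch along these lines should work, assuming something is still serving the agent card at http://localhost:10000 (the script above shuts its server down after its single question, so you would need to keep it serving, e.g. by sleeping inside the serve_on_default_port block). The filename and question text below are placeholders; the A2A calls mirror the ones the script already makes.

# client_only.py -- minimal sketch, assuming an A2A server (like the one above)
# is already listening at http://localhost:10000 in another process.
import asyncio
from uuid import uuid4

import httpx
from a2a.client import A2AClient
from a2a.types import Message, MessageSendParams, Role, SendMessageRequest, TextPart


async def ask(question: str) -> None:
    async with httpx.AsyncClient(timeout=30.0) as httpx_client:
        # Resolve the agent card from the server, then send a single message.
        client = await A2AClient.get_client_from_agent_card_url(httpx_client, "http://localhost:10000")
        request = SendMessageRequest(
            params=MessageSendParams(
                message=Message(
                    role=Role.user,
                    parts=[TextPart(text=question)],
                    messageId=uuid4().hex,
                )
            )
        )
        response = await client.send_message(request)
        print(response.root.result.parts[0].root.text)


if __name__ == "__main__":
    asyncio.run(ask("What is the latest version of Elasticsearch 7?"))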
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment