Skip to content

Instantly share code, notes, and snippets.

@codefromthecrypt
Last active May 26, 2025 23:22
Show Gist options
  • Save codefromthecrypt/044f19f65eefeff800d38a97993fc10e to your computer and use it in GitHub Desktop.
Save codefromthecrypt/044f19f65eefeff800d38a97993fc10e to your computer and use it in GitHub Desktop.
run A2A's latest python SDK
# e.g. to run with ollama, do this:
# OPENAI_BASE_URL=http://localhost:11434/v1 OPENAI_API_KEY=unused CHAT_MODEL=qwen3:1.7b uv run a2a.py
#
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "a2a-sdk",
# "uvicorn",
# "openai",
# "httpx"
# ]
# ///
import asyncio
import os
from contextlib import asynccontextmanager
from typing import Any, Callable
from uuid import uuid4
import httpx
import uvicorn
from a2a.client import A2AClient
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
AgentCapabilities,
AgentCard,
Message,
MessageSendParams,
Role,
SendMessageRequest,
TextPart,
UnsupportedOperationError,
)
from a2a.utils import new_agent_text_message
from a2a.utils.errors import ServerError
from openai import AsyncOpenAI
from typing_extensions import AsyncIterator, override
# TODO: remove after https://github.com/google/a2a-python/issues/4
default_port = 10000
class ElasticsearchVersionAgent(AgentExecutor):
@staticmethod
def card() -> AgentCard:
return AgentCard(
name="ElasticsearchVersionAgent",
description="Answers questions about Elasticsearch versions.",
version="1.0.0",
url=f"http://localhost:{default_port}/",
defaultInputModes=["text"],
defaultOutputModes=["text"],
capabilities=AgentCapabilities(),
skills=[],
)
def __init__(self, model_name: str):
self.model_name = model_name
@override
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
query = context.get_user_input()
async with httpx.AsyncClient() as client:
response = await client.get("https://artifacts.elastic.co/releases/stack.json")
response.raise_for_status()
releases_json = response.text
system_prompt = f"""
You are an Elasticsearch version agent. Your sole purpose is to answer questions about Elasticsearch versions.
Rules:
- Only use knowledge in the version_list.
- The 'version' field is defined as a semantic version, so 8.18.1 means major=8, minor=18, patch=1.
- Exclude pre-release versions, which are those with "-" in their version string (e.g., "8.0.0-alpha1").
- When filtering on version number, choose the latest patch that matches, even if it was released before a lower one.
- Be concise in your response.
<version_list>
{releases_json}
</version_list>
/no_think
"""
client = AsyncOpenAI()
messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": query}]
chat_completion = await client.chat.completions.create(
model=self.model_name,
messages=messages,
temperature=0,
)
result = chat_completion.choices[0].message.content
event_queue.enqueue_event(new_agent_text_message(result))
@override
async def cancel(self, context: RequestContext, event_queue: EventQueue):
raise ServerError(error=UnsupportedOperationError())
async def main() -> None:
model_name = os.getenv("CHAT_MODEL", "gpt-4o-mini")
agent = ElasticsearchVersionAgent(model_name)
request_handler = DefaultRequestHandler(agent_executor=agent, task_store=InMemoryTaskStore())
app = A2AStarletteApplication(agent_card=agent.card(), http_handler=request_handler).build()
# For convenience, launch the server in the same process as the client.
async with serve_on_default_port(app):
# Run the A2A client to ask the agent a question
async with httpx.AsyncClient(timeout=30.0) as httpx_client:
client = await A2AClient.get_client_from_agent_card_url(httpx_client, f"http://localhost:{default_port}")
request = SendMessageRequest(
params=MessageSendParams(
message=Message(
role=Role.user,
parts=[TextPart(text="What is the latest version of Elasticsearch 8?")],
messageId=uuid4().hex,
)
)
)
response = await client.send_message(request)
print(response.root.result.parts[0].root.text)
@asynccontextmanager
async def serve_on_default_port(app: Callable[..., Any]) -> AsyncIterator[None]:
"""Registers the app and serves it on the default port."""
config = uvicorn.Config(app, host="127.0.0.1", port=default_port, log_level="error")
server = uvicorn.Server(config)
task = asyncio.create_task(server.serve())
while not (server and server.started):
await asyncio.sleep(0.01)
try:
yield
finally:
server.should_exit = True
await server.shutdown()
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment