Skip to content

Instantly share code, notes, and snippets.

@janakiramm
Created December 12, 2025 18:09
Show Gist options
  • Select an option

  • Save janakiramm/bdfc5166922ef3aa3e52c3e42f787e89 to your computer and use it in GitHub Desktop.

Select an option

Save janakiramm/bdfc5166922ef3aa3e52c3e42f787e89 to your computer and use it in GitHub Desktop.
import os
import asyncio
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
import warnings
# Install a blanket "ignore" filter: no category or module is given, so every
# warning raised after this point is suppressed.
warnings.filterwarnings("ignore")
import logging
# Ask for the root logger to be configured at ERROR level so INFO/WARNING
# records are not shown (basicConfig has no effect if the root logger already
# has handlers).
logging.basicConfig(level=logging.ERROR)
def get_weather(city: str) -> dict:
    """Retrieves the current weather report for a specified city.

    The lookup is case-insensitive and ignores all whitespace, so
    "New York", "new york" and " NEW\tYORK " resolve to the same entry.

    Args:
        city (str): The name of the city (e.g., "New York", "London", "Tokyo").

    Returns:
        dict: A dictionary describing the outcome.
            On success: {"status": "success", "report": <weather text>}.
            On failure: {"status": "error", "error_message": <explanation>}.
    """
    print(f"--- Tool: get_weather called for city: {city} ---")

    # split()/join drops every kind of whitespace (spaces, tabs, newlines,
    # leading/trailing padding), not only the ASCII space character.
    city_normalized = "".join(city.lower().split())

    # Mock data keyed by the normalized city name.
    mock_weather_db = {
        "newyork": {"status": "success", "report": "The weather in New York is sunny with a temperature of 25°C."},
        "london": {"status": "success", "report": "It's cloudy in London with a temperature of 15°C."},
        "tokyo": {"status": "success", "report": "Tokyo is experiencing light rain and a temperature of 18°C."},
    }

    result = mock_weather_db.get(city_normalized)
    if result is None:
        # Echo the caller's original spelling back in the error message.
        return {"status": "error", "error_message": f"Sorry, I don't have weather information for '{city}'."}
    return result
# Model identifier handed to the agent below.
AGENT_MODEL = "gemini-2.5-flash"

# Agent wired to a single tool, `get_weather`; the instruction text tells the
# model when to call the tool and how to report its result.
weather_agent = Agent(
    name="weather_agent_v1",
    model=AGENT_MODEL,
    description="Provides weather information for specific cities.",
    instruction=(
        "You are a helpful weather assistant. "
        "When the user asks for the weather in a specific city, "
        "use the 'get_weather' tool to find the information. "
        "If the tool returns an error, inform the user politely. "
        "If the tool is successful, present the weather report clearly."
    ),
    tools=[get_weather],
)
# Identifiers shared by the session and by every call made through the runner.
APP_NAME = "weather_tutorial_app"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Session store used by the runner below.
session_service = InMemorySessionService()

# create_session returns an awaitable here, so drive it to completion with
# asyncio.run before continuing with the synchronous setup.
session = asyncio.run(
    session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )
)
print(f"Session created: App='{APP_NAME}', User='{USER_ID}', Session='{SESSION_ID}'")

# The runner ties the agent to the session store under the same app name.
runner = Runner(
    agent=weather_agent,
    app_name=APP_NAME,
    session_service=session_service,
)
print(f"Runner created for agent '{runner.agent.name}'.")
def call_agent(query: str, runner, user_id, session_id):
    """Send one user query through the runner and print the agent's reply.

    Args:
        query: Text of the user's message.
        runner: Object whose ``run`` method yields events for the message.
        user_id: User identifier passed through to ``runner.run``.
        session_id: Session identifier passed through to ``runner.run``.

    The printed reply is the text of the first part of the final-response
    event. If several events report themselves as final, the last one wins;
    if none carries content, a fallback message is printed instead.
    """
    print(f"\n>>> User Query: {query}")

    user_message = types.Content(role='user', parts=[types.Part(text=query)])
    reply = "Agent did not produce a final response."

    events = runner.run(user_id=user_id, session_id=session_id, new_message=user_message)
    for event in events:
        # Intermediate events are consumed but not reported.
        if not event.is_final_response():
            continue
        message = event.content
        if message and message.parts:
            reply = message.parts[0].text

    print(f"<<< Agent Response: {reply}")
def run_conversation():
    """Send three sample weather queries to the agent, one after another.

    Every query goes through ``call_agent`` with the module-level runner,
    user id and session id.
    """
    queries = (
        "What is the weather like in London?",
        "How about Paris?",
        "Tell me the weather in New York",
    )
    for query in queries:
        call_agent(query, runner, USER_ID, SESSION_ID)
if __name__ == "__main__":
    # Script entry point: run the sample conversation and report any failure
    # as a single printed line rather than a traceback.
    try:
        run_conversation()
    except Exception as exc:
        print(f"An error occurred: {exc}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment