Skip to content

Instantly share code, notes, and snippets.

@ftnext
Last active February 14, 2026 13:44
Show Gist options
  • Select an option

  • Save ftnext/8803f16f0c29f0f2436f54d87dd295d9 to your computer and use it in GitHub Desktop.

Select an option

Save ftnext/8803f16f0c29f0f2436f54d87dd295d9 to your computer and use it in GitHub Desktop.
import logging
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Optional
import uvicorn
from typing_extensions import override
from google.adk.cli.fast_api import get_fast_api_app
from google.adk.cli.service_registry import get_service_registry
from google.adk.sessions.base_session_service import (
BaseSessionService,
GetSessionConfig,
ListSessionsResponse,
)
from google.adk.sessions.session import Session
# Module logger, namespaced under "google_adk." so that a handler filtering
# on that prefix also picks up records emitted from this file.
logger = logging.getLogger(f"google_adk.{__name__}")
class NullSessionService(BaseSessionService):
    """Session service that never stores anything.

    Each method answers purely from its arguments: sessions are built on
    demand, lookups always return a fresh empty session, listings are always
    empty and deletions are no-ops.
    """

    @override
    async def create_session(
        self,
        *,
        app_name: str,
        user_id: str,
        state: Optional[dict[str, Any]] = None,
        session_id: Optional[str] = None,
    ) -> Session:
        """Build a new session object without persisting it.

        Args:
            app_name: Application the session belongs to.
            user_id: Owner of the session.
            state: Initial state; an empty dict is used when missing or empty.
            session_id: Requested id. Surrounding whitespace is stripped; a
                missing or blank id is replaced by a random UUID4 string.

        Returns:
            A ``Session`` with no events, stamped with the current time.
        """
        logger.debug(
            "create_session: app_name=%s user_id=%s session_id=%s",
            app_name,
            user_id,
            session_id,
        )
        # Strip once; None, "" and whitespace-only ids all collapse to "".
        stripped_id = session_id.strip() if session_id else ""
        resolved_session_id = stripped_id or str(uuid.uuid4())
        return Session(
            id=resolved_session_id,
            app_name=app_name,
            user_id=user_id,
            state=state or {},
            events=[],
            last_update_time=time.time(),
        )

    @override
    async def get_session(
        self,
        *,
        app_name: str,
        user_id: str,
        session_id: str,
        config: Optional[GetSessionConfig] = None,
    ) -> Optional[Session]:
        """Return a fresh, empty session for whatever id is requested.

        Nothing is looked up: the result is always a new ``Session`` with
        empty state and no events, so this method never returns ``None``.

        Args:
            app_name: Application the session belongs to.
            user_id: Owner of the session.
            session_id: Id to put on the returned session (used verbatim).
            config: Ignored; there is no stored history to filter.
        """
        logger.debug(
            "get_session: app_name=%s user_id=%s session_id=%s",
            app_name,
            user_id,
            session_id,
        )
        del config  # accepted for interface compatibility only
        return Session(
            id=session_id,
            app_name=app_name,
            user_id=user_id,
            state={},
            events=[],
            last_update_time=time.time(),
        )

    @override
    async def list_sessions(
        self,
        *,
        app_name: str,
        user_id: Optional[str] = None,
    ) -> ListSessionsResponse:
        """Return an empty listing, since no session is ever stored."""
        logger.debug("list_sessions: app_name=%s user_id=%s", app_name, user_id)
        return ListSessionsResponse(sessions=[])

    @override
    async def delete_session(
        self,
        *,
        app_name: str,
        user_id: str,
        session_id: str,
    ) -> None:
        """Do nothing beyond tracing the call; there is nothing to delete."""
        # Traced like the other methods so every service call shows up in the
        # debug log.
        logger.debug(
            "delete_session: app_name=%s user_id=%s session_id=%s",
            app_name,
            user_id,
            session_id,
        )
def _null_session_factory(uri: str, **kwargs) -> NullSessionService:
    """Create a ``NullSessionService``, ignoring the URI and extra options."""
    # The service takes no configuration, so both inputs are discarded.
    del uri, kwargs
    return NullSessionService()
# Make the "null" URI scheme resolve to the non-persisting session service.
# This must happen before the app is built, because the app is configured
# with a "null://" session service URI below.
get_service_registry().register_session_service(
    "null",
    _null_session_factory,
)

# FastAPI application serving the agents that live next to this file, with
# A2A enabled and the web UI disabled.
app = get_fast_api_app(
    session_service_uri="null://",
    agents_dir=str(Path(__file__).parent),
    a2a=True,
    web=False,
)
# Build a stdout handler that only lets through records from the
# "google_adk" logger hierarchy, then attach it to the root logger.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s:%(funcName)s:%(lineno)d - %(message)s"
    )
)
handler.addFilter(logging.Filter("google_adk"))

# The root logger is opened up to DEBUG; the handler's filter is what keeps
# other libraries' records out of this handler's output.
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(handler)
# Start the server only when this file is executed as a script. Without the
# guard, merely importing the module (for example `uvicorn <module>:app`)
# would block on a second server bound to port 8001.
if __name__ == "__main__":
    # NOTE(review): host "0.0.0.0" listens on every network interface;
    # confirm that exposure is intended outside local development.
    uvicorn.run(app, host="0.0.0.0", port=8001)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment