Google ADK OpenInference 親スパン問題 - 修正案実装例
from opentelemetry import context as context_api
from opentelemetry .trace import get_current_span , SpanContext
class _RunnerRunAsync (_WithTracer ):
def __call__ (
self ,
wrapped : Callable [..., AsyncGenerator [Event , None ]],
instance : Runner ,
args : tuple [Any , ...],
kwargs : _RunnerRunAsyncKwargs ,
) -> Any :
generator = wrapped (* args , ** kwargs )
if context_api .get_value (_SUPPRESS_INSTRUMENTATION_KEY ):
return generator
tracer = self ._tracer
name = f"invocation [{ instance .app_name } ]"
attributes = dict (get_attributes_from_context ())
attributes [SpanAttributes .OPENINFERENCE_SPAN_KIND ] = OpenInferenceSpanKindValues .CHAIN .value
arguments = bind_args_kwargs (wrapped , * args , ** kwargs )
try :
attributes [SpanAttributes .INPUT_VALUE ] = json .dumps (
arguments ,
default = _default ,
ensure_ascii = False ,
)
attributes [SpanAttributes .INPUT_MIME_TYPE ] = OpenInferenceMimeTypeValues .JSON .value
except Exception :
logger .exception (f"Failed to get attribute: { SpanAttributes .INPUT_VALUE } ." )
if (user_id := kwargs .get ("user_id" )) is not None :
attributes [SpanAttributes .USER_ID ] = user_id
if (session_id := kwargs .get ("session_id" )) is not None :
attributes [SpanAttributes .SESSION_ID ] = session_id
class _AsyncGenerator (wrapt .ObjectProxy ): # type: ignore[misc]
__wrapped__ : AsyncGenerator [Event , None ]
async def __aiter__ (self ) -> Any :
with ExitStack () as stack :
# === 修正ポイント: 親スパンのコンテキストを保持 ===
# 現在のコンテキストを取得(親スパンの情報を含む)
current_context = context_api .get_current ()
parent_span = get_current_span ()
# 親スパンが有効な場合、そのコンテキストを使用
span_context = None
if parent_span and parent_span .get_span_context ().is_valid :
logger .debug (
f"Creating invocation span as child of: "
f"{ parent_span .get_span_context ().trace_id } "
)
# 明示的に現在のコンテキストを渡す
span = stack .enter_context (
tracer .start_as_current_span (
name = name ,
attributes = attributes ,
context = current_context , # 親コンテキストを明示的に渡す
)
)
else :
logger .debug (f"Creating invocation span as root span" )
span = stack .enter_context (
tracer .start_as_current_span (
name = name ,
attributes = attributes ,
)
)
# === 修正ポイント終了 ===
if user_id is not None :
stack .enter_context (using_user (user_id ))
if session_id is not None :
stack .enter_context (using_session (session_id ))
async for event in self .__wrapped__ :
if event .is_final_response ():
try :
span .set_attribute (
SpanAttributes .OUTPUT_VALUE ,
event .model_dump_json (exclude_none = True ),
)
span .set_attribute (
SpanAttributes .OUTPUT_MIME_TYPE ,
OpenInferenceMimeTypeValues .JSON .value ,
)
except Exception :
logger .exception (
f"Failed to get attribute: { SpanAttributes .OUTPUT_VALUE } ."
)
yield event
span .set_status (StatusCode .OK )
return _AsyncGenerator (generator )
修正案2: start_spanを使用した明示的な親子関係
from opentelemetry import trace as trace_api
from opentelemetry .trace import Link
class _RunnerRunAsync (_WithTracer ):
def __call__ (
self ,
wrapped : Callable [..., AsyncGenerator [Event , None ]],
instance : Runner ,
args : tuple [Any , ...],
kwargs : _RunnerRunAsyncKwargs ,
) -> Any :
generator = wrapped (* args , ** kwargs )
if context_api .get_value (_SUPPRESS_INSTRUMENTATION_KEY ):
return generator
tracer = self ._tracer
name = f"invocation [{ instance .app_name } ]"
attributes = dict (get_attributes_from_context ())
attributes [SpanAttributes .OPENINFERENCE_SPAN_KIND ] = OpenInferenceSpanKindValues .CHAIN .value
# ... (input attributes設定は同じ)
class _AsyncGenerator (wrapt .ObjectProxy ):
__wrapped__ : AsyncGenerator [Event , None ]
async def __aiter__ (self ) -> Any :
with ExitStack () as stack :
# === 修正ポイント: start_spanを使用 ===
parent_span = get_current_span ()
# 親スパンのコンテキストを取得
if parent_span and parent_span .get_span_context ().is_valid :
# 親スパンが存在する場合、明示的に親子関係を設定
span = tracer .start_span (
name = name ,
context = trace_api .set_span_in_context (parent_span ),
attributes = attributes ,
)
else :
# 親スパンが存在しない場合
span = tracer .start_span (
name = name ,
attributes = attributes ,
)
# スパンをコンテキストにアタッチ
token = context_api .attach (trace_api .set_span_in_context (span ))
stack .callback (context_api .detach , token )
stack .callback (span .end )
# === 修正ポイント終了 ===
if user_id is not None :
stack .enter_context (using_user (user_id ))
if session_id is not None :
stack .enter_context (using_session (session_id ))
async for event in self .__wrapped__ :
if event .is_final_response ():
try :
span .set_attribute (
SpanAttributes .OUTPUT_VALUE ,
event .model_dump_json (exclude_none = True ),
)
span .set_attribute (
SpanAttributes .OUTPUT_MIME_TYPE ,
OpenInferenceMimeTypeValues .JSON .value ,
)
except Exception :
logger .exception (
f"Failed to get attribute: { SpanAttributes .OUTPUT_VALUE } ."
)
yield event
span .set_status (StatusCode .OK )
return _AsyncGenerator (generator )
修正案3: _PassthroughTracerの改善
class _PassthroughTracer (wrapt .ObjectProxy ): # type: ignore[misc]
"""A tracer proxy that passes through span operations without creating new spans.
This is used to disable existing tracers during instrumentation to prevent
double-instrumentation of the same operations.
"""
@_agnosticcontextmanager
def start_as_current_span (self , * args : Any , ** kwargs : Any ) -> Iterator [Span ]:
"""Return the current span without creating a new one.
This method preserves the context propagation by properly managing
the OpenTelemetry context stack.
"""
# === 修正ポイント: コンテキストの伝播を保持 ===
current_span = get_current_span ()
# 現在のコンテキストをトークンとして保存
token = context_api .attach (
trace_api .set_span_in_context (current_span )
)
try :
# ログを出力(デバッグ用)
if current_span .is_recording ():
logger .debug (
f"_PassthroughTracer: Passing through span "
f"{ current_span .get_span_context ().span_id } "
)
yield current_span
finally :
# コンテキストを元に戻す
context_api .detach (token )
# === 修正ポイント終了 ===
from contextlib import asynccontextmanager
from typing import Optional
class _RunnerRunAsync (_WithTracer ):
def __call__ (
self ,
wrapped : Callable [..., AsyncGenerator [Event , None ]],
instance : Runner ,
args : tuple [Any , ...],
kwargs : _RunnerRunAsyncKwargs ,
) -> Any :
generator = wrapped (* args , ** kwargs )
if context_api .get_value (_SUPPRESS_INSTRUMENTATION_KEY ):
return generator
tracer = self ._tracer
name = f"invocation [{ instance .app_name } ]"
attributes = dict (get_attributes_from_context ())
attributes [SpanAttributes .OPENINFERENCE_SPAN_KIND ] = OpenInferenceSpanKindValues .CHAIN .value
# ... (input attributes設定は同じ)
# === 修正ポイント: 専用のコンテキストマネージャーを作成 ===
@asynccontextmanager
async def managed_invocation_span ():
"""Invocationスパンを適切なコンテキスト管理で作成"""
# 現在のコンテキストとスパンを取得
current_context = context_api .get_current ()
parent_span = get_current_span ()
# コンテキストの有効性を確認
parent_is_valid = (
parent_span
and parent_span .get_span_context ().is_valid
and parent_span .is_recording ()
)
if parent_is_valid :
logger .info (
f"Creating invocation [{ instance .app_name } ] as child of "
f"trace_id={ parent_span .get_span_context ().trace_id } "
)
else :
logger .info (
f"Creating invocation [{ instance .app_name } ] as root span"
)
# スパンを作成
with tracer .start_as_current_span (
name = name ,
attributes = attributes ,
context = current_context ,
) as span :
try :
yield span
except Exception as e :
span .set_status (StatusCode .ERROR , str (e ))
span .record_exception (e )
raise
else :
span .set_status (StatusCode .OK )
# === 修正ポイント終了 ===
class _AsyncGenerator (wrapt .ObjectProxy ):
__wrapped__ : AsyncGenerator [Event , None ]
async def __aiter__ (self ) -> Any :
async with managed_invocation_span () as span :
with ExitStack () as stack :
if user_id is not None :
stack .enter_context (using_user (user_id ))
if session_id is not None :
stack .enter_context (using_session (session_id ))
async for event in self .__wrapped__ :
if event .is_final_response ():
try :
span .set_attribute (
SpanAttributes .OUTPUT_VALUE ,
event .model_dump_json (exclude_none = True ),
)
span .set_attribute (
SpanAttributes .OUTPUT_MIME_TYPE ,
OpenInferenceMimeTypeValues .JSON .value ,
)
except Exception :
logger .exception (
f"Failed to get attribute: { SpanAttributes .OUTPUT_VALUE } ."
)
yield event
return _AsyncGenerator (generator )
修正後の動作を確認するためのテストコード:
import asyncio
from opentelemetry import trace as trace_api
from opentelemetry .sdk .trace import TracerProvider
from opentelemetry .sdk .trace .export import ConsoleSpanExporter , SimpleSpanProcessor
from openinference .instrumentation .google_adk import GoogleADKInstrumentor
from google .adk .agents import Agent
from google .adk .runners import InMemoryRunner
from google .genai import types
async def test_parent_span_propagation ():
"""親スパンの伝播をテスト"""
# トレーサーの設定
tracer_provider = TracerProvider ()
tracer_provider .add_span_processor (
SimpleSpanProcessor (ConsoleSpanExporter ())
)
# インスツルメンテーションの設定
GoogleADKInstrumentor ().instrument (tracer_provider = tracer_provider )
# テスト用エージェントの作成
agent = Agent (
name = "test_agent" ,
model = "gemini-2.0-flash-exp" ,
description = "Test agent" ,
instruction = "Answer questions." ,
)
# Runnerの作成
app_name = "test_app"
runner = InMemoryRunner (agent = agent , app_name = app_name )
session_service = runner .session_service
user_id = "test_user"
session_id = "test_session"
await session_service .create_session (
app_name = app_name ,
user_id = user_id ,
session_id = session_id
)
# === テスト: 親スパンを作成してからADKを実行 ===
tracer = trace_api .get_tracer (__name__ )
with tracer .start_as_current_span ("custom_session_span" ) as session_span :
print (f"Session span trace_id: { session_span .get_span_context ().trace_id } " )
print (f"Session span span_id: { session_span .get_span_context ().span_id } " )
# ADKのrunを実行
async for event in runner .run_async (
user_id = user_id ,
session_id = session_id ,
new_message = types .Content (
role = "user" ,
parts = [types .Part (text = "Hello" )]
)
):
if event .is_final_response ():
print (f"Response: { event .content .parts [0 ].text } " )
print ("\n === 期待される結果 ===" )
print ("invocationスパンがcustom_session_spanの子として表示されるべき" )
print ("trace_idが同じであることを確認" )
if __name__ == "__main__" :
asyncio .run (test_parent_span_propagation ())
修正の適用
# パッケージをソースから編集可能モードでインストール
pip install -e .
テストの実行
python test_parent_span.py
期待される出力
Session span trace_id: 12345678901234567890123456789012
Session span span_id: 1234567890123456
Creating invocation [test_app] as child of trace_id=12345678901234567890123456789012
...
トレースの可視化
後方互換性
既存のコードが親スパンを提供しない場合でも正常に動作する必要がある
修正案1と4は後方互換性を保持
パフォーマンス
コンテキストの取得と検証によるオーバーヘッドは最小限
ログ出力はデバッグレベルに設定
エラーハンドリング
親スパンの検証時のエラーを適切に処理
フォールバック動作を実装
修正案4 (より堅牢なコンテキスト管理) を推奨します。理由:
✅ 明示的なコンテキスト管理
✅ 後方互換性の保持
✅ エラーハンドリングの強化
✅ ログによるデバッグのサポート
✅ 非同期処理に最適化
加えて、修正案3 (_PassthroughTracerの改善) も併せて適用することで、
より堅牢なトレーシング実装を実現できます。