Skip to content

Instantly share code, notes, and snippets.

@myui
Created November 17, 2025 02:08
Show Gist options
  • Select an option

  • Save myui/775ec372f614a79c350f48a83c730666 to your computer and use it in GitHub Desktop.

Select an option

Save myui/775ec372f614a79c350f48a83c730666 to your computer and use it in GitHub Desktop.

Google ADK OpenInference 親スパン問題 - 調査結果まとめ

問題の概要

openinference-instrumentation-google-adk (v0.1.6) において、invocationスパンが常にトレースのルートになり、セッションやその他の親スパンの子として正しく配置されない問題が確認されました。

主な原因

1. _RunnerRunAsyncでのコンテキスト伝播の欠如

問題箇所: _wrappers.py 115-119行目

span = stack.enter_context(
    tracer.start_as_current_span(
        name=name,
        attributes=attributes,
    )
)

tracer.start_as_current_span()を呼び出す際、現在のOpenTelemetryコンテキストが暗黙的に使用されますが、非同期ジェネレーター内での実行により、親スパンのコンテキストが適切に伝播されない場合があります。

2. _PassthroughTracerのコンテキスト管理

問題箇所: __init__.py 184-187行目

@_agnosticcontextmanager
def start_as_current_span(self, *args: Any, **kwargs: Any) -> Iterator[Span]:
    """Return the current span without creating a new one."""
    yield get_current_span()

既存のADKトレーサーを無効化するための_PassthroughTracerが、コンテキストスタックを適切に管理していないため、スパンの階層構造が崩れる可能性があります。

期待される動作 vs 現在の動作

期待される階層

CustomSessionSpan (user-created)
└── invocation [app_name]
    └── agent_run [agent_name]
        ├── LLM call
        └── Tool call

現在の階層 (問題)

invocation [app_name] (ルートスパンとして独立)
└── agent_run [agent_name]
    ├── LLM call
    └── Tool call

CustomSessionSpan (別のトレースとして存在)

解決策

提案1: コンテキスト明示的伝播 (推奨)

_RunnerRunAsyncで親スパンのコンテキストを明示的に取得し、それを使用してスパンを作成します。

変更内容:

  • 現在のコンテキストをcontext_api.get_current()で取得
  • 親スパンの有効性を確認
  • start_as_current_span()contextパラメータを明示的に渡す

提案2: _PassthroughTracerの改善

コンテキストの適切な管理により、スパン階層の破壊を防ぎます。

変更内容:

  • コンテキストトークンを使用した適切な管理
  • context_api.attach()detach()の使用

ファイル一覧

本調査では以下のファイルを作成しました:

1. google_adk_instrumentation_analysis.md

  • 内容: 詳細な問題分析と原因の特定
  • 対象読者: 開発者、メンテナー
  • 含まれる情報:
    • ソースコードの詳細分析
    • 問題の根本原因
    • スパン階層の期待と現実
    • 4つの解決策の提案
    • 検証方法

2. fix_proposals.md

  • 内容: 具体的な修正案の実装例
  • 対象読者: 実装者
  • 含まれる情報:
    • 4つの修正案の完全なコード例
    • テストコード
    • 検証手順
    • 推奨される修正案の選択理由

3. parent_span_fix.patch

  • 内容: 適用可能なパッチファイル
  • 対象読者: すぐに修正を試したい開発者
  • 使用方法:
    cd /path/to/openinference-instrumentation-google-adk
    patch -p1 < parent_span_fix.patch

4. README.md (このファイル)

  • 内容: 調査結果の概要とガイド
  • 対象読者: すべてのユーザー

クイックスタート

問題を確認したい場合

  1. 以下のコードで問題を再現:
from opentelemetry import trace
from openinference.instrumentation.google_adk import GoogleADKInstrumentor
from google.adk.agents import Agent
from google.adk.runners import InMemoryRunner

# インスツルメンテーション
GoogleADKInstrumentor().instrument()

# カスタム親スパンを作成
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("my_session"):
    # ADKを実行
    # invocationがmy_sessionの子になるか確認
    ...
  1. トレースビューアで確認
    • invocationが独立したルートスパンになっている (問題)
    • invocationがmy_sessionの子スパンになっている (期待)

修正を適用したい場合

Option A: パッチファイルを使用

# パッケージのソースディレクトリに移動
cd $(python -c "import openinference.instrumentation.google_adk; import os; print(os.path.dirname(openinference.instrumentation.google_adk.__file__))")
cd ../../..

# パッチを適用
patch -p1 < /path/to/parent_span_fix.patch

# 再インストール (editable mode)
pip install -e .

Option B: 手動で修正

fix_proposals.mdの「修正案4」のコードを参照し、該当ファイルを手動で修正してください。

修正を検証したい場合

# fix_proposals.mdのテストコードを実行
python test_parent_span.py

# 期待される出力:
# - "Creating invocation [test_app] as child of trace_id=..."
# - トレースビューアで正しい階層が表示される

技術的詳細

OpenTelemetryコンテキスト伝播の仕組み

OpenTelemetryでは、スパンの親子関係はコンテキスト伝播メカニズムによって管理されます:

  1. Context: 現在のスパン情報を含むコンテキストオブジェクト
  2. Token: コンテキストをスタックに追加/削除する際のハンドル
  3. Propagation: スレッド/タスク間でのコンテキスト伝播

非同期処理での注意点

非同期ジェネレーター内でスパンを作成する場合:

async def generator():
    # コンテキストは自動的に伝播されない場合がある
    with tracer.start_as_current_span("span"):
        yield item

解決策:

async def generator():
    # コンテキストを明示的に保持
    current_context = context_api.get_current()
    with tracer.start_as_current_span("span", context=current_context):
        yield item

関連リソース

OpenInference公式ドキュメント

OpenTelemetry Python

Google ADK

貢献

この問題の修正に貢献したい場合:

  1. OpenInferenceリポジトリにIssueを作成

    • タイトル: "Google ADK: Invocation spans not respecting parent context"
    • 本調査結果を添付
  2. Pull Requestを提出

    • fix_proposals.mdの「修正案4」を実装
    • テストケースを追加
    • ドキュメントを更新

ライセンス

本調査資料は情報提供のみを目的としています。実装の修正を行う際は、OpenInferenceプロジェクトのライセンス(Apache 2.0)に従ってください。

サポート

質問や追加の調査が必要な場合:

  1. OpenInferenceのGitHub Discussions
  2. Google ADKのGitHub Issues
  3. OpenTelemetryのSlackコミュニティ

調査実施日: 2025年11月17日
調査対象バージョン: openinference-instrumentation-google-adk 0.1.6
調査者: Claude (Anthropic)

Google ADK OpenInference 親スパン問題 - 修正案実装例

修正案1: コンテキスト伝播の明示的な実装

修正ファイル: _wrappers.py

from opentelemetry import context as context_api
from opentelemetry.trace import get_current_span, SpanContext

class _RunnerRunAsync(_WithTracer):
    def __call__(
        self,
        wrapped: Callable[..., AsyncGenerator[Event, None]],
        instance: Runner,
        args: tuple[Any, ...],
        kwargs: _RunnerRunAsyncKwargs,
    ) -> Any:
        generator = wrapped(*args, **kwargs)
        if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
            return generator

        tracer = self._tracer
        name = f"invocation [{instance.app_name}]"
        attributes = dict(get_attributes_from_context())
        attributes[SpanAttributes.OPENINFERENCE_SPAN_KIND] = OpenInferenceSpanKindValues.CHAIN.value

        arguments = bind_args_kwargs(wrapped, *args, **kwargs)
        try:
            attributes[SpanAttributes.INPUT_VALUE] = json.dumps(
                arguments,
                default=_default,
                ensure_ascii=False,
            )
            attributes[SpanAttributes.INPUT_MIME_TYPE] = OpenInferenceMimeTypeValues.JSON.value
        except Exception:
            logger.exception(f"Failed to get attribute: {SpanAttributes.INPUT_VALUE}.")

        if (user_id := kwargs.get("user_id")) is not None:
            attributes[SpanAttributes.USER_ID] = user_id
        if (session_id := kwargs.get("session_id")) is not None:
            attributes[SpanAttributes.SESSION_ID] = session_id

        class _AsyncGenerator(wrapt.ObjectProxy):  # type: ignore[misc]
            __wrapped__: AsyncGenerator[Event, None]

            async def __aiter__(self) -> Any:
                with ExitStack() as stack:
                    # === 修正ポイント: 親スパンのコンテキストを保持 ===
                    # 現在のコンテキストを取得(親スパンの情報を含む)
                    current_context = context_api.get_current()
                    parent_span = get_current_span()
                    
                    # 親スパンが有効な場合、そのコンテキストを使用
                    span_context = None
                    if parent_span and parent_span.get_span_context().is_valid:
                        logger.debug(
                            f"Creating invocation span as child of: "
                            f"{parent_span.get_span_context().trace_id}"
                        )
                        # 明示的に現在のコンテキストを渡す
                        span = stack.enter_context(
                            tracer.start_as_current_span(
                                name=name,
                                attributes=attributes,
                                context=current_context,  # 親コンテキストを明示的に渡す
                            )
                        )
                    else:
                        logger.debug(f"Creating invocation span as root span")
                        span = stack.enter_context(
                            tracer.start_as_current_span(
                                name=name,
                                attributes=attributes,
                            )
                        )
                    # === 修正ポイント終了 ===
                    
                    if user_id is not None:
                        stack.enter_context(using_user(user_id))
                    if session_id is not None:
                        stack.enter_context(using_session(session_id))
                    async for event in self.__wrapped__:
                        if event.is_final_response():
                            try:
                                span.set_attribute(
                                    SpanAttributes.OUTPUT_VALUE,
                                    event.model_dump_json(exclude_none=True),
                                )
                                span.set_attribute(
                                    SpanAttributes.OUTPUT_MIME_TYPE,
                                    OpenInferenceMimeTypeValues.JSON.value,
                                )
                            except Exception:
                                logger.exception(
                                    f"Failed to get attribute: {SpanAttributes.OUTPUT_VALUE}."
                                )
                        yield event
                    span.set_status(StatusCode.OK)

        return _AsyncGenerator(generator)

修正案2: start_spanを使用した明示的な親子関係

from opentelemetry import trace as trace_api
from opentelemetry.trace import Link

class _RunnerRunAsync(_WithTracer):
    def __call__(
        self,
        wrapped: Callable[..., AsyncGenerator[Event, None]],
        instance: Runner,
        args: tuple[Any, ...],
        kwargs: _RunnerRunAsyncKwargs,
    ) -> Any:
        generator = wrapped(*args, **kwargs)
        if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
            return generator

        tracer = self._tracer
        name = f"invocation [{instance.app_name}]"
        attributes = dict(get_attributes_from_context())
        attributes[SpanAttributes.OPENINFERENCE_SPAN_KIND] = OpenInferenceSpanKindValues.CHAIN.value

        # ... (input attributes設定は同じ)

        class _AsyncGenerator(wrapt.ObjectProxy):
            __wrapped__: AsyncGenerator[Event, None]

            async def __aiter__(self) -> Any:
                with ExitStack() as stack:
                    # === 修正ポイント: start_spanを使用 ===
                    parent_span = get_current_span()
                    
                    # 親スパンのコンテキストを取得
                    if parent_span and parent_span.get_span_context().is_valid:
                        # 親スパンが存在する場合、明示的に親子関係を設定
                        span = tracer.start_span(
                            name=name,
                            context=trace_api.set_span_in_context(parent_span),
                            attributes=attributes,
                        )
                    else:
                        # 親スパンが存在しない場合
                        span = tracer.start_span(
                            name=name,
                            attributes=attributes,
                        )
                    
                    # スパンをコンテキストにアタッチ
                    token = context_api.attach(trace_api.set_span_in_context(span))
                    stack.callback(context_api.detach, token)
                    stack.callback(span.end)
                    # === 修正ポイント終了 ===

                    if user_id is not None:
                        stack.enter_context(using_user(user_id))
                    if session_id is not None:
                        stack.enter_context(using_session(session_id))
                        
                    async for event in self.__wrapped__:
                        if event.is_final_response():
                            try:
                                span.set_attribute(
                                    SpanAttributes.OUTPUT_VALUE,
                                    event.model_dump_json(exclude_none=True),
                                )
                                span.set_attribute(
                                    SpanAttributes.OUTPUT_MIME_TYPE,
                                    OpenInferenceMimeTypeValues.JSON.value,
                                )
                            except Exception:
                                logger.exception(
                                    f"Failed to get attribute: {SpanAttributes.OUTPUT_VALUE}."
                                )
                        yield event
                    span.set_status(StatusCode.OK)

        return _AsyncGenerator(generator)

修正案3: _PassthroughTracerの改善

修正ファイル: __init__.py

class _PassthroughTracer(wrapt.ObjectProxy):  # type: ignore[misc]
    """A tracer proxy that passes through span operations without creating new spans.
    
    This is used to disable existing tracers during instrumentation to prevent
    double-instrumentation of the same operations.
    """

    @_agnosticcontextmanager
    def start_as_current_span(self, *args: Any, **kwargs: Any) -> Iterator[Span]:
        """Return the current span without creating a new one.
        
        This method preserves the context propagation by properly managing
        the OpenTelemetry context stack.
        """
        # === 修正ポイント: コンテキストの伝播を保持 ===
        current_span = get_current_span()
        
        # 現在のコンテキストをトークンとして保存
        token = context_api.attach(
            trace_api.set_span_in_context(current_span)
        )
        
        try:
            # ログを出力(デバッグ用)
            if current_span.is_recording():
                logger.debug(
                    f"_PassthroughTracer: Passing through span "
                    f"{current_span.get_span_context().span_id}"
                )
            yield current_span
        finally:
            # コンテキストを元に戻す
            context_api.detach(token)
        # === 修正ポイント終了 ===

修正案4: より堅牢なコンテキスト管理

from contextlib import asynccontextmanager
from typing import Optional

class _RunnerRunAsync(_WithTracer):
    def __call__(
        self,
        wrapped: Callable[..., AsyncGenerator[Event, None]],
        instance: Runner,
        args: tuple[Any, ...],
        kwargs: _RunnerRunAsyncKwargs,
    ) -> Any:
        generator = wrapped(*args, **kwargs)
        if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
            return generator

        tracer = self._tracer
        name = f"invocation [{instance.app_name}]"
        attributes = dict(get_attributes_from_context())
        attributes[SpanAttributes.OPENINFERENCE_SPAN_KIND] = OpenInferenceSpanKindValues.CHAIN.value

        # ... (input attributes設定は同じ)

        # === 修正ポイント: 専用のコンテキストマネージャーを作成 ===
        @asynccontextmanager
        async def managed_invocation_span():
            """Invocationスパンを適切なコンテキスト管理で作成"""
            # 現在のコンテキストとスパンを取得
            current_context = context_api.get_current()
            parent_span = get_current_span()
            
            # コンテキストの有効性を確認
            parent_is_valid = (
                parent_span 
                and parent_span.get_span_context().is_valid
                and parent_span.is_recording()
            )
            
            if parent_is_valid:
                logger.info(
                    f"Creating invocation [{instance.app_name}] as child of "
                    f"trace_id={parent_span.get_span_context().trace_id}"
                )
            else:
                logger.info(
                    f"Creating invocation [{instance.app_name}] as root span"
                )
            
            # スパンを作成
            with tracer.start_as_current_span(
                name=name,
                attributes=attributes,
                context=current_context,
            ) as span:
                try:
                    yield span
                except Exception as e:
                    span.set_status(StatusCode.ERROR, str(e))
                    span.record_exception(e)
                    raise
                else:
                    span.set_status(StatusCode.OK)
        # === 修正ポイント終了 ===

        class _AsyncGenerator(wrapt.ObjectProxy):
            __wrapped__: AsyncGenerator[Event, None]

            async def __aiter__(self) -> Any:
                async with managed_invocation_span() as span:
                    with ExitStack() as stack:
                        if user_id is not None:
                            stack.enter_context(using_user(user_id))
                        if session_id is not None:
                            stack.enter_context(using_session(session_id))
                            
                        async for event in self.__wrapped__:
                            if event.is_final_response():
                                try:
                                    span.set_attribute(
                                        SpanAttributes.OUTPUT_VALUE,
                                        event.model_dump_json(exclude_none=True),
                                    )
                                    span.set_attribute(
                                        SpanAttributes.OUTPUT_MIME_TYPE,
                                        OpenInferenceMimeTypeValues.JSON.value,
                                    )
                                except Exception:
                                    logger.exception(
                                        f"Failed to get attribute: {SpanAttributes.OUTPUT_VALUE}."
                                    )
                            yield event

        return _AsyncGenerator(generator)

テストコード

修正後の動作を確認するためのテストコード:

import asyncio
from opentelemetry import trace as trace_api
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from openinference.instrumentation.google_adk import GoogleADKInstrumentor
from google.adk.agents import Agent
from google.adk.runners import InMemoryRunner
from google.genai import types

async def test_parent_span_propagation():
    """親スパンの伝播をテスト"""
    
    # トレーサーの設定
    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(
        SimpleSpanProcessor(ConsoleSpanExporter())
    )
    
    # インスツルメンテーションの設定
    GoogleADKInstrumentor().instrument(tracer_provider=tracer_provider)
    
    # テスト用エージェントの作成
    agent = Agent(
        name="test_agent",
        model="gemini-2.0-flash-exp",
        description="Test agent",
        instruction="Answer questions.",
    )
    
    # Runnerの作成
    app_name = "test_app"
    runner = InMemoryRunner(agent=agent, app_name=app_name)
    session_service = runner.session_service
    
    user_id = "test_user"
    session_id = "test_session"
    
    await session_service.create_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id
    )
    
    # === テスト: 親スパンを作成してからADKを実行 ===
    tracer = trace_api.get_tracer(__name__)
    
    with tracer.start_as_current_span("custom_session_span") as session_span:
        print(f"Session span trace_id: {session_span.get_span_context().trace_id}")
        print(f"Session span span_id: {session_span.get_span_context().span_id}")
        
        # ADKのrunを実行
        async for event in runner.run_async(
            user_id=user_id,
            session_id=session_id,
            new_message=types.Content(
                role="user",
                parts=[types.Part(text="Hello")]
            )
        ):
            if event.is_final_response():
                print(f"Response: {event.content.parts[0].text}")
    
    print("\n=== 期待される結果 ===")
    print("invocationスパンがcustom_session_spanの子として表示されるべき")
    print("trace_idが同じであることを確認")

if __name__ == "__main__":
    asyncio.run(test_parent_span_propagation())

検証手順

  1. 修正の適用

    # パッケージをソースから編集可能モードでインストール
    pip install -e .
  2. テストの実行

    python test_parent_span.py
  3. 期待される出力

    Session span trace_id: 12345678901234567890123456789012
    Session span span_id: 1234567890123456
    Creating invocation [test_app] as child of trace_id=12345678901234567890123456789012
    ...
    
  4. トレースの可視化

    • Phoenix/Jaeger/Zipkinでトレースを確認
    • 階層構造が以下のようになっていることを確認:
      custom_session_span
      └── invocation [test_app]
          └── agent_run [test_agent]
              └── LLM call
      

注意事項

  1. 後方互換性

    • 既存のコードが親スパンを提供しない場合でも正常に動作する必要がある
    • 修正案1と4は後方互換性を保持
  2. パフォーマンス

    • コンテキストの取得と検証によるオーバーヘッドは最小限
    • ログ出力はデバッグレベルに設定
  3. エラーハンドリング

    • 親スパンの検証時のエラーを適切に処理
    • フォールバック動作を実装

推奨される修正案

修正案4 (より堅牢なコンテキスト管理) を推奨します。理由:

  1. ✅ 明示的なコンテキスト管理
  2. ✅ 後方互換性の保持
  3. ✅ エラーハンドリングの強化
  4. ✅ ログによるデバッグのサポート
  5. ✅ 非同期処理に最適化

加えて、修正案3 (_PassthroughTracerの改善) も併せて適用することで、 より堅牢なトレーシング実装を実現できます。

Google ADK OpenInferenceインスツルメンテーション 親スパン問題の調査レポート

問題の概要

openinference-instrumentation-google-adkにおいて、invocationが常にセッションのトップになり、親子関係が正しく設定されないという問題が報告されています。

調査結果

ソースコード分析

1. 主要なラッパークラス

ファイル: openinference/instrumentation/google_adk/_wrappers.py

以下の3つの主要なラッパークラスが存在します:

  1. _RunnerRunAsync (77-143行目)

    • Runner.run_asyncメソッドをラップ
    • invocationレベルのスパンを作成
    • スパン名: "invocation [{app_name}]"
    • スパン種類: CHAIN
  2. _BaseAgentRunAsync (146-189行目)

    • BaseAgent.run_asyncメソッドをラップ
    • agent_runレベルのスパンを作成
    • スパン名: "agent_run [{agent_name}]"
    • スパン種類: AGENT
  3. _TraceCallLlm (192-283行目)

    • LLM呼び出しをトレース
    • 新しいスパンを作成せず、現在のスパンに属性を追加
    • span = get_current_span()を使用
  4. _TraceToolCall (285-340行目)

    • ツール呼び出しをトレース
    • 新しいスパンを作成せず、現在のスパンに属性を追加
    • span = get_current_span()を使用

問題の原因

_RunnerRunAsyncの実装 (115-119行目)

span = stack.enter_context(
    tracer.start_as_current_span(
        name=name,
        attributes=attributes,
    )
)

この実装では、tracer.start_as_current_span()を呼び出していますが、既存の親スパンのコンテキストを明示的に考慮していません

OpenTelemetryのstart_as_current_span()は、デフォルトで現在のコンテキストから親スパンを自動的に取得しますが、以下の要因により親子関係が正しく設定されない可能性があります:

  1. コンテキストの伝播が途切れている

    • 非同期処理のコンテキスト伝播が適切に行われていない
    • _PassthroughTracerが介入してコンテキストを変更している
  2. _PassthroughTracerの影響 (init.py 177-187行目)

    class _PassthroughTracer(wrapt.ObjectProxy):
        @_agnosticcontextmanager
        def start_as_current_span(self, *args: Any, **kwargs: Any) -> Iterator[Span]:
            """Return the current span without creating a new one."""
            yield get_current_span()

    この実装は既存のADKトレーサーを無効化するために使われていますが、新しいスパンを作成せずに現在のスパンをそのまま返すため、スパンの階層構造が崩れる可能性があります。

スパン階層の期待と現実

期待される階層:

Session (root)
└── Invocation (session内)
    └── Agent Run
        ├── LLM Call
        └── Tool Call

現在の実装での階層 (推測):

Invocation (root - 親スパンを持たない)
└── Agent Run
    ├── LLM Call
    └── Tool Call

Session (別のトレースとして存在する可能性)

解決策の提案

Option 1: コンテキスト伝播の明示的な実装

_RunnerRunAsyncで親スパンのコンテキストを明示的に取得し、それを使用してスパンを作成する:

from opentelemetry import context as context_api

class _RunnerRunAsync(_WithTracer):
    def __call__(self, ...):
        # ...
        
        # 現在のコンテキストを取得
        current_context = context_api.get_current()
        
        # 親スパンが存在するかチェック
        parent_span = get_current_span()
        
        class _AsyncGenerator(wrapt.ObjectProxy):
            async def __aiter__(self) -> Any:
                with ExitStack() as stack:
                    # 親スパンのコンテキストを明示的に使用
                    if parent_span and parent_span.is_recording():
                        # 親スパンが存在する場合、そのコンテキストで新しいスパンを作成
                        span = stack.enter_context(
                            tracer.start_as_current_span(
                                name=name,
                                attributes=attributes,
                                context=current_context,  # 明示的にコンテキストを渡す
                            )
                        )
                    else:
                        # 親スパンが存在しない場合は通常通り
                        span = stack.enter_context(
                            tracer.start_as_current_span(
                                name=name,
                                attributes=attributes,
                            )
                        )
                    # ... 残りの実装

Option 2: start_spanを使用した明示的な親子関係の設定

start_as_current_spanの代わりにstart_spanを使用し、親スパンを明示的に指定する:

from opentelemetry.trace import SpanKind, Link

class _RunnerRunAsync(_WithTracer):
    def __call__(self, ...):
        # ...
        
        class _AsyncGenerator(wrapt.ObjectProxy):
            async def __aiter__(self) -> Any:
                with ExitStack() as stack:
                    # 親スパンを明示的に取得
                    parent_span_context = get_current_span().get_span_context()
                    
                    # 親スパンを指定してスパンを作成
                    span = stack.enter_context(
                        tracer.start_span(
                            name=name,
                            context=trace_api.set_span_in_context(parent_span_context),
                            attributes=attributes,
                        )
                    )
                    # スパンをアクティブにする
                    token = context_api.attach(trace_api.set_span_in_context(span))
                    stack.callback(context_api.detach, token)
                    # ... 残りの実装

Option 3: コンテキストマネージャーの改善

非同期ジェネレーター内でのコンテキスト管理を改善する:

from contextlib import asynccontextmanager

class _RunnerRunAsync(_WithTracer):
    def __call__(self, ...):
        # ...
        
        @asynccontextmanager
        async def create_invocation_span():
            # 現在のコンテキストを保存
            token = context_api.attach(context_api.get_current())
            try:
                with tracer.start_as_current_span(
                    name=name,
                    attributes=attributes,
                ) as span:
                    yield span
            finally:
                context_api.detach(token)
        
        class _AsyncGenerator(wrapt.ObjectProxy):
            async def __aiter__(self) -> Any:
                async with create_invocation_span() as span:
                    # ... 残りの実装

Option 4: _PassthroughTracerの修正

_PassthroughTracerが親スパンのコンテキストを正しく伝播するように修正:

class _PassthroughTracer(wrapt.ObjectProxy):
    """A tracer proxy that passes through span operations without creating new spans."""
    
    @_agnosticcontextmanager
    def start_as_current_span(self, *args: Any, **kwargs: Any) -> Iterator[Span]:
        """Return the current span without creating a new one, preserving context."""
        current_span = get_current_span()
        
        # 現在のコンテキストを保持
        token = context_api.attach(
            trace_api.set_span_in_context(current_span)
        )
        try:
            yield current_span
        finally:
            context_api.detach(token)

推奨される対応

  1. 短期的な対応: Option 1を採用し、コンテキスト伝播を明示的に実装する
  2. 中期的な対応: Option 4と併せて、_PassthroughTracerの動作を改善する
  3. 長期的な対応: ADK自体のトレーシング機能との統合を見直し、二重インスツルメンテーションを避ける

検証方法

修正後、以下の点を確認する必要があります:

  1. 親子関係の確認

    from opentelemetry import trace
    
    # トレースを確認
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("session"):
        # ここでADKのrunを実行
        # invocationスパンがsessionスパンの子になっているか確認
  2. スパン階層の可視化

    • Phoenix、Jaeger、またはZipkinを使用してトレースを可視化
    • invocationがsessionの子スパンとして表示されることを確認
  3. マルチエージェントシナリオのテスト

    • 複数のエージェントを使用した場合の親子関係を確認
    • AgentToolを使用した場合のコンテキスト伝播を確認

関連Issue

  • Google ADK Python Issue #1746: Custom Invocation Context propagation when using AgentTool
    • AgentToolにおけるInvocationContextの伝播に関する問題
    • 本問題と関連性あり

参考資料

まとめ

openinference-instrumentation-google-adkの親スパン問題は、主に以下の要因によって発生しています:

  1. _RunnerRunAsyncが親スパンのコンテキストを明示的に考慮していない
  2. _PassthroughTracerがコンテキスト伝播を阻害している可能性がある
  3. 非同期処理におけるコンテキスト管理が不十分

これらの問題を解決するには、OpenTelemetryのコンテキスト伝播メカニズムを正しく実装し、親スパンのコンテキストを明示的に保持・伝播する必要があります。

--- a/openinference/instrumentation/google_adk/_wrappers.py
+++ b/openinference/instrumentation/google_adk/_wrappers.py
@@ -1,6 +1,7 @@
import base64
import inspect
import json
import logging
+from contextlib import asynccontextmanager
from abc import ABC
from contextlib import ExitStack
from typing import (
@@ -108,16 +109,51 @@ class _RunnerRunAsync(_WithTracer):
if (session_id := kwargs.get("session_id")) is not None:
attributes[SpanAttributes.SESSION_ID] = session_id
+ # Context-aware span creation helper
+ @asynccontextmanager
+ async def managed_invocation_span():
+ """Create invocation span with proper parent context propagation."""
+ # Get current context and parent span
+ current_context = context_api.get_current()
+ parent_span = get_current_span()
+
+ # Check if parent span is valid and recording
+ parent_is_valid = (
+ parent_span
+ and parent_span.get_span_context().is_valid
+ and parent_span.is_recording()
+ )
+
+ if parent_is_valid:
+ logger.debug(
+ f"Creating invocation [{instance.app_name}] as child of "
+ f"trace_id={parent_span.get_span_context().trace_id}"
+ )
+ else:
+ logger.debug(
+ f"Creating invocation [{instance.app_name}] as root span"
+ )
+
+ # Create span with explicit context
+ with tracer.start_as_current_span(
+ name=name,
+ attributes=attributes,
+ context=current_context, # Explicitly pass parent context
+ ) as span:
+ try:
+ yield span
+ except Exception as e:
+ span.set_status(StatusCode.ERROR, str(e))
+ span.record_exception(e)
+ raise
+
class _AsyncGenerator(wrapt.ObjectProxy): # type: ignore[misc]
__wrapped__: AsyncGenerator[Event, None]
async def __aiter__(self) -> Any:
- with ExitStack() as stack:
- span = stack.enter_context(
- tracer.start_as_current_span(
- name=name,
- attributes=attributes,
- )
- )
+ async with managed_invocation_span() as span:
+ with ExitStack() as stack:
if user_id is not None:
stack.enter_context(using_user(user_id))
if session_id is not None:
@@ -139,7 +175,6 @@ class _RunnerRunAsync(_WithTracer):
f"Failed to get attribute: {SpanAttributes.OUTPUT_VALUE}."
)
yield event
- span.set_status(StatusCode.OK)
return _AsyncGenerator(generator)
--- a/openinference/instrumentation/google_adk/__init__.py
+++ b/openinference/instrumentation/google_adk/__init__.py
@@ -177,11 +177,30 @@ class GoogleADKInstrumentor(BaseInstrumentor): # type: ignore
class _PassthroughTracer(wrapt.ObjectProxy): # type: ignore[misc]
"""A tracer proxy that passes through span operations without creating new spans.
This is used to disable existing tracers during instrumentation to prevent
double-instrumentation of the same operations.
"""
@_agnosticcontextmanager
def start_as_current_span(self, *args: Any, **kwargs: Any) -> Iterator[Span]:
- """Return the current span without creating a new one."""
- yield get_current_span()
+ """Return the current span without creating a new one.
+
+ This method preserves the context propagation by properly managing
+ the OpenTelemetry context stack.
+ """
+ current_span = get_current_span()
+
+ # Save current context as a token
+ token = context_api.attach(
+ trace_api.set_span_in_context(current_span)
+ )
+
+ try:
+ # Log for debugging
+ if current_span.is_recording():
+ logger.debug(
+ f"_PassthroughTracer: Passing through span "
+ f"{current_span.get_span_context().span_id}"
+ )
+ yield current_span
+ finally:
+ # Restore context
+ context_api.detach(token)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment