feat(otel): add minimal generate_content {model.name} spans and logs for non-gemini inference and when the opentelemetry-instrumentation-google-genai dependency is missing

Co-authored-by: Max Ind <maxind@google.com>
PiperOrigin-RevId: 859667045
This commit is contained in:
Max Ind
2026-01-22 10:17:39 -08:00
committed by Copybara-Service
parent 82fa10b71e
commit 935c279f82
5 changed files with 420 additions and 21 deletions
+1
View File
@@ -126,6 +126,7 @@ test = [
"litellm>=1.75.5, <1.80.17", # For LiteLLM tests
"llama-index-readers-file>=0.4.0", # For retrieval tests
"openai>=1.100.2", # For LiteLLM
"opentelemetry-instrumentation-google-genai>=0.3b0, <1.0.0",
"pytest-asyncio>=0.25.0",
"pytest-mock>=3.14.0",
"pytest-xdist>=3.6.1",
@@ -41,6 +41,7 @@ from ...events.event import Event
from ...models.base_llm_connection import BaseLlmConnection
from ...models.llm_request import LlmRequest
from ...models.llm_response import LlmResponse
from ...telemetry import tracing
from ...telemetry.tracing import trace_call_llm
from ...telemetry.tracing import trace_send_data
from ...telemetry.tracing import tracer
@@ -771,7 +772,7 @@ class BaseLlmFlow(ABC):
llm = self.__get_llm(invocation_context)
async def _call_llm_with_tracing() -> AsyncGenerator[LlmResponse, None]:
with tracer.start_as_current_span('call_llm'):
with tracer.start_as_current_span('call_llm') as span:
if invocation_context.run_config.support_cfc:
invocation_context.live_request_queue = LiveRequestQueue()
responses_generator = self.run_live(invocation_context)
@@ -822,6 +823,7 @@ class BaseLlmFlow(ABC):
model_response_event.id,
llm_request,
llm_response,
span,
)
# Runs after_model_callback if it exists.
if altered_llm_response := await self._handle_after_model_callback(
@@ -1050,8 +1052,12 @@ class BaseLlmFlow(ABC):
try:
async with Aclosing(response_generator) as agen:
async for response in agen:
yield response
with tracing.use_generate_content_span(
llm_request, invocation_context, model_response_event
) as span:
async for llm_response in agen:
tracing.trace_generate_content_result(span, llm_response)
yield llm_response
except Exception as model_error:
callback_context = CallbackContext(
invocation_context, event_actions=model_response_event.actions
+211 -15
View File
@@ -23,35 +23,59 @@
from __future__ import annotations
from collections.abc import Iterator
from collections.abc import Mapping
from contextlib import contextmanager
import json
import logging
import os
from typing import Any
from typing import Optional
from typing import TYPE_CHECKING
from google.genai import types
from google.genai.models import Models
from opentelemetry import _logs
from opentelemetry import trace
from opentelemetry._logs import LogRecord
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_AGENT_DESCRIPTION
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_AGENT_NAME
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_CONVERSATION_ID
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_OPERATION_NAME
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_REQUEST_MODEL
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_RESPONSE_FINISH_REASONS
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_SYSTEM
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_TOOL_CALL_ID
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_TOOL_DESCRIPTION
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_TOOL_NAME
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_TOOL_TYPE
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_USAGE_INPUT_TOKENS
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_USAGE_OUTPUT_TOKENS
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GenAiSystemValues
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import Span
from opentelemetry.util.types import AnyValue
from opentelemetry.util.types import AttributeValue
from pydantic import BaseModel
from .. import version
from ..events.event import Event
from ..utils.model_name_utils import is_gemini_model
# By default some ADK spans include attributes with potential PII data.
# This env, when set to false, allows to disable populating those attributes.
ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS = 'ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS'
# TODO: Replace with constant from opentelemetry.semconv when it reaches version 1.37 in g3.
GEN_AI_AGENT_DESCRIPTION = 'gen_ai.agent.description'
GEN_AI_AGENT_NAME = 'gen_ai.agent.name'
GEN_AI_CONVERSATION_ID = 'gen_ai.conversation.id'
GEN_AI_OPERATION_NAME = 'gen_ai.operation.name'
GEN_AI_TOOL_CALL_ID = 'gen_ai.tool.call.id'
GEN_AI_TOOL_DESCRIPTION = 'gen_ai.tool.description'
GEN_AI_TOOL_NAME = 'gen_ai.tool.name'
GEN_AI_TOOL_TYPE = 'gen_ai.tool.type'
# Standard OTEL env variable to enable logging of prompt/response content.
OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT = (
'OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT'
)
USER_CONTENT_ELIDED = '<elided>'
# Needed to avoid circular imports
if TYPE_CHECKING:
from ..agents.base_agent import BaseAgent
from ..agents.invocation_context import InvocationContext
from ..events.event import Event
from ..models.llm_request import LlmRequest
from ..models.llm_response import LlmResponse
from ..tools.base_tool import BaseTool
@@ -59,10 +83,17 @@ if TYPE_CHECKING:
tracer = trace.get_tracer(
instrumenting_module_name='gcp.vertex.agent',
instrumenting_library_version=version.__version__,
# TODO: Replace with constant from opentelemetry.semconv when it reaches version 1.37 in g3.
schema_url='https://opentelemetry.io/schemas/1.37.0',
schema_url=Schemas.V1_36_0.value,
)
otel_logger = _logs.get_logger(
instrumenting_module_name='gcp.vertex.agent',
instrumenting_library_version=version.__version__,
schema_url=Schemas.V1_36_0.value,
)
logger = logging.getLogger('google_adk.' + __name__)
def _safe_json_serialize(obj) -> str:
"""Convert any Python object to a JSON-serializable type or string.
@@ -119,7 +150,7 @@ def trace_agent_invocation(
def trace_tool_call(
tool: BaseTool,
args: dict[str, Any],
function_response_event: Optional[Event],
function_response_event: Event | None,
):
"""Traces tool call.
@@ -234,6 +265,7 @@ def trace_call_llm(
event_id: str,
llm_request: LlmRequest,
llm_response: LlmResponse,
span: Span | None = None,
):
"""Traces a call to the LLM.
@@ -246,7 +278,7 @@ def trace_call_llm(
llm_request: The LLM request object.
llm_response: The LLM response object.
"""
span = trace.get_current_span()
span = span or trace.get_current_span()
# Special standard Open Telemetry GenaI attributes that indicate
# that this is a span related to a Generative AI system.
span.set_attribute('gen_ai.system', 'gcp.vertex.agent')
@@ -390,3 +422,167 @@ def _should_add_request_response_to_spans() -> bool:
ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS, 'true'
).lower() in ('false', '0')
return not disabled_via_env_var
@contextmanager
def use_generate_content_span(
    llm_request: LlmRequest,
    invocation_context: InvocationContext,
    model_response_event: Event,
) -> Iterator[Span | None]:
  """Context manager encompassing a `generate_content {model.name}` span.

  When an external inference-instrumentation library is installed (e.g.
  opentelemetry-instrumentation-google-genai) and the agent targets a Gemini
  model, span creation is delegated to that library and ``None`` is yielded.
  Otherwise a native span is opened and yielded.
  """
  delegated = (
      _is_gemini_agent(invocation_context.agent)
      and _instrumented_with_opentelemetry_instrumentation_google_genai()
  )
  if delegated:
    # The external instrumentation creates its own span; emit nothing here.
    yield None
    return
  shared_attributes = {
      GEN_AI_CONVERSATION_ID: invocation_context.session.id,
      'gcp.vertex.agent.event_id': model_response_event.id,
  }
  with _use_native_generate_content_span(
      llm_request=llm_request,
      common_attributes=shared_attributes,
  ) as span:
    yield span
def _should_log_prompt_response_content() -> bool:
  """Returns True when the standard OTEL env var enables content capture."""
  flag = os.environ.get(
      OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT, ''
  )
  return flag.lower() in ('true', '1')
def _serialize_content(content: types.ContentUnion) -> AnyValue:
  """Recursively converts *content* into an OTEL-loggable value."""
  if isinstance(content, str):
    return content
  if isinstance(content, BaseModel):
    return content.model_dump()
  if isinstance(content, list):
    return [_serialize_content(item) for item in content]
  # Anything else: best-effort JSON string.
  return _safe_json_serialize(content)
def _serialize_content_with_elision(
    content: types.ContentUnion | None,
) -> AnyValue:
  """Serializes content, or elides it when content capture is disabled."""
  if not _should_log_prompt_response_content():
    return USER_CONTENT_ELIDED
  return None if content is None else _serialize_content(content)
def _instrumented_with_opentelemetry_instrumentation_google_genai() -> bool:
  """Returns True if google-genai SDK inference is already instrumented.

  Walks the ``__wrapped__`` chain left by wrapt-style decorators on
  ``Models.generate_content`` and reports whether any wrapper's code object
  originates from the opentelemetry-instrumentation-google-genai package.
  """
  # Fix: removed a leftover debug `print(...)` that wrote the wrapped
  # function's filename to stdout on every call.
  maybe_wrapped_function = Models.generate_content
  while wrapped := getattr(maybe_wrapped_function, '__wrapped__', None):
    if (
        'opentelemetry/instrumentation/google_genai'
        in maybe_wrapped_function.__code__.co_filename
    ):
      return True
    maybe_wrapped_function = wrapped  # pyright: ignore[reportAny]
  return False
def _is_gemini_agent(agent: BaseAgent) -> bool:
  """Returns True when *agent* is an LlmAgent backed by a Gemini model."""
  # Deferred import avoids a circular dependency at module load time.
  from ..agents.llm_agent import LlmAgent

  if not isinstance(agent, LlmAgent):
    return False
  model = agent.model
  if isinstance(model, str):
    return is_gemini_model(model)
  from ..models.google_llm import Gemini  # deferred for the same reason

  return isinstance(model, Gemini)
@contextmanager
def _use_native_generate_content_span(
    llm_request: LlmRequest,
    common_attributes: Mapping[str, AttributeValue],
) -> Iterator[Span]:
  """Opens a native `generate_content {model}` span and emits request logs.

  Args:
    llm_request: Request whose model name, system instruction and contents
      are recorded as span attributes / log records.
    common_attributes: Extra attributes set verbatim on the span.

  Yields:
    The opened span, so the caller can attach response data to it.
  """
  # Resolve once and reuse: the system name depends only on env vars, and the
  # original recomputed it for the span plus every emitted log record.
  system_name = _guess_gemini_system_name()
  model_name = llm_request.model or ''
  with tracer.start_as_current_span(f'generate_content {model_name}') as span:
    span.set_attribute(GEN_AI_SYSTEM, system_name)
    span.set_attribute(GEN_AI_OPERATION_NAME, 'generate_content')
    span.set_attribute(GEN_AI_REQUEST_MODEL, model_name)
    span.set_attributes(common_attributes)
    otel_logger.emit(
        LogRecord(
            event_name='gen_ai.system.message',
            body={
                'content': _serialize_content_with_elision(
                    llm_request.config.system_instruction
                )
            },
            attributes={GEN_AI_SYSTEM: system_name},
        )
    )
    for content in llm_request.contents:
      otel_logger.emit(
          LogRecord(
              event_name='gen_ai.user.message',
              body={'content': _serialize_content_with_elision(content)},
              attributes={GEN_AI_SYSTEM: system_name},
          )
      )
    yield span
def trace_generate_content_result(span: Span | None, llm_response: LlmResponse):
  """Traces the result of the inference in the generate_content span.

  Args:
    span: The native generate_content span, or None when span creation was
      delegated to an external instrumentation library (no-op in that case).
    llm_response: The response to record; partial (streaming) chunks are
      ignored so only the final response is traced.
  """
  if span is None:
    # An external library owns the span and records the result itself.
    return
  if llm_response.partial:
    return
  if finish_reason := llm_response.finish_reason:
    span.set_attribute(GEN_AI_RESPONSE_FINISH_REASONS, [finish_reason.lower()])
  if usage_metadata := llm_response.usage_metadata:
    if usage_metadata.prompt_token_count is not None:
      span.set_attribute(
          GEN_AI_USAGE_INPUT_TOKENS, usage_metadata.prompt_token_count
      )
    if usage_metadata.candidates_token_count is not None:
      span.set_attribute(
          GEN_AI_USAGE_OUTPUT_TOKENS, usage_metadata.candidates_token_count
      )
  body = {
      'content': _serialize_content_with_elision(llm_response.content),
      'index': 0,  # ADK always returns a single candidate
  }
  # Fix: the previous `{...} | {...} if cond else {}` parsed as
  # `({...} | {...}) if cond else {}` (the conditional expression binds
  # looser than `|`), so content/index were dropped entirely whenever
  # finish_reason was None. Only finish_reason is meant to be optional.
  if llm_response.finish_reason is not None:
    body['finish_reason'] = llm_response.finish_reason.value
  otel_logger.emit(
      LogRecord(
          event_name='gen_ai.choice',
          body=body,
          attributes={GEN_AI_SYSTEM: _guess_gemini_system_name()},
      )
  )
def _guess_gemini_system_name() -> str:
  """Returns the GenAI system name based on GOOGLE_GENAI_USE_VERTEXAI."""
  use_vertexai = os.getenv('GOOGLE_GENAI_USE_VERTEXAI', '').lower()
  if use_vertexai in ('true', '1'):
    return GenAiSystemValues.VERTEX_AI.name.lower()
  return GenAiSystemValues.GEMINI.name.lower()
+50 -3
View File
@@ -12,19 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import gc
import sys
from google.adk.agents import base_agent
from google.adk.agents.llm_agent import Agent
from google.adk.models.base_llm import BaseLlm
from google.adk.models.llm_response import LlmResponse
from google.adk.telemetry import tracing
from google.adk.tools import FunctionTool
from google.adk.utils.context_utils import Aclosing
from google.genai.types import Content
from google.genai.types import Part
from opentelemetry.instrumentation.google_genai import GoogleGenAiSdkInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
@@ -121,6 +119,8 @@ async def test_tracer_start_as_current_span(
'call_llm',
'call_llm',
'execute_tool some_tool',
'generate_content mock',
'generate_content mock',
'invocation',
'invoke_agent some_root_agent',
]
@@ -162,3 +162,50 @@ async def test_exception_preserves_attributes(
for span in spans
if span.name != 'invocation' # not expected to have attributes
)
@pytest.mark.asyncio
async def test_no_generate_content_for_gemini_model_when_already_instrumented(
    test_runner: TestInMemoryRunner,
    span_exporter: InMemorySpanExporter,
    monkeypatch: pytest.MonkeyPatch,
):
  """Tests that no native generate_content span is emitted when delegated.

  When the agent is detected as a Gemini agent and the
  opentelemetry-instrumentation-google-genai library is detected as
  installed, ADK delegates span creation to that library and must not emit
  its own `generate_content ...` span.
  """
  # Arrange: force both detection helpers to report the delegation case.
  monkeypatch.setattr(
      tracing,
      '_instrumented_with_opentelemetry_instrumentation_google_genai',
      lambda: True,
  )
  monkeypatch.setattr(
      tracing,
      '_is_gemini_agent',
      lambda _: True,
  )
  # Act: run one full invocation end to end.
  async with Aclosing(test_runner.run_async_with_new_session_agen('')) as agen:
    async for _ in agen:
      pass
  # Assert: ADK emitted no generate_content span of its own.
  spans = span_exporter.get_finished_spans()
  assert not any(span.name.startswith('generate_content') for span in spans)
def test_instrumented_with_opentelemetry_instrumentation_google_genai():
  """Tests detection tracks the GoogleGenAiSdkInstrumentor's state."""
  instrumentor = GoogleGenAiSdkInstrumentor()
  # Not detected before instrument() is called.
  assert (
      not tracing._instrumented_with_opentelemetry_instrumentation_google_genai()
  )
  try:
    instrumentor.instrument()
    # Detected while the SDK is instrumented.
    assert (
        tracing._instrumented_with_opentelemetry_instrumentation_google_genai()
    )
  finally:
    # Always restore global state so other tests are unaffected.
    instrumentor.uninstrument()
  # uninstrument() returns the SDK to the undetected state.
  assert (
      not tracing._instrumented_with_opentelemetry_instrumentation_google_genai()
  )
+149
View File
@@ -26,11 +26,21 @@ from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.telemetry.tracing import ADK_CAPTURE_MESSAGE_CONTENT_IN_SPANS
from google.adk.telemetry.tracing import trace_agent_invocation
from google.adk.telemetry.tracing import trace_call_llm
from google.adk.telemetry.tracing import trace_generate_content_result
from google.adk.telemetry.tracing import trace_merged_tool_calls
from google.adk.telemetry.tracing import trace_send_data
from google.adk.telemetry.tracing import trace_tool_call
from google.adk.telemetry.tracing import use_generate_content_span
from google.adk.tools.base_tool import BaseTool
from google.genai import types
from opentelemetry._logs import LogRecord
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_CONVERSATION_ID
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_OPERATION_NAME
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_REQUEST_MODEL
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_RESPONSE_FINISH_REASONS
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_SYSTEM
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_USAGE_INPUT_TOKENS
from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_USAGE_OUTPUT_TOKENS
import pytest
@@ -612,3 +622,142 @@ async def test_trace_send_data_disabling_request_response_content(
call_obj.args
for call_obj in mock_span_fixture.set_attribute.call_args_list
)
@pytest.mark.asyncio
@mock.patch('google.adk.telemetry.tracing.otel_logger')
@mock.patch('google.adk.telemetry.tracing.tracer')
@mock.patch(
    'google.adk.telemetry.tracing._guess_gemini_system_name',
    return_value='test_system',
)
@pytest.mark.parametrize('capture_content', [True, False])
async def test_generate_content_span(
    mock_guess_system_name,
    mock_tracer,
    mock_otel_logger,
    monkeypatch,
    capture_content,
):
  """Test native generate_content span creation with attributes and logs.

  Runs with content capture both enabled and disabled; when disabled, all
  logged prompt/response content must be replaced with '<elided>'.
  """
  # Arrange: the env var drives _should_log_prompt_response_content, and the
  # forced False detection ensures the native (non-delegated) span path.
  monkeypatch.setenv(
      'OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT',
      str(capture_content).lower(),
  )
  monkeypatch.setattr(
      'google.adk.telemetry.tracing._instrumented_with_opentelemetry_instrumentation_google_genai',
      lambda: False,
  )
  agent = LlmAgent(name='test_agent', model='not-a-gemini-model')
  invocation_context = await _create_invocation_context(agent)
  system_instruction = types.Content(
      parts=[types.Part.from_text(text='You are a helpful assistant.')],
  )
  user_content1 = types.Content(role='user', parts=[types.Part(text='Hello')])
  user_content2 = types.Content(role='user', parts=[types.Part(text='World')])
  model_content = types.Content(
      role='model', parts=[types.Part(text='Response')]
  )
  llm_request = LlmRequest(
      model='some-model',
      contents=[user_content1, user_content2],
      config=types.GenerateContentConfig(system_instruction=system_instruction),
  )
  llm_response = LlmResponse(
      content=model_content,
      finish_reason=types.FinishReason.STOP,
      usage_metadata=types.GenerateContentResponseUsageMetadata(
          prompt_token_count=10,
          candidates_token_count=20,
      ),
  )
  model_response_event = mock.MagicMock()
  model_response_event.id = 'event-123'
  # The span object the mocked tracer hands out inside the `with` block.
  mock_span = (
      mock_tracer.start_as_current_span.return_value.__enter__.return_value
  )
  # Act
  with use_generate_content_span(
      llm_request, invocation_context, model_response_event
  ) as span:
    assert span is mock_span
    trace_generate_content_result(span, llm_response)
  # Assert Span
  mock_tracer.start_as_current_span.assert_called_once_with(
      'generate_content some-model'
  )
  mock_span.set_attribute.assert_any_call(GEN_AI_SYSTEM, 'test_system')
  mock_span.set_attribute.assert_any_call(
      GEN_AI_OPERATION_NAME, 'generate_content'
  )
  mock_span.set_attribute.assert_any_call(GEN_AI_REQUEST_MODEL, 'some-model')
  mock_span.set_attribute.assert_any_call(
      GEN_AI_RESPONSE_FINISH_REASONS, ['stop']
  )
  mock_span.set_attribute.assert_any_call(GEN_AI_USAGE_INPUT_TOKENS, 10)
  mock_span.set_attribute.assert_any_call(GEN_AI_USAGE_OUTPUT_TOKENS, 20)
  mock_span.set_attributes.assert_called_once_with({
      GEN_AI_CONVERSATION_ID: invocation_context.session.id,
      'gcp.vertex.agent.event_id': 'event-123',
  })
  # Assert Logs: one system message, two user messages, one choice.
  assert mock_otel_logger.emit.call_count == 4
  expected_system_body = {
      'content': (
          system_instruction.model_dump() if capture_content else '<elided>'
      )
  }
  expected_user1_body = {
      'content': user_content1.model_dump() if capture_content else '<elided>'
  }
  expected_user2_body = {
      'content': user_content2.model_dump() if capture_content else '<elided>'
  }
  expected_choice_body = {
      'content': model_content.model_dump() if capture_content else '<elided>',
      'index': 0,
      'finish_reason': 'STOP',
  }
  log_records: list[LogRecord] = [
      call.args[0] for call in mock_otel_logger.emit.call_args_list
  ]
  system_log = next(
      (lr for lr in log_records if lr.event_name == 'gen_ai.system.message'),
      None,
  )
  assert system_log is not None
  assert system_log.body == expected_system_body
  assert system_log.attributes == {GEN_AI_SYSTEM: 'test_system'}
  # User messages must preserve request-content order.
  user_logs = [
      lr for lr in log_records if lr.event_name == 'gen_ai.user.message'
  ]
  assert len(user_logs) == 2
  assert expected_user1_body == user_logs[0].body
  assert expected_user2_body == user_logs[1].body
  for log in user_logs:
    assert log.attributes == {GEN_AI_SYSTEM: 'test_system'}
  choice_log = next(
      (lr for lr in log_records if lr.event_name == 'gen_ai.choice'),
      None,
  )
  assert choice_log is not None
  assert choice_log.body == expected_choice_body
  assert choice_log.attributes == {GEN_AI_SYSTEM: 'test_system'}