feat(otel): add --otel_to_cloud experimental support

Cloud Trace, Cloud Monitoring and Cloud Logging integrations are set up via OTel if otel_to_cloud CLI param/fast_api arg is provided.

This is similar to current Cloud Trace integration via trace_to_cloud, just extended to Monitoring and Logging as well.

PiperOrigin-RevId: 807230668
This commit is contained in:
Kacper Jawoszek
2025-09-15 07:21:39 -07:00
committed by Copybara-Service
parent b9735b2193
commit 7870480c63
15 changed files with 430 additions and 30 deletions
+12 -2
View File
@@ -41,9 +41,12 @@ dependencies = [
"google-genai>=1.21.1, <2.0.0", # Google GenAI SDK
"graphviz>=0.20.2, <1.0.0", # Graphviz for graph rendering
"mcp>=1.8.0, <2.0.0;python_version>='3.10'", # For MCP Toolset
"opentelemetry-api>=1.31.0, <2.0.0", # OpenTelemetry
"opentelemetry-api>=1.31.0, <=1.37.0", # OpenTelemetry - limit upper version for sdk and api to not risk breaking changes from unstable _logs package.
"opentelemetry-exporter-gcp-logging>=1.9.0a0, <2.0.0",
"opentelemetry-exporter-gcp-monitoring>=1.9.0a0, <2.0.0",
"opentelemetry-exporter-gcp-trace>=1.9.0, <2.0.0",
"opentelemetry-sdk>=1.31.0, <2.0.0",
"opentelemetry-resourcedetector-gcp>=1.9.0a0, <2.0.0",
"opentelemetry-sdk>=1.31.0, <=1.37.0",
"pydantic>=2.0, <3.0.0", # For data validation/models
"python-dateutil>=2.9.0.post0, <3.0.0", # For Vertext AI Session Service
"python-dotenv>=1.0.0, <2.0.0", # To manage environment variables
@@ -147,6 +150,13 @@ extensions = [
"toolbox-core>=0.1.0", # For tools.toolbox_toolset.ToolboxToolset
]
otel-gcp = [
"opentelemetry-exporter-gcp-logging>=1.9.0a0, <2.0.0",
"opentelemetry-exporter-gcp-monitoring>=1.9.0a0, <2.0.0",
"opentelemetry-exporter-gcp-trace>=1.9.0, <2.0.0",
"opentelemetry-resourcedetector-gcp>=1.9.0a0, <2.0.0",
]
[tool.pyink]
# Format py files following Google style-guide
+68 -8
View File
@@ -41,6 +41,7 @@ import graphviz
from opentelemetry import trace
from opentelemetry.sdk.trace import export as export_lib
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk.trace import TracerProvider
from pydantic import Field
from pydantic import ValidationError
@@ -257,6 +258,59 @@ class ListMetricsInfoResponse(common.BaseModel):
metrics_info: list[MetricInfo]
def _setup_telemetry(
otel_to_cloud: bool = False,
internal_exporters: Optional[list[SpanProcessor]] = None,
):
# TODO - remove the condition and else branch here once
# maybe_set_otel_providers is no longer experimental.
if otel_to_cloud:
_setup_telemetry_experimental(
otel_to_cloud=otel_to_cloud, internal_exporters=internal_exporters
)
else:
# Old logic - to be removed when above leaves experimental.
tracer_provider = TracerProvider()
for exporter in internal_exporters:
tracer_provider.add_span_processor(exporter)
trace.set_tracer_provider(tracer_provider=tracer_provider)
def _setup_telemetry_experimental(
otel_to_cloud: bool = False,
internal_exporters: list[SpanProcessor] = None,
):
from ..telemetry.setup import maybe_set_otel_providers
otel_hooks_to_add = []
otel_resource = None
if internal_exporters:
from ..telemetry.setup import OTelHooks
# Register ADK-specific exporters in trace provider.
otel_hooks_to_add.append(OTelHooks(span_processors=internal_exporters))
if otel_to_cloud:
from ..telemetry.google_cloud import get_gcp_exporters
from ..telemetry.google_cloud import get_gcp_resource
otel_hooks_to_add.append(
get_gcp_exporters(
# TODO - use trace_to_cloud here as well once otel_to_cloud is no
# longer experimental.
enable_cloud_tracing=True,
enable_cloud_metrics=True,
enable_cloud_logging=True,
)
)
otel_resource = get_gcp_resource()
maybe_set_otel_providers(
otel_hooks_to_setup=otel_hooks_to_add, otel_resource=otel_resource
)
class AdkWebServer:
"""Helper class for setting up and running the ADK web server on FastAPI.
@@ -355,6 +409,7 @@ class AdkWebServer:
[Observer, "AdkWebServer"], None
] = lambda o, s: None,
register_processors: Callable[[TracerProvider], None] = lambda o: None,
otel_to_cloud: bool = False,
):
"""Creates a FastAPI app for the ADK web server.
@@ -371,6 +426,8 @@ class AdkWebServer:
tear_down_observer: Callback for cleaning up the file system observer.
register_processors: Callback for additional Span processors to be added
to the TracerProvider.
otel_to_cloud: EXPERIMENTAL. Whether to enable Cloud Trace,
Cloud Monitoring and Cloud Logging integrations.
Returns:
A FastAPI app instance.
@@ -395,17 +452,20 @@ class AdkWebServer:
# Create tasks for all runner closures to run concurrently
await cleanup.close_runners(list(self.runner_dict.values()))
# Set up tracing in the FastAPI server.
provider = TracerProvider()
provider.add_span_processor(
export_lib.SimpleSpanProcessor(ApiServerSpanExporter(trace_dict))
)
memory_exporter = InMemoryExporter(session_trace_dict)
provider.add_span_processor(export_lib.SimpleSpanProcessor(memory_exporter))
register_processors(provider)
_setup_telemetry(
otel_to_cloud=otel_to_cloud,
internal_exporters=[
export_lib.SimpleSpanProcessor(ApiServerSpanExporter(trace_dict)),
export_lib.SimpleSpanProcessor(memory_exporter),
],
)
trace.set_tracer_provider(provider)
# TODO - register_processors to be removed once --otel_to_cloud is no
# longer experimental.
tracer_provider = trace.get_tracer_provider()
register_processors(tracer_provider)
# Run the FastAPI server.
app = FastAPI(lifespan=internal_lifespan)
+15
View File
@@ -655,6 +655,17 @@ def fast_api_common_options():
default=False,
help="Optional. Whether to enable cloud trace for telemetry.",
)
@click.option(
"--otel_to_cloud",
is_flag=True,
show_default=True,
default=False,
help=(
"EXPERIMENTAL Optional. Whether to write OTel data to Google Cloud"
" Observability services - Cloud Trace, Cloud Monitoring and Cloud"
" Logging."
),
)
@click.option(
"--reload/--no-reload",
default=True,
@@ -723,6 +734,7 @@ def cli_web(
host: str = "127.0.0.1",
port: int = 8000,
trace_to_cloud: bool = False,
otel_to_cloud: bool = False,
reload: bool = True,
session_service_uri: Optional[str] = None,
artifact_service_uri: Optional[str] = None,
@@ -776,6 +788,7 @@ def cli_web(
allow_origins=allow_origins,
web=True,
trace_to_cloud=trace_to_cloud,
otel_to_cloud=otel_to_cloud,
lifespan=_lifespan,
a2a=a2a,
host=host,
@@ -814,6 +827,7 @@ def cli_api_server(
host: str = "127.0.0.1",
port: int = 8000,
trace_to_cloud: bool = False,
otel_to_cloud: bool = False,
reload: bool = True,
session_service_uri: Optional[str] = None,
artifact_service_uri: Optional[str] = None,
@@ -846,6 +860,7 @@ def cli_api_server(
allow_origins=allow_origins,
web=False,
trace_to_cloud=trace_to_cloud,
otel_to_cloud=otel_to_cloud,
a2a=a2a,
host=host,
port=port,
+5 -1
View File
@@ -67,6 +67,7 @@ def get_fast_api_app(
host: str = "127.0.0.1",
port: int = 8000,
trace_to_cloud: bool = False,
otel_to_cloud: bool = False,
reload_agents: bool = False,
lifespan: Optional[Lifespan[FastAPI]] = None,
) -> FastAPI:
@@ -191,7 +192,9 @@ def get_fast_api_app(
# Callbacks & other optional args for when constructing the FastAPI instance
extra_fast_api_args = {}
if trace_to_cloud:
# TODO - Remove separate trace_to_cloud logic once otel_to_cloud stops being
# EXPERIMENTAL.
if trace_to_cloud and not otel_to_cloud:
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
def register_processors(provider: TracerProvider) -> None:
@@ -241,6 +244,7 @@ def get_fast_api_app(
app = adk_web_server.get_fast_api_app(
lifespan=lifespan,
allow_origins=allow_origins,
otel_to_cloud=otel_to_cloud,
**extra_fast_api_args,
)
@@ -41,9 +41,9 @@ from ...events.event import Event
from ...models.base_llm_connection import BaseLlmConnection
from ...models.llm_request import LlmRequest
from ...models.llm_response import LlmResponse
from ...telemetry import trace_call_llm
from ...telemetry import trace_send_data
from ...telemetry import tracer
from ...telemetry.tracing import trace_call_llm
from ...telemetry.tracing import trace_send_data
from ...telemetry.tracing import tracer
from ...tools.base_toolset import BaseToolset
from ...tools.tool_context import ToolContext
from ...utils.context_utils import Aclosing
+3 -3
View File
@@ -35,9 +35,9 @@ from ...agents.invocation_context import InvocationContext
from ...auth.auth_tool import AuthToolArguments
from ...events.event import Event
from ...events.event_actions import EventActions
from ...telemetry import trace_merged_tool_calls
from ...telemetry import trace_tool_call
from ...telemetry import tracer
from ...telemetry.tracing import trace_merged_tool_calls
from ...telemetry.tracing import trace_tool_call
from ...telemetry.tracing import tracer
from ...tools.base_tool import BaseTool
from ...tools.tool_confirmation import ToolConfirmation
from ...tools.tool_context import ToolContext
+1 -1
View File
@@ -51,7 +51,7 @@ from .plugins.plugin_manager import PluginManager
from .sessions.base_session_service import BaseSessionService
from .sessions.in_memory_session_service import InMemorySessionService
from .sessions.session import Session
from .telemetry import tracer
from .telemetry.tracing import tracer
from .tools.base_toolset import BaseToolset
from .utils.context_utils import Aclosing
+27
View File
@@ -0,0 +1,27 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .tracing import trace_call_llm
from .tracing import trace_merged_tool_calls
from .tracing import trace_send_data
from .tracing import trace_tool_call
from .tracing import tracer
__all__ = [
'trace_call_llm',
'trace_merged_tool_calls',
'trace_send_data',
'trace_tool_call',
'tracer',
]
+115
View File
@@ -0,0 +1,115 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import google.auth
from opentelemetry.resourcedetector.gcp_resource_detector import GoogleCloudResourceDetector
from opentelemetry.sdk._logs import LogRecordProcessor
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics.export import MetricReader
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import OTELResourceDetector
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from ..utils.feature_decorator import experimental
from .setup import OTelHooks
logger = logging.getLogger('google_adk.' + __name__)
@experimental
def get_gcp_exporters(
enable_cloud_tracing: bool = False,
enable_cloud_metrics: bool = False,
enable_cloud_logging: bool = False,
) -> OTelHooks:
"""Returns GCP OTel exporters to be used in the app.
Args:
enable_tracing: whether to enable tracing to Cloud Trace.
enable_metrics: whether to enable raporting metrics to Cloud Monitoring.
enable_logging: whether to enable sending logs to Cloud Logging.
"""
_, project_id = google.auth.default()
if not project_id:
logger.warning(
'Cannot determine GCP Project. OTel GCP Exporters cannot be set up.'
' Please make sure to log into correct GCP Project.'
)
return OTelHooks()
span_processors = []
if enable_cloud_tracing:
exporter = _get_gcp_span_exporter(project_id)
span_processors.append(exporter)
metric_readers = []
if enable_cloud_metrics:
exporter = _get_gcp_metrics_exporter(project_id)
if exporter:
metric_readers.append(exporter)
log_record_processors = []
if enable_cloud_logging:
exporter = _get_gcp_logs_exporter(project_id)
if exporter:
log_record_processors.append(exporter)
return OTelHooks(
span_processors=span_processors,
metric_readers=metric_readers,
log_record_processors=log_record_processors,
)
def _get_gcp_span_exporter(project_id: str) -> SpanProcessor:
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
return BatchSpanProcessor(CloudTraceSpanExporter(project_id=project_id))
def _get_gcp_metrics_exporter(project_id: str) -> MetricReader:
from opentelemetry.exporter.cloud_monitoring import CloudMonitoringMetricsExporter
return PeriodicExportingMetricReader(
CloudMonitoringMetricsExporter(project_id=project_id),
export_interval_millis=5000,
)
def _get_gcp_logs_exporter(project_id: str) -> LogRecordProcessor:
from opentelemetry.exporter.cloud_logging import CloudLoggingExporter
return BatchLogRecordProcessor(
# TODO(jawoszek) - add default_log_name once design is approved.
CloudLoggingExporter(project_id=project_id)
)
def get_gcp_resource() -> Resource:
# The OTELResourceDetector populates resource labels from
# environment variables like OTEL_SERVICE_NAME and OTEL_RESOURCE_ATTRIBUTES.
# Then the GCP detector adds attributes corresponding to a correct
# monitored resource if ADK runs on one of supported platforms
# (e.g. GCE, GKE, CloudRun).
return (
OTELResourceDetector()
.detect()
.merge(GoogleCloudResourceDetector(raise_on_error=False).detect())
)
+112
View File
@@ -0,0 +1,112 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field
from typing import Optional
from opentelemetry import _logs
from opentelemetry import metrics
from opentelemetry import trace
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs import LogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import MetricReader
from opentelemetry.sdk.resources import OTELResourceDetector
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk.trace import TracerProvider
from ..utils.feature_decorator import experimental
@dataclass
class OTelHooks:
span_processors: list[SpanProcessor] = field(default_factory=list)
metric_readers: list[MetricReader] = field(default_factory=list)
log_record_processors: list[LogRecordProcessor] = field(default_factory=list)
@experimental()
def maybe_set_otel_providers(
otel_hooks_to_setup: list[OTelHooks] = None,
otel_resource: Optional[Resource] = None,
):
"""Sets up OTel providers if hooks for a given telemetry type were
passed.
If a provider for a specific telemetry type was already globally set -
this function will not override it or register more exporters.
Args:
otel_hooks_to_setup: per-telemetry-type processors and readers to be added
to OTel providers. If no hooks for a specific telemetry type are passed -
provider will not be set.
otel_resource: OTel resource to use in providers.
If empty - default OTel resource detection will be used.
"""
if otel_hooks_to_setup is None:
otel_hooks_to_setup = []
if otel_resource is None:
otel_resource = _get_otel_resource()
span_processors = []
metric_readers = []
log_record_processors = []
for otel_hooks in otel_hooks_to_setup:
for span_processor in otel_hooks.span_processors:
span_processors.append(span_processor)
for metric_reader in otel_hooks.metric_readers:
metric_readers.append(metric_reader)
for log_record_processor in otel_hooks.log_record_processors:
log_record_processors.append(log_record_processor)
# Try to set up OTel tracing.
# If the TracerProvider was already set outside of ADK, this would be a no-op
# and results in a warning. In such case we rely on user setup.
if span_processors:
new_tracer_provider = TracerProvider(resource=otel_resource)
for exporter in span_processors:
new_tracer_provider.add_span_processor(exporter)
trace.set_tracer_provider(new_tracer_provider)
# Try to set up OTel metrics.
# If the MeterProvider was already set outside of ADK, this would be a no-op
# and results in a warning. In such case we rely on user setup.
if metric_readers:
metrics.set_meter_provider(
MeterProvider(
metric_readers=metric_readers,
resource=otel_resource,
)
)
# Try to set up OTel logging.
# If the LoggerProvider was already set outside of ADK, this would be a no-op
# and results in a warning. In such case we rely on user setup.
if log_record_processors:
new_logger_provider = LoggerProvider(
resource=otel_resource,
)
for exporter in log_record_processors:
new_logger_provider.add_log_record_processor(exporter)
_logs.set_logger_provider(new_logger_provider)
def _get_otel_resource() -> Resource:
# The OTELResourceDetector populates resource labels from
# environment variables like OTEL_SERVICE_NAME and OTEL_RESOURCE_ATTRIBUTES.
return OTELResourceDetector().detect()
@@ -29,11 +29,11 @@ from typing import Any
from google.genai import types
from opentelemetry import trace
from .agents.invocation_context import InvocationContext
from .events.event import Event
from .models.llm_request import LlmRequest
from .models.llm_response import LlmResponse
from .tools.base_tool import BaseTool
from ..agents.invocation_context import InvocationContext
from ..events.event import Event
from ..models.llm_request import LlmRequest
from ..models.llm_response import LlmResponse
from ..tools.base_tool import BaseTool
tracer = trace.get_tracer('gcp.vertex.agent')
@@ -17,10 +17,10 @@ from typing import Dict
from typing import Optional
from unittest import mock
from google.adk import telemetry
from google.adk.agents.llm_agent import Agent
from google.adk.events.event import Event
from google.adk.flows.llm_flows.functions import handle_function_calls_async
from google.adk.telemetry import tracing
from google.adk.tools.function_tool import FunctionTool
from google.genai import types
@@ -65,7 +65,7 @@ async def test_simple_function_with_mocked_tracer(monkeypatch):
mock_start_as_current_span_func.return_value = returned_context_manager_mock
monkeypatch.setattr(
telemetry.tracer, 'start_as_current_span', mock_start_as_current_span_func
tracing.tracer, 'start_as_current_span', mock_start_as_current_span_func
)
mock_adk_trace_tool_call = mock.Mock()
+2 -2
View File
@@ -16,10 +16,10 @@ import gc
import sys
from unittest import mock
from google.adk import telemetry
from google.adk.agents import base_agent
from google.adk.agents.llm_agent import Agent
from google.adk.models.base_llm import BaseLlm
from google.adk.telemetry import tracing
from google.adk.tools import FunctionTool
from google.adk.utils.context_utils import Aclosing
from google.genai.types import Part
@@ -74,7 +74,7 @@ def mock_start_as_current_span(monkeypatch: pytest.MonkeyPatch) -> mock.Mock:
tracer, 'start_as_current_span', mock_start_as_current_span
)
do_replace(telemetry.tracer)
do_replace(tracing.tracer)
do_replace(base_agent.tracer)
return mock_start_as_current_span
@@ -0,0 +1,57 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from google.adk.telemetry.google_cloud import get_gcp_exporters
import pytest
@pytest.mark.parametrize("enable_cloud_tracing", [True, False])
@pytest.mark.parametrize("enable_cloud_metrics", [True, False])
@pytest.mark.parametrize("enable_cloud_logging", [True, False])
def test_get_gcp_exporters(
enable_cloud_tracing: bool,
enable_cloud_metrics: bool,
enable_cloud_logging: bool,
monkeypatch: pytest.MonkeyPatch,
):
"""
Test initializing correct providers in setup_otel
when enabling telemetry via Google O11y.
"""
# Arrange.
# Mocking google.auth.default to improve the test time.
auth_mock = mock.MagicMock()
auth_mock.return_value = ("", "project-id")
monkeypatch.setattr(
"google.auth.default",
auth_mock,
)
# Act.
otel_hooks = get_gcp_exporters(
enable_cloud_tracing=enable_cloud_tracing,
enable_cloud_metrics=enable_cloud_metrics,
enable_cloud_logging=enable_cloud_logging,
)
# Assert.
# If given telemetry type was enabled,
# the corresponding provider should be set.
assert len(otel_hooks.span_processors) == (1 if enable_cloud_tracing else 0)
assert len(otel_hooks.metric_readers) == (1 if enable_cloud_metrics else 0)
assert len(otel_hooks.log_record_processors) == (
1 if enable_cloud_logging else 0
)
+3 -3
View File
@@ -23,9 +23,9 @@ from google.adk.agents.llm_agent import LlmAgent
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.telemetry import trace_call_llm
from google.adk.telemetry import trace_merged_tool_calls
from google.adk.telemetry import trace_tool_call
from google.adk.telemetry.tracing import trace_call_llm
from google.adk.telemetry.tracing import trace_merged_tool_calls
from google.adk.telemetry.tracing import trace_tool_call
from google.adk.tools.base_tool import BaseTool
from google.genai import types
import pytest