fix: Store and retrieve EventCompaction via custom_metadata in Vertex AISessionService

This change enables round-tripping of EventCompaction data by storing it within the event's custom_metadata under the key "_compaction" when appending events. When retrieving events, the "_compaction" data is extracted from custom_metadata and used to populate the EventActions.compaction field. This is a temporary measure until the Vertex AI SDK's SessionEvent model supports a dedicated compaction field.

Close #3465

Co-authored-by: George Weale <gweale@google.com>
PiperOrigin-RevId: 878128265
This commit is contained in:
George Weale
2026-03-03 14:33:02 -08:00
committed by Copybara-Service
parent d61846f6c6
commit 2e434ca7be
2 changed files with 129 additions and 11 deletions
@@ -33,6 +33,7 @@ if TYPE_CHECKING:
from . import _session_util
from ..events.event import Event
from ..events.event_actions import EventActions
from ..events.event_actions import EventCompaction
from ..utils.vertex_ai_utils import get_express_mode_api_key
from .base_session_service import BaseSessionService
from .base_session_service import GetSessionConfig
@@ -267,8 +268,9 @@ class VertexAiSessionService(BaseSessionService):
k: json.loads(v.model_dump_json(exclude_none=True, by_alias=True))
for k, v in event.actions.requested_auth_configs.items()
},
# TODO: add requested_tool_confirmations, compaction, agent_state once
# TODO: add requested_tool_confirmations, agent_state once
# they are available in the API.
# Note: compaction is stored via event_metadata.custom_metadata.
}
if event.error_code:
config['error_code'] = event.error_code
@@ -291,6 +293,19 @@ class VertexAiSessionService(BaseSessionService):
metadata_dict['grounding_metadata'] = event.grounding_metadata.model_dump(
exclude_none=True, mode='json'
)
# Store compaction data in custom_metadata since the Vertex AI service
# does not yet support the compaction field.
# TODO: Stop writing to custom_metadata once the Vertex AI service
# supports the compaction field natively in EventActions.
if event.actions and event.actions.compaction:
compaction_dict = event.actions.compaction.model_dump(
exclude_none=True, mode='json'
)
existing_custom = metadata_dict.get('custom_metadata') or {}
metadata_dict['custom_metadata'] = {
**existing_custom,
'_compaction': compaction_dict,
}
config['event_metadata'] = metadata_dict
async with self._get_api_client() as api_client:
@@ -347,16 +362,6 @@ class VertexAiSessionService(BaseSessionService):
def _from_api_event(api_event_obj: vertexai.types.SessionEvent) -> Event:
"""Converts an API event object to an Event object."""
actions = getattr(api_event_obj, 'actions', None)
if actions:
actions_dict = actions.model_dump(exclude_none=True, mode='python')
rename_map = {'transfer_agent': 'transfer_to_agent'}
renamed_actions_dict = {
rename_map.get(k, k): v for k, v in actions_dict.items()
}
event_actions = EventActions.model_validate(renamed_actions_dict)
else:
event_actions = EventActions()
event_metadata = getattr(api_event_obj, 'event_metadata', None)
if event_metadata:
long_running_tool_ids_list = getattr(
@@ -370,6 +375,16 @@ def _from_api_event(api_event_obj: vertexai.types.SessionEvent) -> Event:
interrupted = getattr(event_metadata, 'interrupted', None)
branch = getattr(event_metadata, 'branch', None)
custom_metadata = getattr(event_metadata, 'custom_metadata', None)
# Extract compaction data stored in custom_metadata.
# NOTE: This read path must be kept permanently because sessions
# written before native compaction support store compaction data
# in custom_metadata under the '_compaction' key.
compaction_data = None
if custom_metadata and '_compaction' in custom_metadata:
custom_metadata = dict(custom_metadata) # avoid mutating the API response
compaction_data = custom_metadata.pop('_compaction')
if not custom_metadata:
custom_metadata = None
grounding_metadata = _session_util.decode_model(
getattr(event_metadata, 'grounding_metadata', None),
types.GroundingMetadata,
@@ -381,8 +396,26 @@ def _from_api_event(api_event_obj: vertexai.types.SessionEvent) -> Event:
interrupted = None
branch = None
custom_metadata = None
compaction_data = None
grounding_metadata = None
if actions:
actions_dict = actions.model_dump(exclude_none=True, mode='python')
rename_map = {'transfer_agent': 'transfer_to_agent'}
renamed_actions_dict = {
rename_map.get(k, k): v for k, v in actions_dict.items()
}
if compaction_data:
renamed_actions_dict['compaction'] = compaction_data
event_actions = EventActions.model_validate(renamed_actions_dict)
else:
if compaction_data:
event_actions = EventActions(
compaction=EventCompaction.model_validate(compaction_data)
)
else:
event_actions = EventActions()
return Event(
id=api_event_obj.name.split('/')[-1],
invocation_id=api_event_obj.invocation_id,
@@ -27,6 +27,7 @@ from google.adk.auth import auth_schemes
from google.adk.auth.auth_tool import AuthConfig
from google.adk.events.event import Event
from google.adk.events.event_actions import EventActions
from google.adk.events.event_actions import EventCompaction
from google.adk.sessions.base_session_service import GetSessionConfig
from google.adk.sessions.session import Session
from google.adk.sessions.vertex_ai_session_service import VertexAiSessionService
@@ -826,3 +827,87 @@ async def test_append_event():
assert len(retrieved_session.events) == 2
event_to_append.id = retrieved_session.events[1].id
assert retrieved_session.events[1] == event_to_append
@pytest.mark.asyncio
@pytest.mark.usefixtures('mock_get_api_client')
async def test_append_event_with_compaction():
"""Compaction data round-trips through append_event and get_session."""
session_service = mock_vertex_ai_session_service()
session = await session_service.get_session(
app_name='123', user_id='user', session_id='1'
)
assert session is not None
compaction = EventCompaction(
start_timestamp=1000.0,
end_timestamp=2000.0,
compacted_content=genai_types.Content(
parts=[genai_types.Part(text='compacted summary')]
),
)
event_to_append = Event(
invocation_id='compaction_invocation',
author='model',
timestamp=1734005534.0,
actions=EventActions(compaction=compaction),
)
await session_service.append_event(session, event_to_append)
retrieved_session = await session_service.get_session(
app_name='123', user_id='user', session_id='1'
)
assert retrieved_session is not None
appended_event = retrieved_session.events[-1]
assert appended_event.actions.compaction is not None
assert appended_event.actions.compaction.start_timestamp == 1000.0
assert appended_event.actions.compaction.end_timestamp == 2000.0
assert appended_event.actions.compaction.compacted_content.parts[0].text == (
'compacted summary'
)
# custom_metadata should remain None when only compaction was stored
assert appended_event.custom_metadata is None
@pytest.mark.asyncio
@pytest.mark.usefixtures('mock_get_api_client')
async def test_append_event_with_compaction_and_custom_metadata():
"""Both compaction and user custom_metadata survive the round-trip."""
session_service = mock_vertex_ai_session_service()
session = await session_service.get_session(
app_name='123', user_id='user', session_id='1'
)
assert session is not None
compaction = EventCompaction(
start_timestamp=100.0,
end_timestamp=200.0,
compacted_content=genai_types.Content(
parts=[genai_types.Part(text='summary')]
),
)
event_to_append = Event(
invocation_id='compaction_and_meta_invocation',
author='model',
timestamp=1734005535.0,
actions=EventActions(compaction=compaction),
custom_metadata={'user_key': 'user_value'},
)
await session_service.append_event(session, event_to_append)
retrieved_session = await session_service.get_session(
app_name='123', user_id='user', session_id='1'
)
assert retrieved_session is not None
appended_event = retrieved_session.events[-1]
# Compaction is restored
assert appended_event.actions.compaction is not None
assert appended_event.actions.compaction.start_timestamp == 100.0
assert appended_event.actions.compaction.end_timestamp == 200.0
# User custom_metadata is preserved without the internal _compaction key
assert appended_event.custom_metadata == {'user_key': 'user_value'}
assert '_compaction' not in (appended_event.custom_metadata or {})