feat: implementation of LLM context compaction

Provide a more efficient way to compact LLM context for better agentic performance.

PiperOrigin-RevId: 815785898
This commit is contained in:
Hangfei Lin
2025-10-06 10:28:12 -07:00
committed by Copybara-Service
parent ca6a4340f4
commit e0dd06ff04
10 changed files with 904 additions and 35 deletions
+22 -2
View File
@@ -47,6 +47,26 @@ class ResumabilityConfig(BaseModel):
"""
@experimental
class EventsCompactionConfig(BaseModel):
"""The config of event compaction for an application."""
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra="forbid",
)
compactor: BaseEventsCompactor
"""The event compactor strategy for the application."""
compaction_interval: int
"""The number of *new* user-initiated invocations that, once
fully represented in the session's events, will trigger a compaction."""
overlap_size: int
"""The number of preceding invocations to include from the
end of the last compacted range. This creates an overlap between consecutive
compacted summaries, maintaining context."""
@experimental
class App(BaseModel):
"""Represents an LLM-backed agentic application.
@@ -73,8 +93,8 @@ class App(BaseModel):
plugins: list[BasePlugin] = Field(default_factory=list)
"""The plugins in the application."""
event_compactor: Optional[BaseEventsCompactor] = None
"""The event compactor strategy for the application."""
events_compaction_config: Optional[EventsCompactionConfig] = None
"""The config of event compaction for the application."""
context_cache_config: Optional[ContextCacheConfig] = None
"""Context cache configuration that applies to all LLM agents in the app."""
+7 -10
View File
@@ -26,25 +26,22 @@ from ..utils.feature_decorator import experimental
class BaseEventsCompactor(abc.ABC):
"""Base interface for compacting events."""
@abc.abstractmethod
async def maybe_compact_events(
self, *, events: list[Event]
) -> Optional[Content]:
"""A list of uncompacted events, decide whether to compact.
) -> Optional[Event]:
"""Compact a list of events into a single event.
If no need to compact, return None. Otherwise, compact into a content and
If compaction failed, return None. Otherwise, compact into a content and
return it.
This method will summarize the events and return a new summray event
indicating the range of events it summarized.
When sending events to the LLM, if a summary event is present, the events it
replaces (those identified in itssummary_range) should not be included.
This method will summarize the events and return a new summray event
indicating the range of events it summarized.
Args:
events: Events to compact.
agent_name: The name of the agent.
Returns:
The new compacted content, or None if no compaction is needed.
The new compacted event, or None if no compaction happended.
"""
raise NotImplementedError()
+190
View File
@@ -0,0 +1,190 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from google.adk.apps.app import App
from google.adk.sessions.base_session_service import BaseSessionService
from google.adk.sessions.session import Session
logger = logging.getLogger('google_adk.' + __name__)
async def _run_compaction_for_sliding_window(
app: App, session: Session, session_service: BaseSessionService
):
"""Runs compaction for SlidingWindowCompactor.
This method implements the sliding window compaction logic. It determines
if enough new invocations have occurred since the last compaction based on
`compaction_invocation_threshold`. If so, it selects a range of events to
compact based on `overlap_size`, and calls `maybe_compact_events` on the
compactor.
The compaction process is controlled by two parameters:
1. `compaction_invocation_threshold`: The number of *new* user-initiated
invocations that, once fully
represented in the session's events, will trigger a compaction.
2. `overlap_size`: The number of preceding invocations to include from the
end of the last
compacted range. This creates an overlap between consecutive compacted
summaries,
maintaining context.
The compactor is called after an agent has finished processing a turn and all
its events
have been added to the session. It checks if a new compaction is needed.
When a compaction is triggered:
- The compactor identifies the range of `invocation_id`s to be summarized.
- This range starts `overlap_size` invocations before the beginning of the
new block of `compaction_invocation_threshold` invocations and ends
with the last
invocation
in the current block.
- A `CompactedEvent` is created, summarizing all events within this
determined
`invocation_id` range. This `CompactedEvent` is then appended to the
session.
Here is an example with `compaction_invocation_threshold = 2` and
`overlap_size = 1`:
Let's assume events are added for `invocation_id`s 1, 2, 3, and 4 in order.
1. **After `invocation_id` 2 events are added:**
- The session now contains events for invocations 1 and 2. This
fulfills the `compaction_invocation_threshold = 2` criteria.
- Since this is the first compaction, the range starts from the
beginning.
- A `CompactedEvent` is generated, summarizing events within
`invocation_id` range [1, 2].
- The session now contains: `[E(inv=1, role=user), E(inv=1, role=model),
E(inv=2, role=user), E(inv=2, role=model), E(inv=2, role=user),
CompactedEvent(inv=[1, 2])]`.
2. **After `invocation_id` 3 events are added:**
- No compaction happens yet, because only 1 new invocation (`inv=3`)
has been completed since the last compaction, and
`compaction_invocation_threshold` is 2.
3. **After `invocation_id` 4 events are added:**
- The session now contains new events for invocations 3 and 4, again
fulfilling `compaction_invocation_threshold = 2`.
- The last `CompactedEvent` covered up to `invocation_id` 2. With
`overlap_size = 1`, the new compaction range
will start one invocation before the new block (inv 3), which is
`invocation_id` 2.
- The new compaction range is from `invocation_id` 2 to 4.
- A new `CompactedEvent` is generated, summarizing events within
`invocation_id` range [2, 4].
- The session now contains: `[E(inv=1, role=user), E(inv=1, role=model),
E(inv=2, role=user), E(inv=2, role=model), E(inv=2, role=user),
CompactedEvent(inv=[1, 2]), E(inv=3, role=user), E(inv=3, role=model),
E(inv=4, role=user), E(inv=4, role=model), CompactedEvent(inv=[2, 4])]`.
Args:
app: The application instance.
session: The session containing events to compact.
session_service: The session service for appending events.
"""
events = session.events
if not events:
return None
# Find the last compaction event and its range.
last_compacted_end_timestamp = 0.0
for event in reversed(events):
if (
event.actions
and event.actions.compaction
and event.actions.compaction.end_timestamp
):
last_compacted_end_timestamp = event.actions.compaction.end_timestamp
break
# Get unique invocation IDs and their latest timestamps.
invocation_latest_timestamps = {}
for event in events:
# Only consider non-compaction events for unique invocation IDs.
if event.invocation_id and not (event.actions and event.actions.compaction):
invocation_latest_timestamps[event.invocation_id] = max(
invocation_latest_timestamps.get(event.invocation_id, 0.0),
event.timestamp,
)
unique_invocation_ids = list(invocation_latest_timestamps.keys())
# Determine which invocations are new since the last compaction.
new_invocation_ids = [
inv_id
for inv_id in unique_invocation_ids
if invocation_latest_timestamps[inv_id] > last_compacted_end_timestamp
]
if len(new_invocation_ids) < app.events_compaction_config.compaction_interval:
return None # Not enough new invocations to trigger compaction.
# Determine the range of invocations to compact.
# The end of the compaction range is the last of the new invocations.
end_inv_id = new_invocation_ids[-1]
# The start of the compaction range is overlap_size invocations before
# the first of the new invocations.
first_new_inv_id = new_invocation_ids[0]
first_new_inv_idx = unique_invocation_ids.index(first_new_inv_id)
start_idx = max(
0, first_new_inv_idx - app.events_compaction_config.overlap_size
)
start_inv_id = unique_invocation_ids[start_idx]
# Find the index of the last event with end_inv_id.
last_event_idx = -1
for i in range(len(events) - 1, -1, -1):
if events[i].invocation_id == end_inv_id:
last_event_idx = i
break
events_to_compact = []
# Trim events_to_compact to include all events up to and including the
# last event of end_inv_id.
if last_event_idx != -1:
# Find the index of the first event of start_inv_id in events.
first_event_start_inv_idx = -1
for i, event in enumerate(events):
if event.invocation_id == start_inv_id:
first_event_start_inv_idx = i
break
if first_event_start_inv_idx != -1:
events_to_compact = events[first_event_start_inv_idx : last_event_idx + 1]
# Filter out any existing compaction events from the list.
events_to_compact = [
e
for e in events_to_compact
if not (e.actions and e.actions.compaction)
]
if not events_to_compact:
return None
compaction_event = (
await app.events_compaction_config.compactor.maybe_compact_events(
events=events_to_compact
)
)
if compaction_event:
await session_service.append_event(session=session, event=compaction_event)
logger.debug('Event compactor finished.')
@@ -0,0 +1,144 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Optional
from google.genai import types
from google.genai.types import Content
from google.genai.types import Part
from ..events.event import Event
from ..events.event_actions import EventActions
from ..events.event_actions import EventCompaction
from ..models.base_llm import BaseLlm
from ..models.llm_request import LlmRequest
from .base_events_compactor import BaseEventsCompactor
class SlidingWindowCompactor(BaseEventsCompactor):
"""A summarizer for Sliding Window Compaction logic in Runner.
This compactor works with ADK runner to provide sliding window compaction.
The runner uses `compaction_invocation_threshold` and `overlap_size`
configured in `EventsCompactionConfig` on the `App` to determine when to
trigger compaction and which events to compact. This class performs
summarization of events passed by the runner.
The compaction process is controlled by two parameters read by the Runner from
`EventsCompactionConfig`:
1. `compaction_invocation_threshold`: The number of *new* user-initiated
invocations that, once fully
represented in the session's events, will trigger a compaction.
2. `overlap_size`: The number of preceding invocations to include from the
end of the last
compacted range. This creates an overlap between consecutive compacted
summaries,
maintaining context.
When `Runner` determines compaction is needed based on
`compaction_invocation_threshold`,
it selects a range of events based on `overlap_size` and passes them to
`maybe_compact_events` for summarization into a `CompactedEvent`.
This `CompactedEvent` is then appended to the session by the `Runner`.
"""
_DEFAULT_PROMPT_TEMPLATE = (
'The following is a conversation history between a user and an AI'
' agent. Please summarize the conversation, focusing on key'
' information and decisions made, as well as any unresolved'
' questions or tasks. The summary should be concise and capture the'
' essence of the interaction. Each event is prefixed with the'
' author.\\n\\n{conversation_history}'
)
def __init__(
self,
llm: BaseLlm,
prompt_template: Optional[str] = None,
):
"""Initializes the SlidingWindowCompactor.
Args:
llm: The LLM used for summarization.
prompt_template: An optional template string for the summarization
prompt. If not provided, a default template will be used. The template
should contain a '{conversation_history}' placeholder.
"""
self._llm = llm
self._prompt_template = prompt_template or self._DEFAULT_PROMPT_TEMPLATE
def _format_events_for_prompt(self, events: list[Event]) -> str:
"""Formats a list of events into a string for the LLM prompt."""
formatted_history = []
for event in events:
if event.content and event.content.parts:
for part in event.content.parts:
if part.text:
formatted_history.append(f'{event.author}: {part.text}')
return '\\n'.join(formatted_history)
async def maybe_compact_events(
self, *, events: list[Event]
) -> Optional[Event]:
"""Compacts given events and returns the compacted content.
Args:
events: A list of events to compact.
Returns:
The new compacted event, or None if no compaction is needed.
"""
if not events:
return None
conversation_history = self._format_events_for_prompt(events)
prompt = self._prompt_template.format(
conversation_history=conversation_history
)
llm_request = LlmRequest(
model=self._llm.model,
contents=[Content(role='user', parts=[Part(text=prompt)])],
)
summary_content = None
async for llm_response in self._llm.generate_content_async(
llm_request, stream=False
):
if llm_response.content:
summary_content = llm_response.content
break
if summary_content is None:
return None
# Ensure the compacted content has the role 'user'
summary_content.role = 'user'
start_timestamp = events[0].timestamp
end_timestamp = events[-1].timestamp
compaction = EventCompaction(
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
compacted_content=summary_content,
)
actions = EventActions(compaction=compaction)
return Event(
author='user',
actions=actions,
invocation_id=Event.new_id(),
)
+80 -16
View File
@@ -222,6 +222,9 @@ def _contains_empty_content(event: Event) -> bool:
Returns:
True if the event should be skipped, False otherwise.
"""
if event.actions and event.actions.compaction:
return False
return (
not event.content
or not event.content.role
@@ -233,17 +236,40 @@ def _contains_empty_content(event: Event) -> bool:
def _get_contents(
current_branch: Optional[str], events: list[Event], agent_name: str = ''
) -> list[types.Content]:
"""Get the contents for the LLM request.
"""Retrieves and processes events into a list of Contents for the LLM request.
Applies filtering, rearrangement, and content processing to events.
This function prepares the conversation history for the LLM by applying
several transformations:
1. **Initial Filtering**: Removes events that are empty, do not belong
to the current invocation branch, or are related to authentication
or request confirmation.
2. **Compaction Handling**: Identifies the latest compaction event. If found,
it replaces all events covered by the compaction range with the compacted
summary content. Only events *after* the compaction's end timestamp
are included in addition to the summary. If no compaction event exists,
all filtered events are considered.
3. **Transcription Aggregation**: Combines consecutive
`input_transcription` and `output_transcription` events into single
'user' and 'model' role `types.Content` events, respectively.
4. **Multi-Agent Presentation**: Reformats messages from other agents
(i.e., not the current `agent_name` and not 'user') to be presented
as user-role context, prefixed with `[agent_name] said:`. Compactor
events are included directly without reformatting.
5. **Function Call/Response Rearrangement**: Ensures proper pairing of
asynchronous function calls and responses within the event history.
6. **Content Conversion**: Converts the final list of processed events
into `types.Content` objects, removing any client-side function call IDs.
Args:
current_branch: The current branch of the agent.
events: Events to process.
agent_name: The name of the agent.
current_branch: The current invocation branch ID. Events outside this branch
will be filtered out.
events: A list of session events to process.
agent_name: The name of the agent currently running. Used to distinguish
between events from the current agent, other agents, and the user.
Returns:
A list of processed contents.
A list of `types.Content` objects representing the conversation history
to be sent to the LLM.
"""
accumulated_input_transcription = ''
accumulated_output_transcription = ''
@@ -266,18 +292,55 @@ def _get_contents(
raw_filtered_events.append(event)
# Find the latest compaction event.
latest_compaction_event = None
for event in reversed(raw_filtered_events):
if event.actions and event.actions.compaction:
latest_compaction_event = event
break
events_to_process = []
if latest_compaction_event:
compaction = latest_compaction_event.actions.compaction
if (
compaction.start_timestamp is not None
and compaction.end_timestamp is not None
):
# Add the compacted event itself.
new_event = Event(
timestamp=compaction.end_timestamp,
author='compactor',
content=compaction.compacted_content,
branch=latest_compaction_event.branch,
invocation_id=latest_compaction_event.invocation_id,
actions=latest_compaction_event.actions,
)
events_to_process.append(new_event)
# Add events from raw_filtered_events that are *after* the
# latest compaction's end timestamp.
for event in raw_filtered_events:
if event.timestamp > compaction.end_timestamp:
events_to_process.append(event)
else:
# No compaction events, process all raw filtered events.
events_to_process = raw_filtered_events
# Sort by timestamp to ensure chronological order.
events_to_process.sort(key=lambda x: x.timestamp)
filtered_events = []
# aggregate transcription events
for i in range(len(raw_filtered_events)):
event = raw_filtered_events[i]
for i in range(len(events_to_process)):
event = events_to_process[i]
if not event.content:
# Convert transcription into normal event
if event.input_transcription and event.input_transcription.text:
accumulated_input_transcription += event.input_transcription.text
if (
i != len(raw_filtered_events) - 1
and raw_filtered_events[i + 1].input_transcription
and raw_filtered_events[i + 1].input_transcription.text
i != len(events_to_process) - 1
and events_to_process[i + 1].input_transcription
and events_to_process[i + 1].input_transcription.text
):
continue
event = event.model_copy(deep=True)
@@ -290,9 +353,9 @@ def _get_contents(
elif event.output_transcription and event.output_transcription.text:
accumulated_output_transcription += event.output_transcription.text
if (
i != len(raw_filtered_events) - 1
and raw_filtered_events[i + 1].output_transcription
and raw_filtered_events[i + 1].output_transcription.text
i != len(events_to_process) - 1
and events_to_process[i + 1].output_transcription
and events_to_process[i + 1].output_transcription.text
):
continue
event = event.model_copy(deep=True)
@@ -321,8 +384,9 @@ def _get_contents(
contents = []
for event in result_events:
content = copy.deepcopy(event.content)
remove_client_function_call_id(content)
contents.append(content)
if content:
remove_client_function_call_id(content)
contents.append(content)
return contents
+34
View File
@@ -25,6 +25,8 @@ from typing import List
from typing import Optional
import warnings
from google.adk.apps.compaction import _run_compaction_for_sliding_window
from google.adk.apps.sliding_window_compactor import SlidingWindowCompactor
from google.genai import types
from .agents.active_streaming_tool import ActiveStreamingTool
@@ -134,6 +136,7 @@ class Runner:
ValueError: If `app` is provided along with `app_name` or `plugins`, or
if `app` is not provided but either `app_name` or `agent` is missing.
"""
self.app = app
(
self.app_name,
self.agent,
@@ -360,11 +363,42 @@ class Runner:
) as agen:
async for event in agen:
yield event
# Run compaction after all events are yielded from the agent.
# (We don't compact in the middle of an invocation, we only compact at the end of an invocation.)
if self.app and self.app.events_compaction_config:
logger.info('Running event compactor.')
# Run compaction in a separate task to avoid blocking the main thread.
# So the users can still finish the event loop from the agent while the
# compaction is running.
asyncio.create_task(
_run_compaction_for_sliding_window(
self.app, session, self.session_service
)
)
async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen:
async for event in agen:
yield event
async def _run_compaction_default(self, session: Session):
"""Runs compaction for other types of compactors.
This method calls `maybe_compact_events` on the compactor with all
events in the session.
Args:
session: The session containing events to compact.
"""
compaction_event = (
await self.app.events_compaction_config.compactor.maybe_compact_events(
events=session.events
)
)
if compaction_event:
await self.session_service.append_event(
session=session, event=compaction_event
)
def _should_append_event(self, event: Event, is_live_call: bool) -> bool:
"""Checks if an event should be appended to the session."""
# Don't append audio response from model in live mode to session.
+219
View File
@@ -0,0 +1,219 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from unittest.mock import AsyncMock
from unittest.mock import Mock
from google.adk.agents.base_agent import BaseAgent
from google.adk.apps.app import App
from google.adk.apps.app import EventsCompactionConfig
from google.adk.apps.compaction import _run_compaction_for_sliding_window
from google.adk.apps.sliding_window_compactor import SlidingWindowCompactor
from google.adk.events.event import Event
from google.adk.events.event_actions import EventActions
from google.adk.events.event_actions import EventCompaction
from google.adk.sessions.base_session_service import BaseSessionService
from google.adk.sessions.session import Session
from google.genai.types import Content
from google.genai.types import Part
import pytest
@pytest.mark.parametrize(
'env_variables', ['GOOGLE_AI', 'VERTEX'], indirect=True
)
class TestCompaction(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.mock_session_service = AsyncMock(spec=BaseSessionService)
self.mock_compactor = AsyncMock(spec=SlidingWindowCompactor)
def _create_event(
self, timestamp: float, invocation_id: str, text: str
) -> Event:
return Event(
timestamp=timestamp,
invocation_id=invocation_id,
author='user',
content=Content(parts=[Part(text=text)]),
)
def _create_compacted_event(
self, start_ts: float, end_ts: float, summary_text: str
) -> Event:
compaction = EventCompaction(
start_timestamp=start_ts,
end_timestamp=end_ts,
compacted_content=Content(parts=[Part(text=summary_text)]),
)
return Event(
timestamp=end_ts,
author='compactor',
content=compaction.compacted_content,
actions=EventActions(compaction=compaction),
invocation_id=Event.new_id(),
)
async def test_run_compaction_for_sliding_window_no_events(self):
app = App(name='test', root_agent=Mock(spec=BaseAgent))
session = Session(app_name='test', user_id='u1', id='s1', events=[])
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
self.mock_compactor.maybe_compact_events.assert_not_called()
self.mock_session_service.append_event.assert_not_called()
async def test_run_compaction_for_sliding_window_not_enough_new_invocations(
self,
):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
compactor=self.mock_compactor,
compaction_interval=3,
overlap_size=1,
),
)
# Only two new invocations ('inv1', 'inv2'), less than compaction_interval=3.
session = Session(
app_name='test',
user_id='u1',
id='s1',
events=[
self._create_event(1.0, 'inv1', 'e1'),
self._create_event(2.0, 'inv2', 'e2'),
],
)
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
self.mock_compactor.maybe_compact_events.assert_not_called()
self.mock_session_service.append_event.assert_not_called()
async def test_run_compaction_for_sliding_window_first_compaction(self):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
compactor=self.mock_compactor,
compaction_interval=2,
overlap_size=1,
),
)
events = [
self._create_event(1.0, 'inv1', 'e1'),
self._create_event(2.0, 'inv2', 'e2'),
self._create_event(3.0, 'inv3', 'e3'),
self._create_event(4.0, 'inv4', 'e4'),
]
session = Session(app_name='test', user_id='u1', id='s1', events=events)
mock_compacted_event = self._create_compacted_event(
1.0, 4.0, 'Summary inv1-inv4'
)
self.mock_compactor.maybe_compact_events.return_value = mock_compacted_event
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
# Expected events to compact: inv1, inv2, inv3, inv4
compacted_events_arg = self.mock_compactor.maybe_compact_events.call_args[
1
]['events']
self.assertEqual(
[e.invocation_id for e in compacted_events_arg],
['inv1', 'inv2', 'inv3', 'inv4'],
)
self.mock_session_service.append_event.assert_called_once_with(
session=session, event=mock_compacted_event
)
async def test_run_compaction_for_sliding_window_with_overlap(self):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
compactor=self.mock_compactor,
compaction_interval=2,
overlap_size=1,
),
)
# inv1-inv2 are already compacted. Last compacted end timestamp is 2.0.
initial_events = [
self._create_event(1.0, 'inv1', 'e1'),
self._create_event(2.0, 'inv2', 'e2'),
self._create_compacted_event(1.0, 2.0, 'Summary inv1-inv2'),
]
# Add new invocations inv3, inv4, inv5
new_events = [
self._create_event(3.0, 'inv3', 'e3'),
self._create_event(4.0, 'inv4', 'e4'),
self._create_event(5.0, 'inv5', 'e5'),
]
session = Session(
app_name='test',
user_id='u1',
id='s1',
events=initial_events + new_events,
)
mock_compacted_event = self._create_compacted_event(
2.0, 5.0, 'Summary inv2-inv5'
)
self.mock_compactor.maybe_compact_events.return_value = mock_compacted_event
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
# New invocations are inv3, inv4, inv5 (3 new) > threshold (2).
# Overlap size is 1, so start from 1 inv before inv3, which is inv2.
# Compact range: inv2 to inv5.
compacted_events_arg = self.mock_compactor.maybe_compact_events.call_args[
1
]['events']
self.assertEqual(
[e.invocation_id for e in compacted_events_arg],
['inv2', 'inv3', 'inv4', 'inv5'],
)
self.mock_session_service.append_event.assert_called_once_with(
session=session, event=mock_compacted_event
)
async def test_run_compaction_for_sliding_window_no_compaction_event_returned(
self,
):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
compactor=self.mock_compactor,
compaction_interval=1,
overlap_size=0,
),
)
events = [self._create_event(1.0, 'inv1', 'e1')]
session = Session(app_name='test', user_id='u1', id='s1', events=events)
self.mock_compactor.maybe_compact_events.return_value = None
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
self.mock_compactor.maybe_compact_events.assert_called_once()
self.mock_session_service.append_event.assert_not_called()
@@ -0,0 +1,162 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from unittest.mock import AsyncMock
from unittest.mock import Mock
from google.adk.apps.sliding_window_compactor import SlidingWindowCompactor
from google.adk.events.event import Event
from google.adk.events.event_actions import EventActions
from google.adk.events.event_actions import EventCompaction
from google.adk.models.base_llm import BaseLlm
from google.adk.models.llm_request import LlmRequest
from google.genai.types import Content
from google.genai.types import FunctionCall
from google.genai.types import FunctionResponse
from google.genai.types import Part
import pytest
@pytest.mark.parametrize(
'env_variables', ['GOOGLE_AI', 'VERTEX'], indirect=True
)
class TestSlidingWindowCompactor(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.mock_llm = AsyncMock(spec=BaseLlm)
self.mock_llm.model = 'test-model'
self.compactor = SlidingWindowCompactor(llm=self.mock_llm)
def _create_event(
self, timestamp: float, text: str, author: str = 'user'
) -> Event:
return Event(
timestamp=timestamp,
author=author,
content=Content(parts=[Part(text=text)]),
)
async def test_maybe_compact_events_success(self):
events = [
self._create_event(1.0, 'Hello', 'user'),
self._create_event(2.0, 'Hi there!', 'model'),
]
expected_conversation_history = 'user: Hello\\nmodel: Hi there!'
expected_prompt = self.compactor._DEFAULT_PROMPT_TEMPLATE.format(
conversation_history=expected_conversation_history
)
mock_llm_response = Mock(content=Content(parts=[Part(text='Summary')]))
async def async_gen():
yield mock_llm_response
self.mock_llm.generate_content_async.return_value = async_gen()
compacted_event = await self.compactor.maybe_compact_events(events=events)
self.assertIsNotNone(compacted_event)
self.assertEqual(
compacted_event.actions.compaction.compacted_content.parts[0].text,
'Summary',
)
self.assertEqual(compacted_event.author, 'user')
self.assertIsNotNone(compacted_event.actions)
self.assertIsNotNone(compacted_event.actions.compaction)
self.assertEqual(compacted_event.actions.compaction.start_timestamp, 1.0)
self.assertEqual(compacted_event.actions.compaction.end_timestamp, 2.0)
self.assertEqual(
compacted_event.actions.compaction.compacted_content.parts[0].text,
'Summary',
)
self.mock_llm.generate_content_async.assert_called_once()
args, kwargs = self.mock_llm.generate_content_async.call_args
llm_request = args[0]
self.assertIsInstance(llm_request, LlmRequest)
self.assertEqual(llm_request.model, 'test-model')
self.assertEqual(llm_request.contents[0].role, 'user')
self.assertEqual(llm_request.contents[0].parts[0].text, expected_prompt)
self.assertFalse(kwargs['stream'])
async def test_maybe_compact_events_empty_llm_response(self):
events = [
self._create_event(1.0, 'Hello', 'user'),
]
mock_llm_response = Mock(content=None)
async def async_gen():
yield mock_llm_response
self.mock_llm.generate_content_async.return_value = async_gen()
compacted_event = await self.compactor.maybe_compact_events(events=events)
self.assertIsNone(compacted_event)
async def test_maybe_compact_events_empty_input(self):
compacted_event = await self.compactor.maybe_compact_events(events=[])
self.assertIsNone(compacted_event)
self.mock_llm.generate_content_async.assert_not_called()
def test_format_events_for_prompt(self):
events = [
self._create_event(1.0, 'User says...', 'user'),
self._create_event(2.0, 'Model replies...', 'model'),
self._create_event(3.0, 'Another user input', 'user'),
self._create_event(4.0, 'More model text', 'model'),
# Event with no content
Event(timestamp=5.0, author='user'),
# Event with empty content part
Event(
timestamp=6.0,
author='model',
content=Content(parts=[Part(text='')]),
),
# Event with function call
Event(
timestamp=7.0,
author='model',
content=Content(
parts=[
Part(
function_call=FunctionCall(
id='call_1', name='tool', args={}
)
)
]
),
),
# Event with function response
Event(
timestamp=8.0,
author='model',
content=Content(
parts=[
Part(
function_response=FunctionResponse(
id='call_1',
name='tool',
response={'result': 'done'},
)
)
]
),
),
]
expected_formatted_history = (
'user: User says...\\nmodel: Model replies...\\nuser: Another user'
' input\\nmodel: More model text'
)
formatted_history = self.compactor._format_events_for_prompt(events)
self.assertEqual(formatted_history, expected_formatted_history)
+30 -1
View File
@@ -38,7 +38,7 @@ ENV_SETUPS = {
}
@fixture(autouse=True)
@fixture
def env_variables(request: FixtureRequest):
# Set up the environment
env_name: str = request.param
@@ -56,6 +56,35 @@ def env_variables(request: FixtureRequest):
os.environ[key] = original_val
# Store original environment variables to restore later
_original_env = {}
@hookimpl(tryfirst=True)
def pytest_sessionstart(session):
"""Set up environment variables at the beginning of the test session."""
if not ENV_SETUPS:
return
# Use the first env setup to initialize environment for module-level imports
env_name = next(iter(ENV_SETUPS.keys()))
envs = ENV_SETUPS[env_name]
global _original_env
_original_env = {key: os.environ.get(key) for key in envs}
os.environ.update(envs)
@hookimpl(trylast=True)
def pytest_sessionfinish(session):
"""Restore original environment variables at the end of the test session."""
global _original_env
for key, original_val in _original_env.items():
if original_val is None:
os.environ.pop(key, None)
else:
os.environ[key] = original_val
_original_env = {}
@hookimpl(tryfirst=True)
def pytest_generate_tests(metafunc: Metafunc):
"""Generate test cases for each environment setup."""
+16 -6
View File
@@ -236,7 +236,7 @@ def test_update_artifacts():
],
indirect=True,
)
def test_custom_schema():
def test_custom_schema(env_variables):
class CustomInput(BaseModel):
custom_input: str
@@ -289,7 +289,9 @@ def test_custom_schema():
],
indirect=True,
)
def test_agent_tool_response_schema_no_output_schema_vertex_ai():
def test_agent_tool_response_schema_no_output_schema_vertex_ai(
env_variables,
):
"""Test AgentTool with no output schema has string response schema for VERTEX_AI."""
tool_agent = Agent(
name='tool_agent',
@@ -314,7 +316,9 @@ def test_agent_tool_response_schema_no_output_schema_vertex_ai():
],
indirect=True,
)
def test_agent_tool_response_schema_with_output_schema_vertex_ai():
def test_agent_tool_response_schema_with_output_schema_vertex_ai(
env_variables,
):
"""Test AgentTool with output schema has object response schema for VERTEX_AI."""
class CustomOutput(BaseModel):
@@ -342,7 +346,9 @@ def test_agent_tool_response_schema_with_output_schema_vertex_ai():
],
indirect=True,
)
def test_agent_tool_response_schema_gemini_api():
def test_agent_tool_response_schema_gemini_api(
env_variables,
):
"""Test AgentTool with GEMINI_API variant has no response schema."""
class CustomOutput(BaseModel):
@@ -369,7 +375,9 @@ def test_agent_tool_response_schema_gemini_api():
],
indirect=True,
)
def test_agent_tool_response_schema_with_input_schema_vertex_ai():
def test_agent_tool_response_schema_with_input_schema_vertex_ai(
env_variables,
):
"""Test AgentTool with input and output schemas for VERTEX_AI."""
class CustomInput(BaseModel):
@@ -403,7 +411,9 @@ def test_agent_tool_response_schema_with_input_schema_vertex_ai():
],
indirect=True,
)
def test_agent_tool_response_schema_with_input_schema_no_output_vertex_ai():
def test_agent_tool_response_schema_with_input_schema_no_output_vertex_ai(
env_variables,
):
"""Test AgentTool with input schema but no output schema for VERTEX_AI."""
class CustomInput(BaseModel):