diff --git a/src/google/adk/apps/app.py b/src/google/adk/apps/app.py index d1596c85..50faab62 100644 --- a/src/google/adk/apps/app.py +++ b/src/google/adk/apps/app.py @@ -47,6 +47,26 @@ class ResumabilityConfig(BaseModel): """ +@experimental +class EventsCompactionConfig(BaseModel): + """The config of event compaction for an application.""" + + model_config = ConfigDict( + arbitrary_types_allowed=True, + extra="forbid", + ) + + compactor: BaseEventsCompactor + """The event compactor strategy for the application.""" + compaction_interval: int + """The number of *new* user-initiated invocations that, once + fully represented in the session's events, will trigger a compaction.""" + overlap_size: int + """The number of preceding invocations to include from the + end of the last compacted range. This creates an overlap between consecutive + compacted summaries, maintaining context.""" + + @experimental class App(BaseModel): """Represents an LLM-backed agentic application. @@ -73,8 +93,8 @@ class App(BaseModel): plugins: list[BasePlugin] = Field(default_factory=list) """The plugins in the application.""" - event_compactor: Optional[BaseEventsCompactor] = None - """The event compactor strategy for the application.""" + events_compaction_config: Optional[EventsCompactionConfig] = None + """The config of event compaction for the application.""" context_cache_config: Optional[ContextCacheConfig] = None """Context cache configuration that applies to all LLM agents in the app.""" diff --git a/src/google/adk/apps/base_events_compactor.py b/src/google/adk/apps/base_events_compactor.py index 97e6611f..dcb875c0 100644 --- a/src/google/adk/apps/base_events_compactor.py +++ b/src/google/adk/apps/base_events_compactor.py @@ -26,25 +26,22 @@ from ..utils.feature_decorator import experimental class BaseEventsCompactor(abc.ABC): """Base interface for compacting events.""" + @abc.abstractmethod async def maybe_compact_events( self, *, events: list[Event] - ) -> Optional[Content]: - """A list of uncompacted events, decide whether to compact. + ) -> Optional[Event]: + """Compact a list of events into a single event. - If no need to compact, return None. Otherwise, compact into a content and + If compaction failed, return None. Otherwise, compact into a content and return it. - This method will summarize the events and return a new summray event - indicating the range of events it summarized. - - When sending events to the LLM, if a summary event is present, the events it - replaces (those identified in itssummary_range) should not be included. + This method will summarize the events and return a new summray event + indicating the range of events it summarized. Args: events: Events to compact. - agent_name: The name of the agent. Returns: - The new compacted content, or None if no compaction is needed. + The new compacted event, or None if no compaction happended. """ raise NotImplementedError() diff --git a/src/google/adk/apps/compaction.py b/src/google/adk/apps/compaction.py new file mode 100644 index 00000000..83ddecfb --- /dev/null +++ b/src/google/adk/apps/compaction.py @@ -0,0 +1,190 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging + +from google.adk.apps.app import App +from google.adk.sessions.base_session_service import BaseSessionService +from google.adk.sessions.session import Session + +logger = logging.getLogger('google_adk.' + __name__) + + +async def _run_compaction_for_sliding_window( + app: App, session: Session, session_service: BaseSessionService +): + """Runs compaction for SlidingWindowCompactor. + + This method implements the sliding window compaction logic. It determines + if enough new invocations have occurred since the last compaction based on + `compaction_invocation_threshold`. If so, it selects a range of events to + compact based on `overlap_size`, and calls `maybe_compact_events` on the + compactor. + + The compaction process is controlled by two parameters: + 1. `compaction_invocation_threshold`: The number of *new* user-initiated + invocations that, once fully + represented in the session's events, will trigger a compaction. + 2. `overlap_size`: The number of preceding invocations to include from the + end of the last + compacted range. This creates an overlap between consecutive compacted + summaries, + maintaining context. + + The compactor is called after an agent has finished processing a turn and all + its events + have been added to the session. It checks if a new compaction is needed. + + When a compaction is triggered: + - The compactor identifies the range of `invocation_id`s to be summarized. + - This range starts `overlap_size` invocations before the beginning of the + new block of `compaction_invocation_threshold` invocations and ends + with the last + invocation + in the current block. + - A `CompactedEvent` is created, summarizing all events within this + determined + `invocation_id` range. This `CompactedEvent` is then appended to the + session. + + Here is an example with `compaction_invocation_threshold = 2` and + `overlap_size = 1`: + Let's assume events are added for `invocation_id`s 1, 2, 3, and 4 in order. + + 1. **After `invocation_id` 2 events are added:** + - The session now contains events for invocations 1 and 2. This + fulfills the `compaction_invocation_threshold = 2` criteria. + - Since this is the first compaction, the range starts from the + beginning. + - A `CompactedEvent` is generated, summarizing events within + `invocation_id` range [1, 2]. + - The session now contains: `[E(inv=1, role=user), E(inv=1, role=model), + E(inv=2, role=user), E(inv=2, role=model), E(inv=2, role=user), + CompactedEvent(inv=[1, 2])]`. + + 2. **After `invocation_id` 3 events are added:** + - No compaction happens yet, because only 1 new invocation (`inv=3`) + has been completed since the last compaction, and + `compaction_invocation_threshold` is 2. + + 3. **After `invocation_id` 4 events are added:** + - The session now contains new events for invocations 3 and 4, again + fulfilling `compaction_invocation_threshold = 2`. + - The last `CompactedEvent` covered up to `invocation_id` 2. With + `overlap_size = 1`, the new compaction range + will start one invocation before the new block (inv 3), which is + `invocation_id` 2. + - The new compaction range is from `invocation_id` 2 to 4. + - A new `CompactedEvent` is generated, summarizing events within + `invocation_id` range [2, 4]. + - The session now contains: `[E(inv=1, role=user), E(inv=1, role=model), + E(inv=2, role=user), E(inv=2, role=model), E(inv=2, role=user), + CompactedEvent(inv=[1, 2]), E(inv=3, role=user), E(inv=3, role=model), + E(inv=4, role=user), E(inv=4, role=model), CompactedEvent(inv=[2, 4])]`. + + + Args: + app: The application instance. + session: The session containing events to compact. + session_service: The session service for appending events. + """ + events = session.events + if not events: + return None + # Find the last compaction event and its range. + last_compacted_end_timestamp = 0.0 + for event in reversed(events): + if ( + event.actions + and event.actions.compaction + and event.actions.compaction.end_timestamp + ): + last_compacted_end_timestamp = event.actions.compaction.end_timestamp + break + + # Get unique invocation IDs and their latest timestamps. + invocation_latest_timestamps = {} + for event in events: + # Only consider non-compaction events for unique invocation IDs. + if event.invocation_id and not (event.actions and event.actions.compaction): + invocation_latest_timestamps[event.invocation_id] = max( + invocation_latest_timestamps.get(event.invocation_id, 0.0), + event.timestamp, + ) + + unique_invocation_ids = list(invocation_latest_timestamps.keys()) + + # Determine which invocations are new since the last compaction. + new_invocation_ids = [ + inv_id + for inv_id in unique_invocation_ids + if invocation_latest_timestamps[inv_id] > last_compacted_end_timestamp + ] + + if len(new_invocation_ids) < app.events_compaction_config.compaction_interval: + return None # Not enough new invocations to trigger compaction. + + # Determine the range of invocations to compact. + # The end of the compaction range is the last of the new invocations. + end_inv_id = new_invocation_ids[-1] + + # The start of the compaction range is overlap_size invocations before + # the first of the new invocations. + first_new_inv_id = new_invocation_ids[0] + first_new_inv_idx = unique_invocation_ids.index(first_new_inv_id) + + start_idx = max( + 0, first_new_inv_idx - app.events_compaction_config.overlap_size + ) + start_inv_id = unique_invocation_ids[start_idx] + + # Find the index of the last event with end_inv_id. + last_event_idx = -1 + for i in range(len(events) - 1, -1, -1): + if events[i].invocation_id == end_inv_id: + last_event_idx = i + break + + events_to_compact = [] + # Trim events_to_compact to include all events up to and including the + # last event of end_inv_id. + if last_event_idx != -1: + # Find the index of the first event of start_inv_id in events. + first_event_start_inv_idx = -1 + for i, event in enumerate(events): + if event.invocation_id == start_inv_id: + first_event_start_inv_idx = i + break + if first_event_start_inv_idx != -1: + events_to_compact = events[first_event_start_inv_idx : last_event_idx + 1] + # Filter out any existing compaction events from the list. + events_to_compact = [ + e + for e in events_to_compact + if not (e.actions and e.actions.compaction) + ] + + if not events_to_compact: + return None + + compaction_event = ( + await app.events_compaction_config.compactor.maybe_compact_events( + events=events_to_compact + ) + ) + if compaction_event: + await session_service.append_event(session=session, event=compaction_event) + logger.debug('Event compactor finished.') diff --git a/src/google/adk/apps/sliding_window_compactor.py b/src/google/adk/apps/sliding_window_compactor.py new file mode 100644 index 00000000..2385d9c1 --- /dev/null +++ b/src/google/adk/apps/sliding_window_compactor.py @@ -0,0 +1,144 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +from typing import Optional + +from google.genai import types +from google.genai.types import Content +from google.genai.types import Part + +from ..events.event import Event +from ..events.event_actions import EventActions +from ..events.event_actions import EventCompaction +from ..models.base_llm import BaseLlm +from ..models.llm_request import LlmRequest +from .base_events_compactor import BaseEventsCompactor + + +class SlidingWindowCompactor(BaseEventsCompactor): + """A summarizer for Sliding Window Compaction logic in Runner. + + This compactor works with ADK runner to provide sliding window compaction. + The runner uses `compaction_invocation_threshold` and `overlap_size` + configured in `EventsCompactionConfig` on the `App` to determine when to + trigger compaction and which events to compact. This class performs + summarization of events passed by the runner. + + The compaction process is controlled by two parameters read by the Runner from + `EventsCompactionConfig`: + 1. `compaction_invocation_threshold`: The number of *new* user-initiated + invocations that, once fully + represented in the session's events, will trigger a compaction. + 2. `overlap_size`: The number of preceding invocations to include from the + end of the last + compacted range. This creates an overlap between consecutive compacted + summaries, + maintaining context. + + When `Runner` determines compaction is needed based on + `compaction_invocation_threshold`, + it selects a range of events based on `overlap_size` and passes them to + `maybe_compact_events` for summarization into a `CompactedEvent`. + This `CompactedEvent` is then appended to the session by the `Runner`. + """ + + _DEFAULT_PROMPT_TEMPLATE = ( + 'The following is a conversation history between a user and an AI' + ' agent. Please summarize the conversation, focusing on key' + ' information and decisions made, as well as any unresolved' + ' questions or tasks. The summary should be concise and capture the' + ' essence of the interaction. Each event is prefixed with the' + ' author.\\n\\n{conversation_history}' + ) + + def __init__( + self, + llm: BaseLlm, + prompt_template: Optional[str] = None, + ): + """Initializes the SlidingWindowCompactor. + + Args: + llm: The LLM used for summarization. + prompt_template: An optional template string for the summarization + prompt. If not provided, a default template will be used. The template + should contain a '{conversation_history}' placeholder. + """ + self._llm = llm + self._prompt_template = prompt_template or self._DEFAULT_PROMPT_TEMPLATE + + def _format_events_for_prompt(self, events: list[Event]) -> str: + """Formats a list of events into a string for the LLM prompt.""" + formatted_history = [] + for event in events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.text: + formatted_history.append(f'{event.author}: {part.text}') + return '\\n'.join(formatted_history) + + async def maybe_compact_events( + self, *, events: list[Event] + ) -> Optional[Event]: + """Compacts given events and returns the compacted content. + + Args: + events: A list of events to compact. + + Returns: + The new compacted event, or None if no compaction is needed. + """ + if not events: + return None + + conversation_history = self._format_events_for_prompt(events) + prompt = self._prompt_template.format( + conversation_history=conversation_history + ) + + llm_request = LlmRequest( + model=self._llm.model, + contents=[Content(role='user', parts=[Part(text=prompt)])], + ) + summary_content = None + async for llm_response in self._llm.generate_content_async( + llm_request, stream=False + ): + if llm_response.content: + summary_content = llm_response.content + break + + if summary_content is None: + return None + + # Ensure the compacted content has the role 'user' + summary_content.role = 'user' + + start_timestamp = events[0].timestamp + end_timestamp = events[-1].timestamp + + compaction = EventCompaction( + start_timestamp=start_timestamp, + end_timestamp=end_timestamp, + compacted_content=summary_content, + ) + + actions = EventActions(compaction=compaction) + + return Event( + author='user', + actions=actions, + invocation_id=Event.new_id(), + ) diff --git a/src/google/adk/flows/llm_flows/contents.py b/src/google/adk/flows/llm_flows/contents.py index 974daac0..9ff7ee45 100644 --- a/src/google/adk/flows/llm_flows/contents.py +++ b/src/google/adk/flows/llm_flows/contents.py @@ -222,6 +222,9 @@ def _contains_empty_content(event: Event) -> bool: Returns: True if the event should be skipped, False otherwise. """ + if event.actions and event.actions.compaction: + return False + return ( not event.content or not event.content.role @@ -233,17 +236,40 @@ def _contains_empty_content(event: Event) -> bool: def _get_contents( current_branch: Optional[str], events: list[Event], agent_name: str = '' ) -> list[types.Content]: - """Get the contents for the LLM request. + """Retrieves and processes events into a list of Contents for the LLM request. - Applies filtering, rearrangement, and content processing to events. + This function prepares the conversation history for the LLM by applying + several transformations: + 1. **Initial Filtering**: Removes events that are empty, do not belong + to the current invocation branch, or are related to authentication + or request confirmation. + 2. **Compaction Handling**: Identifies the latest compaction event. If found, + it replaces all events covered by the compaction range with the compacted + summary content. Only events *after* the compaction's end timestamp + are included in addition to the summary. If no compaction event exists, + all filtered events are considered. + 3. **Transcription Aggregation**: Combines consecutive + `input_transcription` and `output_transcription` events into single + 'user' and 'model' role `types.Content` events, respectively. + 4. **Multi-Agent Presentation**: Reformats messages from other agents + (i.e., not the current `agent_name` and not 'user') to be presented + as user-role context, prefixed with `[agent_name] said:`. Compactor + events are included directly without reformatting. + 5. **Function Call/Response Rearrangement**: Ensures proper pairing of + asynchronous function calls and responses within the event history. + 6. **Content Conversion**: Converts the final list of processed events + into `types.Content` objects, removing any client-side function call IDs. Args: - current_branch: The current branch of the agent. - events: Events to process. - agent_name: The name of the agent. + current_branch: The current invocation branch ID. Events outside this branch + will be filtered out. + events: A list of session events to process. + agent_name: The name of the agent currently running. Used to distinguish + between events from the current agent, other agents, and the user. Returns: - A list of processed contents. + A list of `types.Content` objects representing the conversation history + to be sent to the LLM. """ accumulated_input_transcription = '' accumulated_output_transcription = '' @@ -266,18 +292,55 @@ def _get_contents( raw_filtered_events.append(event) + # Find the latest compaction event. + latest_compaction_event = None + for event in reversed(raw_filtered_events): + if event.actions and event.actions.compaction: + latest_compaction_event = event + break + + events_to_process = [] + if latest_compaction_event: + compaction = latest_compaction_event.actions.compaction + if ( + compaction.start_timestamp is not None + and compaction.end_timestamp is not None + ): + # Add the compacted event itself. + new_event = Event( + timestamp=compaction.end_timestamp, + author='compactor', + content=compaction.compacted_content, + branch=latest_compaction_event.branch, + invocation_id=latest_compaction_event.invocation_id, + actions=latest_compaction_event.actions, + ) + events_to_process.append(new_event) + + # Add events from raw_filtered_events that are *after* the + # latest compaction's end timestamp. + for event in raw_filtered_events: + if event.timestamp > compaction.end_timestamp: + events_to_process.append(event) + else: + # No compaction events, process all raw filtered events. + events_to_process = raw_filtered_events + + # Sort by timestamp to ensure chronological order. + events_to_process.sort(key=lambda x: x.timestamp) + filtered_events = [] # aggregate transcription events - for i in range(len(raw_filtered_events)): - event = raw_filtered_events[i] + for i in range(len(events_to_process)): + event = events_to_process[i] if not event.content: # Convert transcription into normal event if event.input_transcription and event.input_transcription.text: accumulated_input_transcription += event.input_transcription.text if ( - i != len(raw_filtered_events) - 1 - and raw_filtered_events[i + 1].input_transcription - and raw_filtered_events[i + 1].input_transcription.text + i != len(events_to_process) - 1 + and events_to_process[i + 1].input_transcription + and events_to_process[i + 1].input_transcription.text ): continue event = event.model_copy(deep=True) @@ -290,9 +353,9 @@ def _get_contents( elif event.output_transcription and event.output_transcription.text: accumulated_output_transcription += event.output_transcription.text if ( - i != len(raw_filtered_events) - 1 - and raw_filtered_events[i + 1].output_transcription - and raw_filtered_events[i + 1].output_transcription.text + i != len(events_to_process) - 1 + and events_to_process[i + 1].output_transcription + and events_to_process[i + 1].output_transcription.text ): continue event = event.model_copy(deep=True) @@ -321,8 +384,9 @@ def _get_contents( contents = [] for event in result_events: content = copy.deepcopy(event.content) - remove_client_function_call_id(content) - contents.append(content) + if content: + remove_client_function_call_id(content) + contents.append(content) return contents diff --git a/src/google/adk/runners.py b/src/google/adk/runners.py index e7066a0c..8dd1e42b 100644 --- a/src/google/adk/runners.py +++ b/src/google/adk/runners.py @@ -25,6 +25,8 @@ from typing import List from typing import Optional import warnings +from google.adk.apps.compaction import _run_compaction_for_sliding_window +from google.adk.apps.sliding_window_compactor import SlidingWindowCompactor from google.genai import types from .agents.active_streaming_tool import ActiveStreamingTool @@ -134,6 +136,7 @@ class Runner: ValueError: If `app` is provided along with `app_name` or `plugins`, or if `app` is not provided but either `app_name` or `agent` is missing. """ + self.app = app ( self.app_name, self.agent, @@ -360,11 +363,42 @@ class Runner: ) as agen: async for event in agen: yield event + # Run compaction after all events are yielded from the agent. + # (We don't compact in the middle of an invocation, we only compact at the end of an invocation.) + if self.app and self.app.events_compaction_config: + logger.info('Running event compactor.') + # Run compaction in a separate task to avoid blocking the main thread. + # So the users can still finish the event loop from the agent while the + # compaction is running. + asyncio.create_task( + _run_compaction_for_sliding_window( + self.app, session, self.session_service + ) + ) async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen: async for event in agen: yield event + async def _run_compaction_default(self, session: Session): + """Runs compaction for other types of compactors. + + This method calls `maybe_compact_events` on the compactor with all + events in the session. + + Args: + session: The session containing events to compact. + """ + compaction_event = ( + await self.app.events_compaction_config.compactor.maybe_compact_events( + events=session.events + ) + ) + if compaction_event: + await self.session_service.append_event( + session=session, event=compaction_event + ) + def _should_append_event(self, event: Event, is_live_call: bool) -> bool: """Checks if an event should be appended to the session.""" # Don't append audio response from model in live mode to session. diff --git a/tests/unittests/apps/test_compaction.py b/tests/unittests/apps/test_compaction.py new file mode 100644 index 00000000..316290b0 --- /dev/null +++ b/tests/unittests/apps/test_compaction.py @@ -0,0 +1,219 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import AsyncMock +from unittest.mock import Mock + +from google.adk.agents.base_agent import BaseAgent +from google.adk.apps.app import App +from google.adk.apps.app import EventsCompactionConfig +from google.adk.apps.compaction import _run_compaction_for_sliding_window +from google.adk.apps.sliding_window_compactor import SlidingWindowCompactor +from google.adk.events.event import Event +from google.adk.events.event_actions import EventActions +from google.adk.events.event_actions import EventCompaction +from google.adk.sessions.base_session_service import BaseSessionService +from google.adk.sessions.session import Session +from google.genai.types import Content +from google.genai.types import Part +import pytest + + +@pytest.mark.parametrize( + 'env_variables', ['GOOGLE_AI', 'VERTEX'], indirect=True +) +class TestCompaction(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + self.mock_session_service = AsyncMock(spec=BaseSessionService) + self.mock_compactor = AsyncMock(spec=SlidingWindowCompactor) + + def _create_event( + self, timestamp: float, invocation_id: str, text: str + ) -> Event: + return Event( + timestamp=timestamp, + invocation_id=invocation_id, + author='user', + content=Content(parts=[Part(text=text)]), + ) + + def _create_compacted_event( + self, start_ts: float, end_ts: float, summary_text: str + ) -> Event: + compaction = EventCompaction( + start_timestamp=start_ts, + end_timestamp=end_ts, + compacted_content=Content(parts=[Part(text=summary_text)]), + ) + return Event( + timestamp=end_ts, + author='compactor', + content=compaction.compacted_content, + actions=EventActions(compaction=compaction), + invocation_id=Event.new_id(), + ) + + async def test_run_compaction_for_sliding_window_no_events(self): + app = App(name='test', root_agent=Mock(spec=BaseAgent)) + session = Session(app_name='test', user_id='u1', id='s1', events=[]) + await _run_compaction_for_sliding_window( + app, session, self.mock_session_service + ) + self.mock_compactor.maybe_compact_events.assert_not_called() + self.mock_session_service.append_event.assert_not_called() + + async def test_run_compaction_for_sliding_window_not_enough_new_invocations( + self, + ): + app = App( + name='test', + root_agent=Mock(spec=BaseAgent), + events_compaction_config=EventsCompactionConfig( + compactor=self.mock_compactor, + compaction_interval=3, + overlap_size=1, + ), + ) + # Only two new invocations ('inv1', 'inv2'), less than compaction_interval=3. + session = Session( + app_name='test', + user_id='u1', + id='s1', + events=[ + self._create_event(1.0, 'inv1', 'e1'), + self._create_event(2.0, 'inv2', 'e2'), + ], + ) + await _run_compaction_for_sliding_window( + app, session, self.mock_session_service + ) + self.mock_compactor.maybe_compact_events.assert_not_called() + self.mock_session_service.append_event.assert_not_called() + + async def test_run_compaction_for_sliding_window_first_compaction(self): + app = App( + name='test', + root_agent=Mock(spec=BaseAgent), + events_compaction_config=EventsCompactionConfig( + compactor=self.mock_compactor, + compaction_interval=2, + overlap_size=1, + ), + ) + events = [ + self._create_event(1.0, 'inv1', 'e1'), + self._create_event(2.0, 'inv2', 'e2'), + self._create_event(3.0, 'inv3', 'e3'), + self._create_event(4.0, 'inv4', 'e4'), + ] + session = Session(app_name='test', user_id='u1', id='s1', events=events) + + mock_compacted_event = self._create_compacted_event( + 1.0, 4.0, 'Summary inv1-inv4' + ) + self.mock_compactor.maybe_compact_events.return_value = mock_compacted_event + + await _run_compaction_for_sliding_window( + app, session, self.mock_session_service + ) + + # Expected events to compact: inv1, inv2, inv3, inv4 + compacted_events_arg = self.mock_compactor.maybe_compact_events.call_args[ + 1 + ]['events'] + self.assertEqual( + [e.invocation_id for e in compacted_events_arg], + ['inv1', 'inv2', 'inv3', 'inv4'], + ) + self.mock_session_service.append_event.assert_called_once_with( + session=session, event=mock_compacted_event + ) + + async def test_run_compaction_for_sliding_window_with_overlap(self): + app = App( + name='test', + root_agent=Mock(spec=BaseAgent), + events_compaction_config=EventsCompactionConfig( + compactor=self.mock_compactor, + compaction_interval=2, + overlap_size=1, + ), + ) + # inv1-inv2 are already compacted. Last compacted end timestamp is 2.0. + initial_events = [ + self._create_event(1.0, 'inv1', 'e1'), + self._create_event(2.0, 'inv2', 'e2'), + self._create_compacted_event(1.0, 2.0, 'Summary inv1-inv2'), + ] + # Add new invocations inv3, inv4, inv5 + new_events = [ + self._create_event(3.0, 'inv3', 'e3'), + self._create_event(4.0, 'inv4', 'e4'), + self._create_event(5.0, 'inv5', 'e5'), + ] + session = Session( + app_name='test', + user_id='u1', + id='s1', + events=initial_events + new_events, + ) + + mock_compacted_event = self._create_compacted_event( + 2.0, 5.0, 'Summary inv2-inv5' + ) + self.mock_compactor.maybe_compact_events.return_value = mock_compacted_event + + await _run_compaction_for_sliding_window( + app, session, self.mock_session_service + ) + + # New invocations are inv3, inv4, inv5 (3 new) > threshold (2). + # Overlap size is 1, so start from 1 inv before inv3, which is inv2. + # Compact range: inv2 to inv5. + compacted_events_arg = self.mock_compactor.maybe_compact_events.call_args[ + 1 + ]['events'] + self.assertEqual( + [e.invocation_id for e in compacted_events_arg], + ['inv2', 'inv3', 'inv4', 'inv5'], + ) + self.mock_session_service.append_event.assert_called_once_with( + session=session, event=mock_compacted_event + ) + + async def test_run_compaction_for_sliding_window_no_compaction_event_returned( + self, + ): + app = App( + name='test', + root_agent=Mock(spec=BaseAgent), + events_compaction_config=EventsCompactionConfig( + compactor=self.mock_compactor, + compaction_interval=1, + overlap_size=0, + ), + ) + events = [self._create_event(1.0, 'inv1', 'e1')] + session = Session(app_name='test', user_id='u1', id='s1', events=events) + + self.mock_compactor.maybe_compact_events.return_value = None + + await _run_compaction_for_sliding_window( + app, session, self.mock_session_service + ) + + self.mock_compactor.maybe_compact_events.assert_called_once() + self.mock_session_service.append_event.assert_not_called() diff --git a/tests/unittests/apps/test_sliding_window_compactor.py b/tests/unittests/apps/test_sliding_window_compactor.py new file mode 100644 index 00000000..edc3ed21 --- /dev/null +++ b/tests/unittests/apps/test_sliding_window_compactor.py @@ -0,0 +1,162 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import AsyncMock +from unittest.mock import Mock + +from google.adk.apps.sliding_window_compactor import SlidingWindowCompactor +from google.adk.events.event import Event +from google.adk.events.event_actions import EventActions +from google.adk.events.event_actions import EventCompaction +from google.adk.models.base_llm import BaseLlm +from google.adk.models.llm_request import LlmRequest +from google.genai.types import Content +from google.genai.types import FunctionCall +from google.genai.types import FunctionResponse +from google.genai.types import Part +import pytest + + +@pytest.mark.parametrize( + 'env_variables', ['GOOGLE_AI', 'VERTEX'], indirect=True +) +class TestSlidingWindowCompactor(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + self.mock_llm = AsyncMock(spec=BaseLlm) + self.mock_llm.model = 'test-model' + self.compactor = SlidingWindowCompactor(llm=self.mock_llm) + + def _create_event( + self, timestamp: float, text: str, author: str = 'user' + ) -> Event: + return Event( + timestamp=timestamp, + author=author, + content=Content(parts=[Part(text=text)]), + ) + + async def test_maybe_compact_events_success(self): + events = [ + self._create_event(1.0, 'Hello', 'user'), + self._create_event(2.0, 'Hi there!', 'model'), + ] + expected_conversation_history = 'user: Hello\\nmodel: Hi there!' + expected_prompt = self.compactor._DEFAULT_PROMPT_TEMPLATE.format( + conversation_history=expected_conversation_history + ) + mock_llm_response = Mock(content=Content(parts=[Part(text='Summary')])) + + async def async_gen(): + yield mock_llm_response + + self.mock_llm.generate_content_async.return_value = async_gen() + + compacted_event = await self.compactor.maybe_compact_events(events=events) + + self.assertIsNotNone(compacted_event) + self.assertEqual( + compacted_event.actions.compaction.compacted_content.parts[0].text, + 'Summary', + ) + self.assertEqual(compacted_event.author, 'user') + self.assertIsNotNone(compacted_event.actions) + self.assertIsNotNone(compacted_event.actions.compaction) + self.assertEqual(compacted_event.actions.compaction.start_timestamp, 1.0) + self.assertEqual(compacted_event.actions.compaction.end_timestamp, 2.0) + self.assertEqual( + compacted_event.actions.compaction.compacted_content.parts[0].text, + 'Summary', + ) + + self.mock_llm.generate_content_async.assert_called_once() + args, kwargs = self.mock_llm.generate_content_async.call_args + llm_request = args[0] + self.assertIsInstance(llm_request, LlmRequest) + self.assertEqual(llm_request.model, 'test-model') + self.assertEqual(llm_request.contents[0].role, 'user') + self.assertEqual(llm_request.contents[0].parts[0].text, expected_prompt) + self.assertFalse(kwargs['stream']) + + async def test_maybe_compact_events_empty_llm_response(self): + events = [ + self._create_event(1.0, 'Hello', 'user'), + ] + mock_llm_response = Mock(content=None) + + async def async_gen(): + yield mock_llm_response + + self.mock_llm.generate_content_async.return_value = async_gen() + + compacted_event = await self.compactor.maybe_compact_events(events=events) + self.assertIsNone(compacted_event) + + async def test_maybe_compact_events_empty_input(self): + compacted_event = await self.compactor.maybe_compact_events(events=[]) + self.assertIsNone(compacted_event) + self.mock_llm.generate_content_async.assert_not_called() + + def test_format_events_for_prompt(self): + events = [ + self._create_event(1.0, 'User says...', 'user'), + self._create_event(2.0, 'Model replies...', 'model'), + self._create_event(3.0, 'Another user input', 'user'), + self._create_event(4.0, 'More model text', 'model'), + # Event with no content + Event(timestamp=5.0, author='user'), + # Event with empty content part + Event( + timestamp=6.0, + author='model', + content=Content(parts=[Part(text='')]), + ), + # Event with function call + Event( + timestamp=7.0, + author='model', + content=Content( + parts=[ + Part( + function_call=FunctionCall( + id='call_1', name='tool', args={} + ) + ) + ] + ), + ), + # Event with function response + Event( + timestamp=8.0, + author='model', + content=Content( + parts=[ + Part( + function_response=FunctionResponse( + id='call_1', + name='tool', + response={'result': 'done'}, + ) + ) + ] + ), + ), + ] + expected_formatted_history = ( + 'user: User says...\\nmodel: Model replies...\\nuser: Another user' + ' input\\nmodel: More model text' + ) + formatted_history = self.compactor._format_events_for_prompt(events) + self.assertEqual(formatted_history, expected_formatted_history) diff --git a/tests/unittests/conftest.py b/tests/unittests/conftest.py index 2b93226d..59b66bd6 100644 --- a/tests/unittests/conftest.py +++ b/tests/unittests/conftest.py @@ -38,7 +38,7 @@ ENV_SETUPS = { } -@fixture(autouse=True) +@fixture def env_variables(request: FixtureRequest): # Set up the environment env_name: str = request.param @@ -56,6 +56,35 @@ def env_variables(request: FixtureRequest): os.environ[key] = original_val +# Store original environment variables to restore later +_original_env = {} + + +@hookimpl(tryfirst=True) +def pytest_sessionstart(session): + """Set up environment variables at the beginning of the test session.""" + if not ENV_SETUPS: + return + # Use the first env setup to initialize environment for module-level imports + env_name = next(iter(ENV_SETUPS.keys())) + envs = ENV_SETUPS[env_name] + global _original_env + _original_env = {key: os.environ.get(key) for key in envs} + os.environ.update(envs) + + +@hookimpl(trylast=True) +def pytest_sessionfinish(session): + """Restore original environment variables at the end of the test session.""" + global _original_env + for key, original_val in _original_env.items(): + if original_val is None: + os.environ.pop(key, None) + else: + os.environ[key] = original_val + _original_env = {} + + @hookimpl(tryfirst=True) def pytest_generate_tests(metafunc: Metafunc): """Generate test cases for each environment setup.""" diff --git a/tests/unittests/tools/test_agent_tool.py b/tests/unittests/tools/test_agent_tool.py index 1f2a026e..0b476966 100644 --- a/tests/unittests/tools/test_agent_tool.py +++ b/tests/unittests/tools/test_agent_tool.py @@ -236,7 +236,7 @@ def test_update_artifacts(): ], indirect=True, ) -def test_custom_schema(): +def test_custom_schema(env_variables): class CustomInput(BaseModel): custom_input: str @@ -289,7 +289,9 @@ def test_custom_schema(): ], indirect=True, ) -def test_agent_tool_response_schema_no_output_schema_vertex_ai(): +def test_agent_tool_response_schema_no_output_schema_vertex_ai( + env_variables, +): """Test AgentTool with no output schema has string response schema for VERTEX_AI.""" tool_agent = Agent( name='tool_agent', @@ -314,7 +316,9 @@ def test_agent_tool_response_schema_no_output_schema_vertex_ai(): ], indirect=True, ) -def test_agent_tool_response_schema_with_output_schema_vertex_ai(): +def test_agent_tool_response_schema_with_output_schema_vertex_ai( + env_variables, +): """Test AgentTool with output schema has object response schema for VERTEX_AI.""" class CustomOutput(BaseModel): @@ -342,7 +346,9 @@ def test_agent_tool_response_schema_with_output_schema_vertex_ai(): ], indirect=True, ) -def test_agent_tool_response_schema_gemini_api(): +def test_agent_tool_response_schema_gemini_api( + env_variables, +): """Test AgentTool with GEMINI_API variant has no response schema.""" class CustomOutput(BaseModel): @@ -369,7 +375,9 @@ def test_agent_tool_response_schema_gemini_api(): ], indirect=True, ) -def test_agent_tool_response_schema_with_input_schema_vertex_ai(): +def test_agent_tool_response_schema_with_input_schema_vertex_ai( + env_variables, +): """Test AgentTool with input and output schemas for VERTEX_AI.""" class CustomInput(BaseModel): @@ -403,7 +411,9 @@ def test_agent_tool_response_schema_with_input_schema_vertex_ai(): ], indirect=True, ) -def test_agent_tool_response_schema_with_input_schema_no_output_vertex_ai(): +def test_agent_tool_response_schema_with_input_schema_no_output_vertex_ai( + env_variables, +): """Test AgentTool with input schema but no output schema for VERTEX_AI.""" class CustomInput(BaseModel):