feat: Add intra-invocation compaction and token compaction pre-request

Compact session events before LLM calls when token threshold is exceeded

Co-authored-by: George Weale <gweale@google.com>
PiperOrigin-RevId: 873095899
This commit is contained in:
George Weale
2026-02-20 15:19:11 -08:00
committed by Copybara-Service
parent bbdf0ea257
commit 485fcb84e3
7 changed files with 983 additions and 133 deletions
@@ -24,6 +24,7 @@ from pydantic import ConfigDict
from pydantic import Field
from pydantic import PrivateAttr
from ..apps.app import EventsCompactionConfig
from ..apps.app import ResumabilityConfig
from ..artifacts.base_artifact_service import BaseArtifactService
from ..auth.credential_service.base_credential_service import BaseCredentialService
@@ -200,6 +201,12 @@ class InvocationContext(BaseModel):
resumability_config: Optional[ResumabilityConfig] = None
"""The resumability config that applies to all agents under this invocation."""
events_compaction_config: Optional[EventsCompactionConfig] = None
"""The compaction config for this invocation."""
token_compaction_checked: bool = False
"""Whether token-threshold compaction ran during this invocation."""
plugin_manager: PluginManager = Field(default_factory=PluginManager)
"""The manager for keeping track of plugins in this invocation."""
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,58 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Request processor that runs token-threshold event compaction."""
from __future__ import annotations
from typing import AsyncGenerator
from typing import TYPE_CHECKING
from ...apps.compaction import _has_token_threshold_config
from ...apps.compaction import _run_compaction_for_token_threshold_config
from ...events.event import Event
from ._base_llm_processor import BaseLlmRequestProcessor
if TYPE_CHECKING:
from ...agents.invocation_context import InvocationContext
from ...models.llm_request import LlmRequest
class CompactionRequestProcessor(BaseLlmRequestProcessor):
"""Compacts session events before contents are prepared for model calls."""
async def run_async(
self, invocation_context: InvocationContext, llm_request: LlmRequest
) -> AsyncGenerator[Event, None]:
del llm_request
config = invocation_context.events_compaction_config
if not _has_token_threshold_config(config):
return
yield # Required for AsyncGenerator.
token_compacted = await _run_compaction_for_token_threshold_config(
config=config,
session=invocation_context.session,
session_service=invocation_context.session_service,
agent=invocation_context.agent,
agent_name=invocation_context.agent.name,
current_branch=invocation_context.branch,
)
if token_compacted:
invocation_context.token_compaction_checked = True
return
yield # Required for AsyncGenerator.
request_processor = CompactionRequestProcessor()
@@ -22,6 +22,7 @@ from . import _code_execution
from . import _nl_planning
from . import _output_schema_processor
from . import basic
from . import compaction
from . import contents
from . import context_cache_processor
from . import identity
@@ -42,6 +43,9 @@ def _create_request_processors():
request_confirmation.request_processor,
instructions.request_processor,
identity.request_processor,
# Compaction should run before contents so compacted events are reflected
# in the model request context.
compaction.request_processor,
contents.request_processor,
# Context cache processor sets up cache config and finds
# existing cache metadata.
+7 -1
View File
@@ -553,7 +553,10 @@ class Runner:
if self.app and self.app.events_compaction_config:
logger.debug('Running event compactor.')
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
self.app,
session,
self.session_service,
skip_token_compaction=invocation_context.token_compaction_checked,
)
async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen:
@@ -1362,6 +1365,9 @@ class Runner:
credential_service=self.credential_service,
plugin_manager=self.plugin_manager,
context_cache_config=self.context_cache_config,
events_compaction_config=(
self.app.events_compaction_config if self.app else None
),
invocation_id=invocation_id,
agent=self.agent,
session=session,
+295 -1
View File
@@ -50,6 +50,7 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
invocation_id: str,
text: str,
prompt_token_count: int | None = None,
thought: bool = False,
) -> Event:
usage_metadata = None
if prompt_token_count is not None:
@@ -60,7 +61,60 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
timestamp=timestamp,
invocation_id=invocation_id,
author='user',
content=Content(role='user', parts=[Part(text=text)]),
content=Content(role='user', parts=[Part(text=text, thought=thought)]),
usage_metadata=usage_metadata,
)
def _create_function_call_event(
self,
timestamp: float,
invocation_id: str,
function_call_id: str,
) -> Event:
return Event(
timestamp=timestamp,
invocation_id=invocation_id,
author='agent',
content=Content(
role='model',
parts=[
Part(
function_call=types.FunctionCall(
id=function_call_id, name='tool', args={}
)
)
],
),
)
def _create_function_response_event(
self,
timestamp: float,
invocation_id: str,
function_call_id: str,
prompt_token_count: int | None = None,
) -> Event:
usage_metadata = None
if prompt_token_count is not None:
usage_metadata = types.GenerateContentResponseUsageMetadata(
prompt_token_count=prompt_token_count
)
return Event(
timestamp=timestamp,
invocation_id=invocation_id,
author='agent',
content=Content(
role='user',
parts=[
Part(
function_response=types.FunctionResponse(
id=function_call_id,
name='tool',
response={'result': 'ok'},
)
)
],
),
usage_metadata=usage_metadata,
)
@@ -249,9 +303,21 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
token_threshold=50_000,
event_retention_size=5,
)
self.assertEqual(config.compaction_interval, 2)
self.assertEqual(config.overlap_size, 1)
self.assertEqual(config.token_threshold, 50_000)
self.assertEqual(config.event_retention_size, 5)
def test_events_compaction_config_accepts_sliding_window_fields(self):
config = EventsCompactionConfig(
compaction_interval=2,
overlap_size=1,
)
self.assertEqual(config.compaction_interval, 2)
self.assertEqual(config.overlap_size, 1)
self.assertIsNone(config.token_threshold)
self.assertIsNone(config.event_retention_size)
def test_events_compaction_config_rejects_partial_token_fields(
self,
):
@@ -262,6 +328,23 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
token_threshold=50_000,
)
def test_events_compaction_config_rejects_partial_sliding_fields(
self,
):
with pytest.raises(ValidationError):
EventsCompactionConfig(
compaction_interval=2,
)
with pytest.raises(ValidationError):
EventsCompactionConfig(
overlap_size=0,
)
def test_events_compaction_config_rejects_missing_modes(self):
with pytest.raises(ValidationError):
EventsCompactionConfig()
def test_latest_prompt_token_count_fallback_applies_compaction(self):
events = [
self._create_event(1.0, 'inv1', 'a' * 40),
@@ -275,6 +358,25 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
# Visible text after compaction is: 'S' + ('c' * 20) = 21 chars.
self.assertEqual(estimated_token_count, 21 // 4)
def test_latest_prompt_token_count_fallback_uses_effective_contents(self):
events = [
self._create_event(1.0, 'inv1', 'visible'),
Event(
timestamp=2.0,
invocation_id='inv2',
author='model',
content=Content(
role='model',
parts=[Part(text='hidden-thought', thought=True)],
),
),
]
estimated_token_count = compaction_module._latest_prompt_token_count(events)
# Thought-only events are filtered by contents processing.
self.assertEqual(estimated_token_count, len('visible') // 4)
async def test_run_compaction_for_token_threshold_keeps_retention_events(
self,
):
@@ -324,6 +426,136 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
session=session, event=mock_compacted_event
)
async def test_run_compaction_for_token_threshold_keeps_tool_call_pair(
self,
):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
summarizer=self.mock_compactor,
compaction_interval=999,
overlap_size=0,
token_threshold=50,
event_retention_size=1,
),
)
session = Session(
app_name='test',
user_id='u1',
id='s1',
events=[
self._create_event(1.0, 'inv1', 'e1'),
self._create_function_call_event(2.0, 'inv2', 'tool-call-1'),
self._create_function_response_event(
3.0,
'inv2',
'tool-call-1',
prompt_token_count=100,
),
],
)
mock_compacted_event = self._create_compacted_event(
1.0, 1.0, 'Summary inv1'
)
self.mock_compactor.maybe_summarize_events.return_value = (
mock_compacted_event
)
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
compacted_events_arg = self.mock_compactor.maybe_summarize_events.call_args[
1
]['events']
self.assertEqual(
[e.invocation_id for e in compacted_events_arg],
['inv1'],
)
self.mock_session_service.append_event.assert_called_once_with(
session=session, event=mock_compacted_event
)
async def test_run_compaction_for_token_threshold_equal_threshold_compacts(
self,
):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
summarizer=self.mock_compactor,
compaction_interval=999,
overlap_size=0,
token_threshold=100,
event_retention_size=1,
),
)
session = Session(
app_name='test',
user_id='u1',
id='s1',
events=[
self._create_event(1.0, 'inv1', 'e1'),
self._create_event(2.0, 'inv2', 'e2', prompt_token_count=100),
],
)
mock_compacted_event = self._create_compacted_event(
1.0, 1.0, 'Summary inv1'
)
self.mock_compactor.maybe_summarize_events.return_value = (
mock_compacted_event
)
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
compacted_events_arg = self.mock_compactor.maybe_summarize_events.call_args[
1
]['events']
self.assertEqual(
[e.invocation_id for e in compacted_events_arg],
['inv1'],
)
self.mock_session_service.append_event.assert_called_once_with(
session=session, event=mock_compacted_event
)
async def test_run_compaction_skip_token_compaction(self):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
summarizer=self.mock_compactor,
compaction_interval=999,
overlap_size=0,
token_threshold=50,
event_retention_size=1,
),
)
session = Session(
app_name='test',
user_id='u1',
id='s1',
events=[
self._create_event(1.0, 'inv1', 'e1'),
self._create_event(2.0, 'inv2', 'e2', prompt_token_count=100),
],
)
await _run_compaction_for_sliding_window(
app,
session,
self.mock_session_service,
skip_token_compaction=True,
)
self.mock_compactor.maybe_summarize_events.assert_not_called()
self.mock_session_service.append_event.assert_not_called()
async def test_run_compaction_for_token_threshold_seeds_previous_compaction(
self,
):
@@ -482,6 +714,68 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
session=session, event=mock_compacted_event
)
async def test_run_compaction_for_token_threshold_uses_latest_ordered_seed(
self,
):
app = App(
name='test',
root_agent=Mock(spec=BaseAgent),
events_compaction_config=EventsCompactionConfig(
summarizer=self.mock_compactor,
compaction_interval=999,
overlap_size=0,
token_threshold=50,
event_retention_size=1,
),
)
session = Session(
app_name='test',
user_id='u1',
id='s1',
events=[
self._create_event(1.0, 'inv1', 'e1'),
self._create_event(2.0, 'inv2', 'e2'),
self._create_event(3.0, 'inv3', 'e3'),
self._create_event(4.0, 'inv4', 'e4'),
self._create_event(5.0, 'inv5', 'e5'),
self._create_event(15.0, 'inv6', 'e6'),
self._create_event(20.0, 'inv7', 'e7'),
self._create_compacted_event(
15.0, 20.0, 'Summary 15-20', appended_ts=21.0
),
self._create_compacted_event(
1.0, 5.0, 'Summary 1-5', appended_ts=22.0
),
self._create_event(23.0, 'inv8', 'e8'),
self._create_event(24.0, 'inv9', 'e9', prompt_token_count=120),
],
)
mock_compacted_event = self._create_compacted_event(
1.0, 23.0, 'Summary 1-23'
)
self.mock_compactor.maybe_summarize_events.return_value = (
mock_compacted_event
)
await _run_compaction_for_sliding_window(
app, session, self.mock_session_service
)
compacted_events_arg = self.mock_compactor.maybe_summarize_events.call_args[
1
]['events']
self.assertEqual(
compacted_events_arg[0].content.parts[0].text, 'Summary 1-5'
)
self.assertEqual(
[e.invocation_id for e in compacted_events_arg[1:]],
['inv6', 'inv7', 'inv8'],
)
self.mock_session_service.append_event.assert_called_once_with(
session=session, event=mock_compacted_event
)
def test_get_contents_with_multiple_compactions(self):
# Event timestamps: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0
@@ -0,0 +1,346 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for request-phase token compaction processor."""
from unittest.mock import AsyncMock
from google.adk.agents.invocation_context import InvocationContext
from google.adk.agents.llm_agent import LlmAgent
from google.adk.apps.app import EventsCompactionConfig
from google.adk.apps.llm_event_summarizer import LlmEventSummarizer
from google.adk.events.event import Event
from google.adk.flows.llm_flows import compaction
from google.adk.flows.llm_flows import contents
from google.adk.flows.llm_flows.single_flow import SingleFlow
from google.adk.models.llm_request import LlmRequest
from google.adk.sessions.base_session_service import BaseSessionService
from google.adk.sessions.session import Session
from google.genai import types
from google.genai.types import Content
from google.genai.types import Part
import pytest
def _create_event(
*,
timestamp: float,
invocation_id: str,
text: str,
prompt_token_count: int | None = None,
) -> Event:
usage_metadata = None
if prompt_token_count is not None:
usage_metadata = types.GenerateContentResponseUsageMetadata(
prompt_token_count=prompt_token_count
)
return Event(
timestamp=timestamp,
invocation_id=invocation_id,
author='user',
content=Content(role='user', parts=[Part(text=text)]),
usage_metadata=usage_metadata,
)
def test_single_flow_includes_compaction_before_contents():
flow = SingleFlow()
compaction_index = flow.request_processors.index(compaction.request_processor)
contents_index = flow.request_processors.index(contents.request_processor)
assert compaction_index < contents_index
@pytest.mark.asyncio
async def test_compaction_request_processor_no_token_config():
session = Session(app_name='app', user_id='user', id='session', events=[])
session_service = AsyncMock(spec=BaseSessionService)
invocation_context = InvocationContext(
invocation_id='invocation',
agent=LlmAgent(name='agent'),
session=session,
session_service=session_service,
events_compaction_config=EventsCompactionConfig(
compaction_interval=2,
overlap_size=0,
),
)
llm_request = LlmRequest()
processor = compaction.CompactionRequestProcessor()
events = []
async for event in processor.run_async(invocation_context, llm_request):
events.append(event)
assert not events
assert not invocation_context.token_compaction_checked
session_service.append_event.assert_not_called()
@pytest.mark.asyncio
async def test_compaction_request_processor_runs_token_compaction():
session = Session(
app_name='app',
user_id='user',
id='session',
events=[
_create_event(timestamp=1.0, invocation_id='inv1', text='e1'),
_create_event(timestamp=2.0, invocation_id='inv2', text='e2'),
_create_event(
timestamp=3.0,
invocation_id='inv3',
text='e3',
prompt_token_count=100,
),
],
)
session_service = AsyncMock(spec=BaseSessionService)
mock_summarizer = AsyncMock(spec=LlmEventSummarizer)
compacted_event = Event(author='compactor', invocation_id=Event.new_id())
mock_summarizer.maybe_summarize_events.return_value = compacted_event
invocation_context = InvocationContext(
invocation_id='invocation',
agent=LlmAgent(name='agent'),
session=session,
session_service=session_service,
events_compaction_config=EventsCompactionConfig(
summarizer=mock_summarizer,
compaction_interval=999,
overlap_size=0,
token_threshold=50,
event_retention_size=1,
),
)
llm_request = LlmRequest()
processor = compaction.CompactionRequestProcessor()
events = []
async for event in processor.run_async(invocation_context, llm_request):
events.append(event)
assert not events
assert invocation_context.token_compaction_checked
compacted_events_arg = mock_summarizer.maybe_summarize_events.call_args[1][
'events'
]
assert [event.invocation_id for event in compacted_events_arg] == [
'inv1',
'inv2',
]
session_service.append_event.assert_called_once_with(
session=session, event=compacted_event
)
@pytest.mark.asyncio
async def test_compaction_request_processor_compacts_with_latest_tool_response():
session = Session(
app_name='app',
user_id='user',
id='session',
events=[
_create_event(timestamp=1.0, invocation_id='inv1', text='e1'),
_create_event(timestamp=2.0, invocation_id='inv2', text='e2'),
Event(
timestamp=3.0,
invocation_id='current-inv',
author='agent',
content=Content(
role='model',
parts=[
Part(
function_call=types.FunctionCall(
id='call-1', name='tool', args={}
)
)
],
),
),
Event(
timestamp=4.0,
invocation_id='current-inv',
author='agent',
content=Content(
role='user',
parts=[
Part(
function_response=types.FunctionResponse(
id='call-1',
name='tool',
response={'result': 'ok'},
)
)
],
),
usage_metadata=types.GenerateContentResponseUsageMetadata(
prompt_token_count=100
),
),
],
)
session_service = AsyncMock(spec=BaseSessionService)
mock_summarizer = AsyncMock(spec=LlmEventSummarizer)
compacted_event = Event(author='compactor', invocation_id=Event.new_id())
mock_summarizer.maybe_summarize_events.return_value = compacted_event
invocation_context = InvocationContext(
invocation_id='current-inv',
agent=LlmAgent(name='agent'),
session=session,
session_service=session_service,
events_compaction_config=EventsCompactionConfig(
summarizer=mock_summarizer,
compaction_interval=999,
overlap_size=0,
token_threshold=50,
event_retention_size=1,
),
)
llm_request = LlmRequest()
processor = compaction.CompactionRequestProcessor()
events = []
async for event in processor.run_async(invocation_context, llm_request):
events.append(event)
assert not events
assert invocation_context.token_compaction_checked
compacted_events_arg = mock_summarizer.maybe_summarize_events.call_args[1][
'events'
]
assert [event.invocation_id for event in compacted_events_arg] == [
'inv1',
'inv2',
]
session_service.append_event.assert_called_once_with(
session=session, event=compacted_event
)
@pytest.mark.asyncio
async def test_compaction_request_processor_can_compact_current_user_event():
session = Session(
app_name='app',
user_id='user',
id='session',
events=[
_create_event(timestamp=1.0, invocation_id='inv1', text='e1'),
Event(
timestamp=2.0,
invocation_id='current-inv',
author='user',
content=Content(
role='user',
parts=[Part(text='latest user message')],
),
usage_metadata=types.GenerateContentResponseUsageMetadata(
prompt_token_count=100
),
),
],
)
session_service = AsyncMock(spec=BaseSessionService)
mock_summarizer = AsyncMock(spec=LlmEventSummarizer)
compacted_event = Event(author='compactor', invocation_id=Event.new_id())
mock_summarizer.maybe_summarize_events.return_value = compacted_event
invocation_context = InvocationContext(
invocation_id='current-inv',
agent=LlmAgent(name='agent'),
session=session,
session_service=session_service,
events_compaction_config=EventsCompactionConfig(
summarizer=mock_summarizer,
compaction_interval=999,
overlap_size=0,
token_threshold=50,
event_retention_size=0,
),
)
llm_request = LlmRequest()
processor = compaction.CompactionRequestProcessor()
events = []
async for event in processor.run_async(invocation_context, llm_request):
events.append(event)
assert not events
assert invocation_context.token_compaction_checked
compacted_events_arg = mock_summarizer.maybe_summarize_events.call_args[1][
'events'
]
assert [event.invocation_id for event in compacted_events_arg] == [
'inv1',
'current-inv',
]
session_service.append_event.assert_called_once_with(
session=session, event=compacted_event
)
@pytest.mark.asyncio
async def test_compaction_request_processor_not_marked_when_not_compacted():
session = Session(
app_name='app',
user_id='user',
id='session',
events=[
_create_event(timestamp=1.0, invocation_id='inv1', text='e1'),
_create_event(
timestamp=2.0,
invocation_id='inv2',
text='e2',
prompt_token_count=40,
),
],
)
session_service = AsyncMock(spec=BaseSessionService)
mock_summarizer = AsyncMock(spec=LlmEventSummarizer)
mock_summarizer.maybe_summarize_events.return_value = Event(
author='compactor',
invocation_id=Event.new_id(),
)
invocation_context = InvocationContext(
invocation_id='invocation',
agent=LlmAgent(name='agent'),
session=session,
session_service=session_service,
events_compaction_config=EventsCompactionConfig(
summarizer=mock_summarizer,
compaction_interval=999,
overlap_size=0,
token_threshold=50,
event_retention_size=1,
),
)
llm_request = LlmRequest()
processor = compaction.CompactionRequestProcessor()
events = []
async for event in processor.run_async(invocation_context, llm_request):
events.append(event)
assert not events
assert not invocation_context.token_compaction_checked
mock_summarizer.maybe_summarize_events.assert_not_called()
session_service.append_event.assert_not_called()