fix: fix compaction logic

When there are multiple intervals and compactions, the original implementation only keep the last one. The right implementation is to keep as many compaction events/summary as the requested internals.

PiperOrigin-RevId: 816516662
This commit is contained in:
Hangfei Lin
2025-10-07 21:15:38 -07:00
committed by Copybara-Service
parent e55b8946d6
commit 3f2b457efd
4 changed files with 209 additions and 21 deletions
@@ -18,6 +18,8 @@ from google.adk import Agent
from google.adk.agents.base_agent import BaseAgent
from google.adk.agents.callback_context import CallbackContext
from google.adk.apps import App
from google.adk.apps.app import EventsCompactionConfig
from google.adk.apps.llm_event_summarizer import LlmEventSummarizer
from google.adk.models.llm_request import LlmRequest
from google.adk.plugins.base_plugin import BasePlugin
from google.adk.plugins.context_filter_plugin import ContextFilterPlugin
@@ -147,7 +149,12 @@ app = App(
root_agent=root_agent,
plugins=[
CountInvocationPlugin(),
ContextFilterPlugin(num_invocations_to_keep=3),
# ContextFilterPlugin(num_invocations_to_keep=3),
SaveFilesAsArtifactsPlugin(),
],
# Enable event compaction with an LLM-based summarizer.
events_compaction_config=EventsCompactionConfig(
compaction_interval=2,
overlap_size=1,
),
)
+3 -4
View File
@@ -51,8 +51,7 @@ class LlmEventSummarizer(BaseEventsSummarizer):
' agent. Please summarize the conversation, focusing on key'
' information and decisions made, as well as any unresolved'
' questions or tasks. The summary should be concise and capture the'
' essence of the interaction. Each event is prefixed with the'
' author.\\n\\n{conversation_history}'
' essence of the interaction.\\n\\n{conversation_history}'
)
def __init__(
@@ -115,8 +114,8 @@ class LlmEventSummarizer(BaseEventsSummarizer):
if summary_content is None:
return None
# Ensure the compacted content has the role 'user'
summary_content.role = 'user'
# Ensure the compacted content has the role 'model'
summary_content.role = 'model'
start_timestamp = events[0].timestamp
end_timestamp = events[-1].timestamp
+85 -14
View File
@@ -214,7 +214,8 @@ def _contains_empty_content(event: Event) -> bool:
"""Check if an event should be skipped due to missing or empty content.
This can happen to the evnets that only changed session state.
When both content and transcriptions are empty, the event will be considered as empty.
When both content and transcriptions are empty, the event will be considered
as empty.
Args:
event: The event to check.
@@ -233,6 +234,64 @@ def _contains_empty_content(event: Event) -> bool:
) and (not event.output_transcription and not event.input_transcription)
def _process_compaction_events(events: list[Event]) -> list[Event]:
"""Processes events by applying compaction.
Identifies compacted ranges and filters out events that are covered by
compaction summaries.
Args:
events: A list of events to process.
Returns:
A list of events with compaction applied.
"""
# example of compaction events:
# [event_1(timestamp=1), event_2(timestamp=2),
# compaction_1(event_1, event_2, timestamp=3), event_3(timestamp=4),
# compaction_2(event_2, event_3, timestamp=5), event_4(timestamp=6)]
# for each compaction event, it only covers the events at most between the
# current compaction and the previous compaction. So during copmaction, we
# don't have to go across compaction boundaries.
# Compaction events are always strictly in order based on event timestamp.
events_to_process = []
last_compaction_start_time = float('inf')
# Iterate in reverse to easily handle overlapping compactions.
for event in reversed(events):
if event.actions and event.actions.compaction:
compaction = event.actions.compaction
if (
compaction.start_timestamp is not None
and compaction.end_timestamp is not None
):
# Create a new event for the compacted summary.
new_event = Event(
timestamp=compaction.end_timestamp,
author='model',
content=compaction.compacted_content,
branch=event.branch,
invocation_id=event.invocation_id,
actions=event.actions,
)
# Prepend to maintain chronological order in the final list.
events_to_process.insert(0, new_event)
# Update the boundary for filtering. Events with timestamps greater than
# or equal to this start time have been compacted.
last_compaction_start_time = min(
last_compaction_start_time, compaction.start_timestamp
)
elif event.timestamp < last_compaction_start_time:
# This event is not a compaction and is before the current compaction
# range. Prepend to maintain chronological order.
events_to_process.insert(0, event)
else:
# skip the event
pass
return events_to_process
def _get_contents(
current_branch: Optional[str], events: list[Event], agent_name: str = ''
) -> list[types.Content]:
@@ -254,6 +313,7 @@ def _get_contents(
# Parse the events, leaving the contents and the function calls and
# responses from the current agent.
raw_filtered_events = []
has_compaction_events = False
for event in events:
if _contains_empty_content(event):
continue
@@ -267,20 +327,27 @@ def _get_contents(
# Skip request confirmation events.
continue
if event.actions and event.actions.compaction:
has_compaction_events = True
raw_filtered_events.append(event)
if has_compaction_events:
events_to_process = _process_compaction_events(raw_filtered_events)
else:
events_to_process = raw_filtered_events
filtered_events = []
# aggregate transcription events
for i in range(len(raw_filtered_events)):
event = raw_filtered_events[i]
for i in range(len(events_to_process)):
event = events_to_process[i]
if not event.content:
# Convert transcription into normal event
if event.input_transcription and event.input_transcription.text:
accumulated_input_transcription += event.input_transcription.text
if (
i != len(raw_filtered_events) - 1
and raw_filtered_events[i + 1].input_transcription
and raw_filtered_events[i + 1].input_transcription.text
i != len(events_to_process) - 1
and events_to_process[i + 1].input_transcription
and events_to_process[i + 1].input_transcription.text
):
continue
event = event.model_copy(deep=True)
@@ -293,9 +360,9 @@ def _get_contents(
elif event.output_transcription and event.output_transcription.text:
accumulated_output_transcription += event.output_transcription.text
if (
i != len(raw_filtered_events) - 1
and raw_filtered_events[i + 1].output_transcription
and raw_filtered_events[i + 1].output_transcription.text
i != len(events_to_process) - 1
and events_to_process[i + 1].output_transcription
and events_to_process[i + 1].output_transcription.text
):
continue
event = event.model_copy(deep=True)
@@ -324,8 +391,9 @@ def _get_contents(
contents = []
for event in result_events:
content = copy.deepcopy(event.content)
remove_client_function_call_id(content)
contents.append(content)
if content:
remove_client_function_call_id(content)
contents.append(content)
return contents
@@ -557,7 +625,8 @@ def _is_live_model_audio_event(event: Event) -> bool:
),
],
role='model'
) grounding_metadata=None partial=None turn_complete=None finish_reason=None error_code=None error_message=None ...
) grounding_metadata=None partial=None turn_complete=None finish_reason=None
error_code=None error_message=None ...
"""
if not event.content:
return False
@@ -579,8 +648,10 @@ async def _add_instructions_to_user_content(
) -> None:
"""Insert instruction-related contents at proper position in conversation.
This function inserts instruction-related contents (passed as parameter) at the
proper position in the conversation flow, specifically before the last continuous
This function inserts instruction-related contents (passed as parameter) at
the
proper position in the conversation flow, specifically before the last
continuous
batch of user content to maintain conversation context.
Args:
+113 -2
View File
@@ -24,6 +24,7 @@ from google.adk.apps.llm_event_summarizer import LlmEventSummarizer
from google.adk.events.event import Event
from google.adk.events.event_actions import EventActions
from google.adk.events.event_actions import EventCompaction
from google.adk.flows.llm_flows import contents
from google.adk.sessions.base_session_service import BaseSessionService
from google.adk.sessions.session import Session
from google.genai.types import Content
@@ -47,7 +48,7 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
timestamp=timestamp,
invocation_id=invocation_id,
author='user',
content=Content(parts=[Part(text=text)]),
content=Content(role='user', parts=[Part(text=text)]),
)
def _create_compacted_event(
@@ -56,7 +57,9 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
compaction = EventCompaction(
start_timestamp=start_ts,
end_timestamp=end_ts,
compacted_content=Content(parts=[Part(text=summary_text)]),
compacted_content=Content(
role='model', parts=[Part(text=summary_text)]
),
)
return Event(
timestamp=end_ts,
@@ -221,3 +224,111 @@ class TestCompaction(unittest.IsolatedAsyncioTestCase):
self.mock_compactor.maybe_summarize_events.assert_called_once()
self.mock_session_service.append_event.assert_not_called()
def test_get_contents_with_multiple_compactions(self):
# Event timestamps: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0
# Compaction 1: covers 1.0 to 4.0 (summary at 4.0)
# Compaction 2: covers 6.0 to 9.0 (summary at 9.0)
events = [
self._create_event(1.0, 'inv1', 'Event 1'),
self._create_event(2.0, 'inv2', 'Event 2'),
self._create_event(3.0, 'inv3', 'Event 3'),
self._create_event(4.0, 'inv4', 'Event 4'),
self._create_compacted_event(1.0, 4.0, 'Summary 1-4'),
self._create_event(5.0, 'inv5', 'Event 5'),
self._create_event(6.0, 'inv6', 'Event 6'),
self._create_event(7.0, 'inv7', 'Event 7'),
self._create_event(8.0, 'inv8', 'Event 8'),
self._create_event(9.0, 'inv9', 'Event 9'),
self._create_compacted_event(6.0, 9.0, 'Summary 6-9'),
self._create_event(10.0, 'inv10', 'Event 10'),
]
result_contents = contents._get_contents(None, events)
# Expected contents:
# Summary 1-4 (at timestamp 4.0)
# Event 5 (at timestamp 5.0)
# Summary 6-9 (at timestamp 9.0)
# Event 10 (at timestamp 10.0)
expected_texts = [
'Summary 1-4',
'Event 5',
'Summary 6-9',
'Event 10',
]
actual_texts = [c.parts[0].text for c in result_contents]
self.assertEqual(actual_texts, expected_texts)
# Verify timestamps are in order
def test_get_contents_no_compaction(self):
events = [
self._create_event(1.0, 'inv1', 'Event 1'),
self._create_event(2.0, 'inv2', 'Event 2'),
self._create_event(3.0, 'inv3', 'Event 3'),
]
result_contents = contents._get_contents(None, events)
expected_texts = ['Event 1', 'Event 2', 'Event 3']
actual_texts = [c.parts[0].text for c in result_contents]
self.assertEqual(actual_texts, expected_texts)
def test_get_contents_single_compaction_at_start(self):
events = [
self._create_event(1.0, 'inv1', 'Event 1'),
self._create_event(2.0, 'inv2', 'Event 2'),
self._create_compacted_event(1.0, 2.0, 'Summary 1-2'),
self._create_event(3.0, 'inv3', 'Event 3'),
]
result_contents = contents._get_contents(None, events)
expected_texts = ['Summary 1-2', 'Event 3']
actual_texts = [c.parts[0].text for c in result_contents]
self.assertEqual(actual_texts, expected_texts)
def test_get_contents_single_compaction_in_middle(self):
events = [
self._create_event(1.0, 'inv1', 'Event 1'),
self._create_event(2.0, 'inv2', 'Event 2'),
self._create_compacted_event(1.0, 2.0, 'Summary 1-2'),
self._create_event(3.0, 'inv3', 'Event 3'),
self._create_event(4.0, 'inv4', 'Event 4'),
self._create_compacted_event(3.0, 4.0, 'Summary 3-4'),
self._create_event(5.0, 'inv5', 'Event 5'),
]
result_contents = contents._get_contents(None, events)
expected_texts = ['Summary 1-2', 'Summary 3-4', 'Event 5']
actual_texts = [c.parts[0].text for c in result_contents]
self.assertEqual(actual_texts, expected_texts)
def test_get_contents_compaction_at_end(self):
events = [
self._create_event(1.0, 'inv1', 'Event 1'),
self._create_event(2.0, 'inv2', 'Event 2'),
self._create_event(3.0, 'inv3', 'Event 3'),
self._create_compacted_event(2.0, 3.0, 'Summary 2-3'),
]
result_contents = contents._get_contents(None, events)
expected_texts = ['Event 1', 'Summary 2-3']
actual_texts = [c.parts[0].text for c in result_contents]
self.assertEqual(actual_texts, expected_texts)
def test_get_contents_compaction_at_beginning(self):
events = [
self._create_compacted_event(1.0, 2.0, 'Summary 1-2'),
self._create_event(3.0, 'inv3', 'Event 3'),
self._create_event(4.0, 'inv4', 'Event 4'),
]
result_contents = contents._get_contents(None, events)
expected_texts = ['Summary 1-2', 'Event 3', 'Event 4']
actual_texts = [c.parts[0].text for c in result_contents]
self.assertEqual(actual_texts, expected_texts)