fix(runners): Ensure event compaction completes by awaiting task

Fixes https://github.com/google/adk-python/issues/3174

The event compaction process, configured via `EventsCompactionConfig`, was
previously scheduled as a background task using `asyncio.create_task`.
Because Python's `asyncio.create_task` only holds a weak reference to
the created task and no strong reference was maintained by ADK, the
compaction task could be garbage-collected before it finished executing.
This resulted in event compaction failing silently or only partially
running, preventing session history from being summarized.

### Approaches Considered

Two approaches were considered to fix this:

1.  **`asyncio.create_task` + Reference:** Create the task with `create_task` and store a strong reference to it (e.g., in a `set` on the [Runner](http://_vscodecontentref_/0) instance), removing it only when complete via `task.add_done_callback()`.
    *   **Pros:** The [run_async](http://_vscodecontentref_/1) async generator finishes immediately after yielding the last agent event.
    *   **Cons:** Adds complexity to the Runner state; background task failures are silent to the [run_async](http://_vscodecontentref_/2) caller; requires enhancement to `runner.close()` to correctly manage pending tasks on shutdown.
2.  **`await`:** Directly `await` the compaction coroutine at the end of [run_async](http://_vscodecontentref_/3) after all agent events have been yielded.
    *   **Pros:** Simple to implement; ensures compaction runs to completion; failures during compaction propagate immediately to the [run_async](http://_vscodecontentref_/4) caller, making them visible and easier to debug.
    *   **Cons:** The `async for` loop iterating over [run_async](http://_vscodecontentref_/5) will not terminate until compaction finishes.

### Decision

This change implements the `await` approach. Although it means the `async for` loop takes longer to terminate when compaction occurs, it was chosen for its **simplicity and robustness**. Ensuring that compaction either succeeds or fails visibly is preferable to silent background failures.

All agent response events are yielded *before* compaction starts, so there is **no user-perceived delay in receiving the agent's answer** for the current turn.

### Integration Note for Users

Because compaction is now awaited, code consuming events via `async for event in runner.run_async(...)` will only finish iterating *after* compaction is complete (if compaction is triggered for that invocation).

If your application only needs the agent's response to proceed (e.g., displaying a message in a UI and allowing the user to reply), you can process events as they arrive and initiate the next turn without waiting for the `async for` loop to fully terminate. A new call to [run_async](http://_vscodecontentref_/6) for the next user query can be made immediately and will execute concurrently.

**Example:**

```python
async def handle_agent_turn(runner, message):
    print("Agent is thinking...")
    async for event in runner.run_async(user_id='...', session_id='...', new_message=message):
        # Stream events to UI, log, etc.
        if event.author == 'model' and event.content and event.content.parts[0].text:
            print(f"Agent response: {event.content.parts[0].text}")
            # The agent has provided a text response.
            # The application can now enable user input for the next turn,
            # even though this async for loop might not finish immediately
            # if compaction is running.
    print("Invocation complete (including compaction if any).")

# In your application:
# A new call to handle_agent_turn(runner, next_message) can be made
# as soon as the user provides the next input, without waiting for
# the previous call's generator to be exhausted.

Co-authored-by: Hangfei Lin <hangfei@google.com>
PiperOrigin-RevId: 833885375
This commit is contained in:
Hangfei Lin
2025-11-18 10:57:21 -08:00
committed by Copybara-Service
parent a12ae812d3
commit b5f5df9fa8
2 changed files with 268 additions and 209 deletions
+252 -200
View File
File diff suppressed because it is too large Load Diff
+16 -9
View File
@@ -292,6 +292,11 @@ class Runner:
This sync interface is only for local testing and convenience purpose.
Consider using `run_async` for production usage.
If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The generator will only finish iterating after event
compaction is complete.
Args:
user_id: The user ID of the session.
session_id: The session ID of the session.
@@ -350,6 +355,12 @@ class Runner:
) -> AsyncGenerator[Event, None]:
"""Main entry method to run the agent in this runner.
If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The async generator will only finish iterating after event
compaction is complete. However, this does not block new `run_async`
calls for subsequent user queries, which can be started concurrently.
Args:
user_id: The user ID of the session.
session_id: The session ID of the session.
@@ -431,16 +442,12 @@ class Runner:
async for event in agen:
yield event
# Run compaction after all events are yielded from the agent.
# (We don't compact in the middle of an invocation, we only compact at the end of an invocation.)
# (We don't compact in the middle of an invocation, we only compact at
# the end of an invocation.)
if self.app and self.app.events_compaction_config:
logger.info('Running event compactor.')
# Run compaction in a separate task to avoid blocking the main thread.
# So the users can still finish the event loop from the agent while the
# compaction is running.
asyncio.create_task(
_run_compaction_for_sliding_window(
self.app, session, self.session_service
)
logger.debug('Running event compactor.')
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
)
async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen: