fix: Split SSE events with both content and artifactDelta in ADK Web Server

This change modifies the /run_sse endpoint to split events that contain both content and an artifactDelta. The original event is split into two separate SSE events: one containing only the content (with artifactDelta cleared) and another containing only the artifactDelta (with content cleared)

Close #4036

Co-authored-by: George Weale <gweale@google.com>
PiperOrigin-RevId: 852945249
This commit is contained in:
George Weale
2026-01-06 14:51:03 -08:00
committed by Copybara-Service
parent 1ae0e16b2c
commit 084fcfaba5
2 changed files with 82 additions and 8 deletions
+25 -8
View File
@@ -1531,14 +1531,31 @@ class AdkWebServer:
)
) as agen:
async for event in agen:
# Format as SSE data
sse_event = event.model_dump_json(
exclude_none=True, by_alias=True
)
logger.debug(
"Generated event in agent run streaming: %s", sse_event
)
yield f"data: {sse_event}\n\n"
# ADK Web renders artifacts from `actions.artifactDelta`
# during part processing *and* during action processing
# 1) the original event with `artifactDelta` cleared (content)
# 2) a content-less "action-only" event carrying `artifactDelta`
events_to_stream = [event]
if (
event.actions.artifact_delta
and event.content
and event.content.parts
):
content_event = event.model_copy(deep=True)
content_event.actions.artifact_delta = {}
artifact_event = event.model_copy(deep=True)
artifact_event.content = None
events_to_stream = [content_event, artifact_event]
for event_to_stream in events_to_stream:
sse_event = event_to_stream.model_dump_json(
exclude_none=True,
by_alias=True,
)
logger.debug(
"Generated event in agent run streaming: %s", sse_event
)
yield f"data: {sse_event}\n\n"
except Exception as e:
logger.exception("Error in event_generator: %s", e)
# You might want to yield an error event here
+57
View File
@@ -130,6 +130,7 @@ async def dummy_run_async(
new_message,
state_delta=None,
run_config: Optional[RunConfig] = None,
invocation_id: Optional[str] = None,
):
run_config = run_config or RunConfig()
yield _event_1()
@@ -959,6 +960,62 @@ def test_agent_run_passes_state_delta(test_app, create_test_session):
assert data[3]["actions"]["stateDelta"] == payload["state_delta"]
def test_agent_run_sse_splits_artifact_delta(
test_app, create_test_session, monkeypatch
):
"""Test /run_sse splits artifact deltas to avoid double-rendering in web."""
info = create_test_session
async def run_async_with_artifact_delta(
self,
*,
user_id: str,
session_id: str,
invocation_id: Optional[str] = None,
new_message: Optional[types.Content] = None,
state_delta: Optional[dict[str, Any]] = None,
run_config: Optional[RunConfig] = None,
):
del user_id, session_id, invocation_id, new_message, state_delta, run_config
yield Event(
author="dummy agent",
invocation_id="invocation_id",
content=types.Content(
role="model", parts=[types.Part(text="LLM reply")]
),
actions=EventActions(artifact_delta={"artifact.txt": 0}),
)
monkeypatch.setattr(Runner, "run_async", run_async_with_artifact_delta)
payload = {
"app_name": info["app_name"],
"user_id": info["user_id"],
"session_id": info["session_id"],
"new_message": {"role": "user", "parts": [{"text": "Hello agent"}]},
"streaming": True,
}
response = test_app.post("/run_sse", json=payload)
assert response.status_code == 200
sse_events = [
json.loads(line.removeprefix("data: "))
for line in response.text.splitlines()
if line.startswith("data: ")
]
assert len(sse_events) == 2
# First event: content but artifactDelta cleared.
assert sse_events[0]["content"]["parts"][0]["text"] == "LLM reply"
assert sse_events[0]["actions"]["artifactDelta"] == {}
# Second event: artifactDelta but no content.
assert "content" not in sse_events[1]
assert sse_events[1]["actions"]["artifactDelta"] == {"artifact.txt": 0}
def test_list_artifact_names(test_app, create_test_session):
"""Test listing artifact names for a session."""
info = create_test_session