fix(runner): Yield buffered function_call/function_response events during live streaming

Bug: In live streaming mode, when function_call and function_response events
arrive during active transcription, they are correctly buffered but never
yielded to the caller. This causes callers to miss these events even though
they are saved to the session.

Fix: Add yield buffered_event after appending buffered events to the session
when transcription ends.

Testing:
- Added unit test: test_live_streaming_buffered_function_call_yielded_during_transcription
- Test verifies buffered events are yielded by:
  1. Simulating partial transcription (triggers buffering)
  2. Sending function_call during transcription (gets buffered)
  3. Ending transcription (should yield buffered events)
  4. Asserting both function_call and function_response are in yielded events

Test results:
- With fix: PASSED
- Without fix (yield commented out): FAILED with "Buffered function_call event was not yielded"
- Example event flow after fix:
EVENT: partial=True, input_transcription="Show me the weather"
EVENT: function_call=get_weather, args={'location': 'NYC'} <- Now yielded
EVENT: function_response=get_weather, response={...} <- Now yielded
EVENT: partial=False, input_transcription="Show me the weather for today"
PiperOrigin-RevId: 859158546
This commit is contained in:
Google Team Member
2026-01-21 10:21:14 -08:00
committed by Copybara-Service
parent 910f65473f
commit 7b25b8fb1d
2 changed files with 112 additions and 0 deletions
+1
View File
@@ -815,6 +815,7 @@ class Runner:
await self.session_service.append_event(
session=session, event=buffered_event
)
yield buffered_event # yield buffered events to caller
buffered_events = []
else:
# non-transcription event or empty transcription event, for
+111
View File
@@ -1009,3 +1009,114 @@ def test_live_streaming_multiple_streaming_tools():
assert stock_call_found, 'Expected monitor_stock_price function call event.'
assert video_call_found, 'Expected monitor_video_stream function call event.'
def test_live_streaming_buffered_function_call_yielded_during_transcription():
"""Test that function calls buffered during transcription are yielded.
This tests the fix for the bug where function_call and function_response
events were buffered during active transcription but never yielded to the
caller. The fix ensures buffered events are yielded after transcription ends.
"""
function_call = types.Part.from_function_call(
name='get_weather', args={'location': 'San Francisco'}
)
response1 = LlmResponse(
input_transcription=types.Transcription(text='Show'),
partial=True, # ← Triggers is_transcribing = True
)
response2 = LlmResponse(
content=types.Content(
role='model', parts=[function_call]
), # ← Gets buffered
turn_complete=False,
)
response3 = LlmResponse(
input_transcription=types.Transcription(text='Show me the weather'),
partial=False, # ← Transcription ends, buffered events yielded
)
response4 = LlmResponse(
turn_complete=True,
)
mock_model = testing_utils.MockModel.create(
[response1, response2, response3, response4]
)
def get_weather(location: str) -> dict:
return {'temperature': 22, 'location': location}
root_agent = Agent(
name='root_agent',
model=mock_model,
tools=[get_weather],
)
class CustomTestRunner(testing_utils.InMemoryRunner):
def run_live(
self,
live_request_queue: LiveRequestQueue,
run_config: testing_utils.RunConfig = None,
) -> list[testing_utils.Event]:
collected_responses = []
async def consume_responses(session: testing_utils.Session):
run_res = self.runner.run_live(
session=session,
live_request_queue=live_request_queue,
run_config=run_config or testing_utils.RunConfig(),
)
async for response in run_res:
collected_responses.append(response)
if len(collected_responses) >= 5:
return
try:
session = self.session
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
asyncio.wait_for(consume_responses(session), timeout=5.0)
)
finally:
loop.close()
except (asyncio.TimeoutError, asyncio.CancelledError):
pass
return collected_responses
runner = CustomTestRunner(root_agent=root_agent)
live_request_queue = LiveRequestQueue()
live_request_queue.send_realtime(
blob=types.Blob(data=b'Show me the weather', mime_type='audio/pcm')
)
res_events = runner.run_live(live_request_queue)
assert res_events is not None, 'Expected a list of events, got None.'
assert len(res_events) >= 1, 'Expected at least one event.'
function_call_found = False
function_response_found = False
for event in res_events:
if event.content and event.content.parts:
for part in event.content.parts:
if part.function_call and part.function_call.name == 'get_weather':
function_call_found = True
assert part.function_call.args['location'] == 'San Francisco'
if (
part.function_response
and part.function_response.name == 'get_weather'
):
function_response_found = True
assert part.function_response.response['temperature'] == 22
assert function_call_found, 'Buffered function_call event was not yielded.'
assert (
function_response_found
), 'Buffered function_response event was not yielded.'