From 7b25b8fb1daf54d7694bf405d545d46d2c012d2b Mon Sep 17 00:00:00 2001 From: Google Team Member Date: Wed, 21 Jan 2026 10:21:14 -0800 Subject: [PATCH] fix(runner): Yield buffered function_call/function_response events during live streaming Bug: In live streaming mode, when function_call and function_response events arrive during active transcription, they are correctly buffered but never yielded to the caller. This causes callers to miss these events even though they are saved to the session. Fix: Add yield buffered_event after appending buffered events to the session when transcription ends. Testing: - Added unit test: test_live_streaming_buffered_function_call_yielded_during_transcription - Test verifies buffered events are yielded by: 1. Simulating partial transcription (triggers buffering) 2. Sending function_call during transcription (gets buffered) 3. Ending transcription (should yield buffered events) 4. Asserting both function_call and function_response are in yielded events Test results: - With fix: PASSED - Without fix (yield commented out): FAILED with "Buffered function_call event was not yielded" - Example event flow after fix: EVENT: partial=True, input_transcription="Show me the weather" EVENT: function_call=get_weather, args={'location': 'NYC'} <- Now yielded EVENT: function_response=get_weather, response={...} <- Now yielded EVENT: partial=False, input_transcription="Show me the weather for today" PiperOrigin-RevId: 859158546 --- src/google/adk/runners.py | 1 + tests/unittests/streaming/test_streaming.py | 111 ++++++++++++++++++++ 2 files changed, 112 insertions(+) diff --git a/src/google/adk/runners.py b/src/google/adk/runners.py index bfccbdc9..b931561c 100644 --- a/src/google/adk/runners.py +++ b/src/google/adk/runners.py @@ -815,6 +815,7 @@ class Runner: await self.session_service.append_event( session=session, event=buffered_event ) + yield buffered_event # yield buffered events to caller buffered_events = [] else: # non-transcription event or empty transcription event, for diff --git a/tests/unittests/streaming/test_streaming.py b/tests/unittests/streaming/test_streaming.py index 57743d60..5ee4721c 100644 --- a/tests/unittests/streaming/test_streaming.py +++ b/tests/unittests/streaming/test_streaming.py @@ -1009,3 +1009,114 @@ def test_live_streaming_multiple_streaming_tools(): assert stock_call_found, 'Expected monitor_stock_price function call event.' assert video_call_found, 'Expected monitor_video_stream function call event.' + + +def test_live_streaming_buffered_function_call_yielded_during_transcription(): + """Test that function calls buffered during transcription are yielded. + + This tests the fix for the bug where function_call and function_response + events were buffered during active transcription but never yielded to the + caller. The fix ensures buffered events are yielded after transcription ends. + """ + function_call = types.Part.from_function_call( + name='get_weather', args={'location': 'San Francisco'} + ) + + response1 = LlmResponse( + input_transcription=types.Transcription(text='Show'), + partial=True, # ← Triggers is_transcribing = True + ) + response2 = LlmResponse( + content=types.Content( + role='model', parts=[function_call] + ), # ← Gets buffered + turn_complete=False, + ) + response3 = LlmResponse( + input_transcription=types.Transcription(text='Show me the weather'), + partial=False, # ← Transcription ends, buffered events yielded + ) + response4 = LlmResponse( + turn_complete=True, + ) + + mock_model = testing_utils.MockModel.create( + [response1, response2, response3, response4] + ) + + def get_weather(location: str) -> dict: + return {'temperature': 22, 'location': location} + + root_agent = Agent( + name='root_agent', + model=mock_model, + tools=[get_weather], + ) + + class CustomTestRunner(testing_utils.InMemoryRunner): + + def run_live( + self, + live_request_queue: LiveRequestQueue, + run_config: testing_utils.RunConfig = None, + ) -> list[testing_utils.Event]: + collected_responses = [] + + async def consume_responses(session: testing_utils.Session): + run_res = self.runner.run_live( + session=session, + live_request_queue=live_request_queue, + run_config=run_config or testing_utils.RunConfig(), + ) + + async for response in run_res: + collected_responses.append(response) + if len(collected_responses) >= 5: + return + + try: + session = self.session + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete( + asyncio.wait_for(consume_responses(session), timeout=5.0) + ) + finally: + loop.close() + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + + return collected_responses + + runner = CustomTestRunner(root_agent=root_agent) + live_request_queue = LiveRequestQueue() + live_request_queue.send_realtime( + blob=types.Blob(data=b'Show me the weather', mime_type='audio/pcm') + ) + + res_events = runner.run_live(live_request_queue) + + assert res_events is not None, 'Expected a list of events, got None.' + assert len(res_events) >= 1, 'Expected at least one event.' + + function_call_found = False + function_response_found = False + + for event in res_events: + if event.content and event.content.parts: + for part in event.content.parts: + if part.function_call and part.function_call.name == 'get_weather': + function_call_found = True + assert part.function_call.args['location'] == 'San Francisco' + if ( + part.function_response + and part.function_response.name == 'get_weather' + ): + function_response_found = True + assert part.function_response.response['temperature'] == 22 + + assert function_call_found, 'Buffered function_call event was not yielded.' + assert ( + function_response_found + ), 'Buffered function_response event was not yielded.'