feat: expose mcps streamable http custom httpx factory parameter

Merge https://github.com/google/adk-python/pull/3634

Reopen of https://github.com/google/adk-python/pull/2997 after the accepted changes in that PR were reverted by https://github.com/google/adk-python/commit/e15e19da05ee1b763228467e83f6f73e0eced4b5

---

This PR addresses https://github.com/google/adk-python/issues/3005 and https://github.com/google/adk-python/issues/2963 to allow control over the ssl cert used when connecting to an mcp server by exposing the `httpx_client_factory` parameter exposed when creating a `MCPSessionManager` in adk. Overlaps with https://github.com/google/adk-python/pull/2966 but I don't believe that that PR's implementation will work. `streamablehttp_client` needs a client factory, not a client.

## testing plan

Adds test checking that `streamablehttp_client` uses the custom httpx factory. Could also test that a factory which obeys the `McpHttpClientFactory` protocol produces valid behavior when the session is opened?

## related issues

#2227 and #2881 both request _slightly different_ options to control the ssl certs used internally by adk. I think exposing a httpx factory is a good pattern which could be followed for those issues too.

Co-authored-by: Kathy Wu <wukathy@google.com>
COPYBARA_INTEGRATE_REVIEW=https://github.com/google/adk-python/pull/3634 from mikeedjones:feat/expose-mcps-streamable-http-custom-httpx-factory-parameter 1017a9875c697b28b96a7a8d1311494ef89af65a
PiperOrigin-RevId: 852443631
This commit is contained in:
Michael Jones
2026-01-05 13:56:59 -08:00
committed by Copybara-Service
parent e3db2d0d83
commit bfed19cd78
2 changed files with 64 additions and 0 deletions
@@ -25,6 +25,8 @@ import sys
from typing import Any
from typing import Dict
from typing import Optional
from typing import Protocol
from typing import runtime_checkable
from typing import TextIO
from typing import Union
@@ -33,8 +35,11 @@ from mcp import ClientSession
from mcp import StdioServerParameters
from mcp.client.sse import sse_client
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import create_mcp_http_client
from mcp.client.streamable_http import McpHttpClientFactory
from mcp.client.streamable_http import streamablehttp_client
from pydantic import BaseModel
from pydantic import ConfigDict
logger = logging.getLogger('google_adk.' + __name__)
@@ -73,6 +78,11 @@ class SseConnectionParams(BaseModel):
sse_read_timeout: float = 60 * 5.0
@runtime_checkable
class CheckableMcpHttpClientFactory(McpHttpClientFactory, Protocol):
pass
class StreamableHTTPConnectionParams(BaseModel):
"""Parameters for the MCP Streamable HTTP connection.
@@ -88,13 +98,18 @@ class StreamableHTTPConnectionParams(BaseModel):
Streamable HTTP server.
terminate_on_close: Whether to terminate the MCP Streamable HTTP server
when the connection is closed.
httpx_client_factory: Factory function to create a custom HTTPX client. If
not provided, a default factory will be used.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
url: str
headers: dict[str, Any] | None = None
timeout: float = 5.0
sse_read_timeout: float = 60 * 5.0
terminate_on_close: bool = True
httpx_client_factory: CheckableMcpHttpClientFactory = create_mcp_http_client
def retry_on_errors(func):
@@ -275,6 +290,7 @@ class MCPSessionManager:
seconds=self._connection_params.sse_read_timeout
),
terminate_on_close=self._connection_params.terminate_on_close,
httpx_client_factory=self._connection_params.httpx_client_factory,
)
else:
raise ValueError(
@@ -114,6 +114,54 @@ class TestMCPSessionManager:
assert manager._connection_params == http_params
@patch("google.adk.tools.mcp_tool.mcp_session_manager.streamablehttp_client")
def test_init_with_streamable_http_custom_httpx_factory(
self, mock_streamablehttp_client
):
"""Test that streamablehttp_client is called with custom httpx_client_factory."""
custom_httpx_factory = Mock()
http_params = StreamableHTTPConnectionParams(
url="https://example.com/mcp",
timeout=15.0,
httpx_client_factory=custom_httpx_factory,
)
manager = MCPSessionManager(http_params)
manager._create_client()
mock_streamablehttp_client.assert_called_once_with(
url="https://example.com/mcp",
headers=None,
timeout=timedelta(seconds=15.0),
sse_read_timeout=timedelta(seconds=300.0),
terminate_on_close=True,
httpx_client_factory=custom_httpx_factory,
)
@patch("google.adk.tools.mcp_tool.mcp_session_manager.streamablehttp_client")
def test_init_with_streamable_http_default_httpx_factory(
self, mock_streamablehttp_client
):
"""Test that streamablehttp_client is called with default httpx_client_factory."""
http_params = StreamableHTTPConnectionParams(
url="https://example.com/mcp", timeout=15.0
)
manager = MCPSessionManager(http_params)
manager._create_client()
mock_streamablehttp_client.assert_called_once_with(
url="https://example.com/mcp",
headers=None,
timeout=timedelta(seconds=15.0),
sse_read_timeout=timedelta(seconds=300.0),
terminate_on_close=True,
httpx_client_factory=StreamableHTTPConnectionParams.model_fields[
"httpx_client_factory"
].get_default(),
)
def test_generate_session_key_stdio(self):
"""Test session key generation for stdio connections."""
manager = MCPSessionManager(self.mock_stdio_connection_params)