From 636f68fbee700aa47f01e2cfd746859353b3333d Mon Sep 17 00:00:00 2001 From: Haiyuan Cao Date: Wed, 25 Feb 2026 00:58:38 -0800 Subject: [PATCH] feat: Add RunSkillScriptTool to SkillToolset Introduces RunSkillScriptTool to execute scripts located in a skill's scripts/ directory. The execution logic is isolated within a dedicated SkillScriptCodeExecutor wrapper instantiated by RunSkillScriptTool. This wrapper manages script materialization in a temporary directory and executes Python (via runpy) or Shell scripts (returning standard output or JSON-encoded envelopes). This isolation eliminates the need to modify the underlying `BaseCodeExecutor` interface or implementations (`unsafe_local_code_executor`, etc.) to support working directories or file paths. Co-authored-by: Haiyuan Cao PiperOrigin-RevId: 875012237 --- src/google/adk/tools/skill_toolset.py | 366 ++++++++ tests/unittests/tools/test_skill_toolset.py | 894 +++++++++++++++++++- 2 files changed, 1256 insertions(+), 4 deletions(-) diff --git a/src/google/adk/tools/skill_toolset.py b/src/google/adk/tools/skill_toolset.py index f90dfdb2..d13481eb 100644 --- a/src/google/adk/tools/skill_toolset.py +++ b/src/google/adk/tools/skill_toolset.py @@ -12,16 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=g-import-not-at-top,protected-access + """Toolset for discovering, viewing, and executing agent skills.""" from __future__ import annotations +import asyncio +import json +import logging from typing import Any +from typing import Optional from typing import TYPE_CHECKING from google.genai import types from ..agents.readonly_context import ReadonlyContext +from ..code_executors.base_code_executor import BaseCodeExecutor +from ..code_executors.code_execution_utils import CodeExecutionInput from ..features import experimental from ..features import FeatureName from ..skills import models @@ -33,6 +41,11 @@ from .tool_context import ToolContext if TYPE_CHECKING: from ..models.llm_request import LlmRequest +logger = logging.getLogger("google_adk." + __name__) + +_DEFAULT_SCRIPT_TIMEOUT = 300 +_MAX_SKILL_PAYLOAD_BYTES = 16 * 1024 * 1024 # 16 MB + DEFAULT_SKILL_SYSTEM_INSTRUCTION = """You can use specialized 'skills' to help you with complex tasks. You MUST use the skill tools to interact with these skills. Skills are folders of instructions and resources that extend your capabilities for specialized tasks. Each skill folder contains: @@ -46,6 +59,7 @@ This is very important: 1. If a skill seems relevant to the current user query, you MUST use the `load_skill` tool with `name=""` to read its full instructions before proceeding. 2. Once you have read the instructions, follow them exactly as documented before replying to the user. For example, If the instruction lists multiple steps, please make sure you complete all of them in order. 3. The `load_skill_resource` tool is for viewing files within a skill's directory (e.g., `references/*`, `assets/*`, `scripts/*`). Do NOT use other tools to access these files. +4. Use `run_skill_script` to run scripts from a skill's `scripts/` directory. Use `load_skill_resource` to view script content first if needed. """ @@ -227,6 +241,340 @@ class LoadSkillResourceTool(BaseTool): } +class _SkillScriptCodeExecutor: + """A helper that materializes skill files and executes scripts.""" + + _base_executor: BaseCodeExecutor + _script_timeout: int + + def __init__(self, base_executor: BaseCodeExecutor, script_timeout: int): + self._base_executor = base_executor + self._script_timeout = script_timeout + + async def execute_script_async( + self, + invocation_context: Any, + skill: models.Skill, + script_path: str, + script_args: dict[str, Any], + ) -> dict[str, Any]: + """Prepares and executes the script using the base executor.""" + code = self._build_wrapper_code(skill, script_path, script_args) + if code is None: + if "." in script_path: + ext_msg = f"'.{script_path.rsplit('.', 1)[-1]}'" + else: + ext_msg = "(no extension)" + return { + "error": ( + f"Unsupported script type {ext_msg}." + " Supported types: .py, .sh, .bash" + ), + "error_code": "UNSUPPORTED_SCRIPT_TYPE", + } + + try: + # Execute the self-contained script using the underlying executor + result = await asyncio.to_thread( + self._base_executor.execute_code, + invocation_context, + CodeExecutionInput(code=code), + ) + + stdout = result.stdout + stderr = result.stderr + + # Shell scripts serialize both streams as JSON + # through stdout; parse the envelope if present. + rc = 0 + is_shell = "." in script_path and script_path.rsplit(".", 1)[ + -1 + ].lower() in ("sh", "bash") + if is_shell and stdout: + try: + parsed = json.loads(stdout) + if isinstance(parsed, dict) and parsed.get("__shell_result__"): + stdout = parsed.get("stdout", "") + stderr = parsed.get("stderr", "") + rc = parsed.get("returncode", 0) + if rc != 0 and not stderr: + stderr = f"Exit code {rc}" + except (json.JSONDecodeError, ValueError): + pass + + status = "success" + if rc != 0: + status = "error" + elif stderr and not stdout: + status = "error" + elif stderr: + status = "warning" + + return { + "skill_name": skill.name, + "script_path": script_path, + "stdout": stdout, + "stderr": stderr, + "status": status, + } + except SystemExit as e: + if e.code in (None, 0): + return { + "skill_name": skill.name, + "script_path": script_path, + "stdout": "", + "stderr": "", + "status": "success", + } + return { + "error": ( + f"Failed to execute script '{script_path}':" + f" exited with code {e.code}" + ), + "error_code": "EXECUTION_ERROR", + } + except Exception as e: # pylint: disable=broad-exception-caught + logger.exception( + "Error executing script '%s' from skill '%s'", + script_path, + skill.name, + ) + short_msg = str(e) + if len(short_msg) > 200: + short_msg = short_msg[:200] + "..." + return { + "error": ( + "Failed to execute script" + f" '{script_path}':\n{type(e).__name__}:" + f" {short_msg}" + ), + "error_code": "EXECUTION_ERROR", + } + + def _build_wrapper_code( + self, + skill: models.Skill, + script_path: str, + script_args: dict[str, Any], + ) -> str | None: + """Builds a self-extracting Python script.""" + ext = "" + if "." in script_path: + ext = script_path.rsplit(".", 1)[-1].lower() + + if not script_path.startswith("scripts/"): + script_path = f"scripts/{script_path}" + + files_dict = {} + for ref_name in skill.resources.list_references(): + content = skill.resources.get_reference(ref_name) + if content is not None: + files_dict[f"references/{ref_name}"] = content + + for asset_name in skill.resources.list_assets(): + content = skill.resources.get_asset(asset_name) + if content is not None: + files_dict[f"assets/{asset_name}"] = content + + for scr_name in skill.resources.list_scripts(): + scr = skill.resources.get_script(scr_name) + if scr is not None and scr.src is not None: + files_dict[f"scripts/{scr_name}"] = scr.src + + total_size = sum( + len(v) if isinstance(v, (str, bytes)) else 0 + for v in files_dict.values() + ) + if total_size > _MAX_SKILL_PAYLOAD_BYTES: + logger.warning( + "Skill '%s' resources total %d bytes, exceeding" + " the recommended limit of %d bytes.", + skill.name, + total_size, + _MAX_SKILL_PAYLOAD_BYTES, + ) + + # Build the boilerplate extract string + code_lines = [ + "import os", + "import tempfile", + "import sys", + "import json as _json", + "import subprocess", + "import runpy", + f"_files = {files_dict!r}", + "def _materialize_and_run():", + " _orig_cwd = os.getcwd()", + " with tempfile.TemporaryDirectory() as td:", + " for rel_path, content in _files.items():", + " full_path = os.path.join(td, rel_path)", + " os.makedirs(os.path.dirname(full_path), exist_ok=True)", + " mode = 'wb' if isinstance(content, bytes) else 'w'", + " with open(full_path, mode) as f:", + " f.write(content)", + " os.chdir(td)", + " try:", + ] + + if ext == "py": + argv_list = [script_path] + for k, v in script_args.items(): + argv_list.extend([f"--{k}", str(v)]) + code_lines.extend([ + f" sys.argv = {argv_list!r}", + " try:", + f" runpy.run_path({script_path!r}, run_name='__main__')", + " except SystemExit as e:", + " if e.code is not None and e.code != 0:", + " raise e", + ]) + elif ext in ("sh", "bash"): + arr = ["bash", script_path] + for k, v in script_args.items(): + arr.extend([f"--{k}", str(v)]) + timeout = self._script_timeout + code_lines.extend([ + " try:", + " _r = subprocess.run(", + f" {arr!r},", + " capture_output=True, text=True,", + f" timeout={timeout!r}, cwd=td,", + " )", + " print(_json.dumps({", + " '__shell_result__': True,", + " 'stdout': _r.stdout,", + " 'stderr': _r.stderr,", + " 'returncode': _r.returncode,", + " }))", + " except subprocess.TimeoutExpired as _e:", + " print(_json.dumps({", + " '__shell_result__': True,", + " 'stdout': _e.stdout or '',", + f" 'stderr': 'Timed out after {timeout}s',", + " 'returncode': -1,", + " }))", + ]) + else: + return None + + code_lines.extend([ + " finally:", + " os.chdir(_orig_cwd)", + ]) + + code_lines.append("_materialize_and_run()") + return "\n".join(code_lines) + + +@experimental(FeatureName.SKILL_TOOLSET) +class RunSkillScriptTool(BaseTool): + """Tool to execute scripts from a skill's scripts/ directory.""" + + def __init__(self, toolset: "SkillToolset"): + super().__init__( + name="run_skill_script", + description="Executes a script from a skill's scripts/ directory.", + ) + self._toolset = toolset + + def _get_declaration(self) -> types.FunctionDeclaration | None: + return types.FunctionDeclaration( + name=self.name, + description=self.description, + parameters_json_schema={ + "type": "object", + "properties": { + "skill_name": { + "type": "string", + "description": "The name of the skill.", + }, + "script_path": { + "type": "string", + "description": ( + "The relative path to the script (e.g.," + " 'scripts/setup.py')." + ), + }, + "args": { + "type": "object", + "description": ( + "Optional arguments to pass to the script as key-value" + " pairs." + ), + }, + }, + "required": ["skill_name", "script_path"], + }, + ) + + async def run_async( + self, *, args: dict[str, Any], tool_context: ToolContext + ) -> Any: + skill_name = args.get("skill_name") + script_path = args.get("script_path") + script_args = args.get("args", {}) + if not isinstance(script_args, dict): + return { + "error": ( + "'args' must be a JSON object (key-value pairs)," + f" got {type(script_args).__name__}." + ), + "error_code": "INVALID_ARGS_TYPE", + } + + if not skill_name: + return { + "error": "Skill name is required.", + "error_code": "MISSING_SKILL_NAME", + } + if not script_path: + return { + "error": "Script path is required.", + "error_code": "MISSING_SCRIPT_PATH", + } + + skill = self._toolset._get_skill(skill_name) + if not skill: + return { + "error": f"Skill '{skill_name}' not found.", + "error_code": "SKILL_NOT_FOUND", + } + + script = None + if script_path.startswith("scripts/"): + script = skill.resources.get_script(script_path[len("scripts/") :]) + else: + script = skill.resources.get_script(script_path) + + if script is None: + return { + "error": f"Script '{script_path}' not found in skill '{skill_name}'.", + "error_code": "SCRIPT_NOT_FOUND", + } + + # Resolve code executor: toolset-level first, then agent fallback + code_executor = self._toolset._code_executor + if code_executor is None: + agent = tool_context._invocation_context.agent + if hasattr(agent, "code_executor"): + code_executor = agent.code_executor + if code_executor is None: + return { + "error": ( + "No code executor configured. A code executor is" + " required to run scripts." + ), + "error_code": "NO_CODE_EXECUTOR", + } + + script_executor = _SkillScriptCodeExecutor( + code_executor, self._toolset._script_timeout # pylint: disable=protected-access + ) + return await script_executor.execute_script_async( + tool_context._invocation_context, skill, script_path, script_args # pylint: disable=protected-access + ) + + @experimental(FeatureName.SKILL_TOOLSET) class SkillToolset(BaseToolset): """A toolset for managing and interacting with agent skills.""" @@ -234,7 +582,19 @@ class SkillToolset(BaseToolset): def __init__( self, skills: list[models.Skill], + *, + code_executor: Optional[BaseCodeExecutor] = None, + script_timeout: int = _DEFAULT_SCRIPT_TIMEOUT, ): + """Initializes the SkillToolset. + + Args: + skills: List of skills to register. + code_executor: Optional code executor for script execution. + script_timeout: Timeout in seconds for shell script execution via + subprocess.run. Defaults to 300 seconds. Does not apply to Python + scripts executed via exec(). + """ super().__init__() # Check for duplicate skill names @@ -245,11 +605,17 @@ class SkillToolset(BaseToolset): seen.add(skill.name) self._skills = {skill.name: skill for skill in skills} + self._code_executor = code_executor + self._script_timeout = script_timeout + + # Initialize core skill tools self._tools = [ ListSkillsTool(self), LoadSkillTool(self), LoadSkillResourceTool(self), ] + # Always add RunSkillScriptTool, relies on invocation_context fallback if _code_executor is None + self._tools.append(RunSkillScriptTool(self)) async def get_tools( self, readonly_context: ReadonlyContext | None = None diff --git a/tests/unittests/tools/test_skill_toolset.py b/tests/unittests/tools/test_skill_toolset.py index 066eedfb..65323324 100644 --- a/tests/unittests/tools/test_skill_toolset.py +++ b/tests/unittests/tools/test_skill_toolset.py @@ -12,8 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=redefined-outer-name,g-import-not-at-top,protected-access + + from unittest import mock +from google.adk.code_executors.base_code_executor import BaseCodeExecutor +from google.adk.code_executors.code_execution_utils import CodeExecutionResult from google.adk.models import llm_request as llm_request_model from google.adk.skills import models from google.adk.tools import skill_toolset @@ -27,6 +32,7 @@ def mock_skill1_frontmatter(): frontmatter = mock.create_autospec(models.Frontmatter, instance=True) frontmatter.name = "skill1" frontmatter.description = "Skill 1 description" + frontmatter.allowed_tools = ["test_tool"] frontmatter.model_dump.return_value = { "name": "skill1", "description": "Skill 1 description", @@ -43,7 +49,14 @@ def mock_skill1(mock_skill1_frontmatter): skill.instructions = "instructions for skill1" skill.frontmatter = mock_skill1_frontmatter skill.resources = mock.MagicMock( - spec=["get_reference", "get_asset", "get_script"] + spec=[ + "get_reference", + "get_asset", + "get_script", + "list_references", + "list_assets", + "list_scripts", + ] ) def get_ref(name): @@ -59,11 +72,22 @@ def mock_skill1(mock_skill1_frontmatter): def get_script(name): if name == "setup.sh": return models.Script(src="echo setup") + if name == "run.py": + return models.Script(src="print('hello')") + if name == "build.rb": + return models.Script(src="puts 'hello'") return None skill.resources.get_reference.side_effect = get_ref skill.resources.get_asset.side_effect = get_asset skill.resources.get_script.side_effect = get_script + skill.resources.list_references.return_value = ["ref1.md"] + skill.resources.list_assets.return_value = ["asset1.txt"] + skill.resources.list_scripts.return_value = [ + "setup.sh", + "run.py", + "build.rb", + ] return skill @@ -73,6 +97,7 @@ def mock_skill2_frontmatter(): frontmatter = mock.create_autospec(models.Frontmatter, instance=True) frontmatter.name = "skill2" frontmatter.description = "Skill 2 description" + frontmatter.allowed_tools = [] frontmatter.model_dump.return_value = { "name": "skill2", "description": "Skill 2 description", @@ -89,7 +114,14 @@ def mock_skill2(mock_skill2_frontmatter): skill.instructions = "instructions for skill2" skill.frontmatter = mock_skill2_frontmatter skill.resources = mock.MagicMock( - spec=["get_reference", "get_asset", "get_script"] + spec=[ + "get_reference", + "get_asset", + "get_script", + "list_references", + "list_assets", + "list_scripts", + ] ) def get_ref(name): @@ -104,6 +136,9 @@ def mock_skill2(mock_skill2_frontmatter): skill.resources.get_reference.side_effect = get_ref skill.resources.get_asset.side_effect = get_asset + skill.resources.list_references.return_value = ["ref2.md"] + skill.resources.list_assets.return_value = ["asset2.txt"] + skill.resources.list_scripts.return_value = [] return skill @@ -132,13 +167,13 @@ def test_list_skills(mock_skill1, mock_skill2): async def test_get_tools(mock_skill1, mock_skill2): toolset = skill_toolset.SkillToolset([mock_skill1, mock_skill2]) tools = await toolset.get_tools() - assert len(tools) == 3 + assert len(tools) == 4 assert isinstance(tools[0], skill_toolset.ListSkillsTool) assert isinstance(tools[1], skill_toolset.LoadSkillTool) assert isinstance(tools[2], skill_toolset.LoadSkillResourceTool) + assert isinstance(tools[3], skill_toolset.RunSkillScriptTool) -@pytest.mark.asyncio @pytest.mark.asyncio async def test_list_skills_tool( mock_skill1, mock_skill2, tool_context_instance @@ -308,3 +343,854 @@ async def test_scripts_resource_not_found(mock_skill1, tool_context_instance): tool_context=tool_context_instance, ) assert result["error_code"] == "RESOURCE_NOT_FOUND" + + +# RunSkillScriptTool tests + + +def _make_tool_context_with_agent(agent=None): + """Creates a mock ToolContext with _invocation_context.agent.""" + ctx = mock.MagicMock(spec=tool_context.ToolContext) + ctx._invocation_context = mock.MagicMock() + ctx._invocation_context.agent = agent or mock.MagicMock() + return ctx + + +def _make_mock_executor(stdout="", stderr=""): + """Creates a mock code executor that returns the given output.""" + executor = mock.create_autospec(BaseCodeExecutor, instance=True) + executor.execute_code.return_value = CodeExecutionResult( + stdout=stdout, stderr=stderr + ) + return executor + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "args, expected_error_code", + [ + ( + {"script_path": "setup.sh"}, + "MISSING_SKILL_NAME", + ), + ( + {"skill_name": "skill1"}, + "MISSING_SCRIPT_PATH", + ), + ( + {"skill_name": "", "script_path": "setup.sh"}, + "MISSING_SKILL_NAME", + ), + ( + {"skill_name": "skill1", "script_path": ""}, + "MISSING_SCRIPT_PATH", + ), + ], +) +async def test_execute_script_missing_params( + mock_skill1, args, expected_error_code +): + executor = _make_mock_executor() + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async(args=args, tool_context=ctx) + assert result["error_code"] == expected_error_code + + +@pytest.mark.asyncio +async def test_execute_script_skill_not_found(mock_skill1): + executor = _make_mock_executor() + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "nonexistent", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["error_code"] == "SKILL_NOT_FOUND" + + +@pytest.mark.asyncio +async def test_execute_script_script_not_found(mock_skill1): + executor = _make_mock_executor() + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "nonexistent.py"}, + tool_context=ctx, + ) + assert result["error_code"] == "SCRIPT_NOT_FOUND" + + +@pytest.mark.asyncio +async def test_execute_script_no_code_executor(mock_skill1): + toolset = skill_toolset.SkillToolset([mock_skill1]) + tool = skill_toolset.RunSkillScriptTool(toolset) + # Agent without code_executor attribute + agent = mock.MagicMock(spec=[]) + ctx = _make_tool_context_with_agent(agent=agent) + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["error_code"] == "NO_CODE_EXECUTOR" + + +@pytest.mark.asyncio +async def test_execute_script_agent_code_executor_none(mock_skill1): + """Agent has code_executor attr but it's None.""" + toolset = skill_toolset.SkillToolset([mock_skill1]) + tool = skill_toolset.RunSkillScriptTool(toolset) + agent = mock.MagicMock() + agent.code_executor = None + ctx = _make_tool_context_with_agent(agent=agent) + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["error_code"] == "NO_CODE_EXECUTOR" + + +@pytest.mark.asyncio +async def test_execute_script_unsupported_type(mock_skill1): + executor = _make_mock_executor() + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "build.rb"}, + tool_context=ctx, + ) + assert result["error_code"] == "UNSUPPORTED_SCRIPT_TYPE" + + +@pytest.mark.asyncio +async def test_execute_script_python_success(mock_skill1): + executor = _make_mock_executor(stdout="hello\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["status"] == "success" + assert result["stdout"] == "hello\n" + assert result["stderr"] == "" + assert result["skill_name"] == "skill1" + assert result["script_path"] == "run.py" + + # Verify the code passed to executor runs the python scripts + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + assert "_materialize_and_run()" in code_input.code + assert "import runpy" in code_input.code + assert "sys.argv = ['scripts/run.py']" in code_input.code + assert ( + "runpy.run_path('scripts/run.py', run_name='__main__')" in code_input.code + ) + + +@pytest.mark.asyncio +async def test_execute_script_shell_success(mock_skill1): + executor = _make_mock_executor(stdout="setup\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["status"] == "success" + assert result["stdout"] == "setup\n" + + # Verify the code wraps in subprocess.run with JSON envelope + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + assert "subprocess.run" in code_input.code + assert "bash" in code_input.code + assert "__shell_result__" in code_input.code + + +@pytest.mark.asyncio +async def test_execute_script_with_input_args_python(mock_skill1): + executor = _make_mock_executor(stdout="done\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "skill1", + "script_path": "run.py", + "args": {"verbose": True, "count": "3"}, + }, + tool_context=ctx, + ) + assert result["status"] == "success" + + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + assert ( + "['scripts/run.py', '--verbose', 'True', '--count', '3']" + in code_input.code + ) + + +@pytest.mark.asyncio +async def test_execute_script_with_input_args_shell(mock_skill1): + executor = _make_mock_executor(stdout="done\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "skill1", + "script_path": "setup.sh", + "args": {"force": True}, + }, + tool_context=ctx, + ) + assert result["status"] == "success" + + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + assert "['bash', 'scripts/setup.sh', '--force', 'True']" in code_input.code + + +@pytest.mark.asyncio +async def test_execute_script_scripts_prefix_stripping(mock_skill1): + executor = _make_mock_executor(stdout="setup\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "skill1", + "script_path": "scripts/setup.sh", + }, + tool_context=ctx, + ) + assert result["status"] == "success" + assert result["script_path"] == "scripts/setup.sh" + + +@pytest.mark.asyncio +async def test_execute_script_toolset_executor_priority(mock_skill1): + """Toolset-level executor takes priority over agent's.""" + toolset_executor = _make_mock_executor(stdout="from toolset\n") + agent_executor = _make_mock_executor(stdout="from agent\n") + toolset = skill_toolset.SkillToolset( + [mock_skill1], code_executor=toolset_executor + ) + tool = skill_toolset.RunSkillScriptTool(toolset) + agent = mock.MagicMock() + agent.code_executor = agent_executor + ctx = _make_tool_context_with_agent(agent=agent) + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["stdout"] == "from toolset\n" + toolset_executor.execute_code.assert_called_once() + agent_executor.execute_code.assert_not_called() + + +@pytest.mark.asyncio +async def test_execute_script_agent_executor_fallback(mock_skill1): + """Falls back to agent's code executor when toolset has none.""" + agent_executor = _make_mock_executor(stdout="from agent\n") + toolset = skill_toolset.SkillToolset([mock_skill1]) + tool = skill_toolset.RunSkillScriptTool(toolset) + agent = mock.MagicMock() + agent.code_executor = agent_executor + ctx = _make_tool_context_with_agent(agent=agent) + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["stdout"] == "from agent\n" + agent_executor.execute_code.assert_called_once() + + +@pytest.mark.asyncio +async def test_execute_script_execution_error(mock_skill1): + executor = _make_mock_executor() + executor.execute_code.side_effect = RuntimeError("boom") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["error_code"] == "EXECUTION_ERROR" + assert "boom" in result["error"] + assert result["error"].startswith("Failed to execute script 'run.py':") + + +@pytest.mark.asyncio +async def test_execute_script_stderr_only_sets_error_status(mock_skill1): + """stderr with no stdout should report error status.""" + executor = _make_mock_executor(stdout="", stderr="fatal error\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["status"] == "error" + assert result["stderr"] == "fatal error\n" + + +@pytest.mark.asyncio +async def test_execute_script_stderr_with_stdout_sets_warning(mock_skill1): + """stderr alongside stdout should report warning status.""" + executor = _make_mock_executor(stdout="output\n", stderr="deprecation\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["status"] == "warning" + assert result["stdout"] == "output\n" + assert result["stderr"] == "deprecation\n" + + +@pytest.mark.asyncio +async def test_execute_script_execution_error_truncated(mock_skill1): + """Long exception messages are truncated to avoid wasting LLM tokens.""" + executor = _make_mock_executor() + executor.execute_code.side_effect = RuntimeError("x" * 300) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["error_code"] == "EXECUTION_ERROR" + # 200 chars of the message + "..." suffix + the prefix + assert result["error"].endswith("...") + assert len(result["error"]) < 300 + + +@pytest.mark.asyncio +async def test_execute_script_system_exit_caught(mock_skill1): + """sys.exit() in a script should not terminate the process.""" + executor = _make_mock_executor() + executor.execute_code.side_effect = SystemExit(1) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["error_code"] == "EXECUTION_ERROR" + assert "exited with code 1" in result["error"] + + +@pytest.mark.asyncio +async def test_execute_script_system_exit_zero_is_success(mock_skill1): + """sys.exit(0) is a normal termination and should report success.""" + executor = _make_mock_executor() + executor.execute_code.side_effect = SystemExit(0) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["status"] == "success" + + +@pytest.mark.asyncio +async def test_execute_script_system_exit_none_is_success(mock_skill1): + """sys.exit() with no arg (None) should report success.""" + executor = _make_mock_executor() + executor.execute_code.side_effect = SystemExit(None) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + assert result["status"] == "success" + + +@pytest.mark.asyncio +async def test_execute_script_shell_includes_timeout(mock_skill1): + """Shell wrapper includes timeout in subprocess.run.""" + executor = _make_mock_executor(stdout="ok\n") + toolset = skill_toolset.SkillToolset( + [mock_skill1], code_executor=executor, script_timeout=60 + ) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["status"] == "success" + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + assert "timeout=60" in code_input.code + + +@pytest.mark.asyncio +async def test_execute_script_extensionless_unsupported(mock_skill1): + """Files without extensions should return UNSUPPORTED_SCRIPT_TYPE.""" + # Add a script with no extension to the mock + original_side_effect = mock_skill1.resources.get_script.side_effect + + def get_script_extended(name): + if name == "noext": + return models.Script(src="print('hi')") + return original_side_effect(name) + + mock_skill1.resources.get_script.side_effect = get_script_extended + + executor = _make_mock_executor() + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "noext"}, + tool_context=ctx, + ) + assert result["error_code"] == "UNSUPPORTED_SCRIPT_TYPE" + + +# ── Integration tests using real UnsafeLocalCodeExecutor ── + + +def _make_skill_with_script(skill_name, script_name, script): + """Creates a minimal mock Skill with a single script.""" + skill = mock.create_autospec(models.Skill, instance=True) + skill.name = skill_name + skill.description = f"Test skill {skill_name}" + skill.instructions = "test instructions" + fm = mock.create_autospec(models.Frontmatter, instance=True) + fm.name = skill_name + fm.description = f"Test skill {skill_name}" + skill.frontmatter = fm + skill.resources = mock.MagicMock( + spec=[ + "get_reference", + "get_asset", + "get_script", + "list_references", + "list_assets", + "list_scripts", + ] + ) + + def get_script(name): + if name == script_name: + return script + return None + + skill.resources.get_script.side_effect = get_script + skill.resources.get_reference.return_value = None + skill.resources.get_asset.return_value = None + skill.resources.list_references.return_value = [] + skill.resources.list_assets.return_value = [] + skill.resources.list_scripts.return_value = [script_name] + return skill + + +def _make_real_executor_toolset(skills, **kwargs): + """Creates a SkillToolset with a real UnsafeLocalCodeExecutor.""" + from google.adk.code_executors.unsafe_local_code_executor import UnsafeLocalCodeExecutor + + executor = UnsafeLocalCodeExecutor() + return skill_toolset.SkillToolset(skills, code_executor=executor, **kwargs) + + +@pytest.mark.asyncio +async def test_integration_python_stdout(): + """Real executor: Python script stdout is captured.""" + script = models.Script(src="print('hello world')") + skill = _make_skill_with_script("test_skill", "hello.py", script) + toolset = _make_real_executor_toolset([skill]) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "test_skill", + "script_path": "hello.py", + }, + tool_context=ctx, + ) + assert result["status"] == "success" + assert result["stdout"] == "hello world\n" + assert result["stderr"] == "" + + +@pytest.mark.asyncio +async def test_integration_python_sys_exit_zero(): + """Real executor: sys.exit(0) is treated as success.""" + script = models.Script(src="import sys; sys.exit(0)") + skill = _make_skill_with_script("test_skill", "exit_zero.py", script) + toolset = _make_real_executor_toolset([skill]) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "test_skill", + "script_path": "exit_zero.py", + }, + tool_context=ctx, + ) + assert result["status"] == "success" + + +@pytest.mark.asyncio +async def test_integration_shell_stdout_and_stderr(): + """Real executor: shell script preserves both stdout and stderr.""" + script = models.Script(src="echo output; echo warning >&2") + skill = _make_skill_with_script("test_skill", "both.sh", script) + toolset = _make_real_executor_toolset([skill]) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "test_skill", + "script_path": "both.sh", + }, + tool_context=ctx, + ) + assert result["status"] == "warning" + assert "output" in result["stdout"] + assert "warning" in result["stderr"] + + +@pytest.mark.asyncio +async def test_integration_shell_stderr_only(): + """Real executor: shell script with only stderr reports error.""" + script = models.Script(src="echo failure >&2") + skill = _make_skill_with_script("test_skill", "err.sh", script) + toolset = _make_real_executor_toolset([skill]) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "test_skill", + "script_path": "err.sh", + }, + tool_context=ctx, + ) + assert result["status"] == "error" + assert "failure" in result["stderr"] + + +# ── Shell JSON envelope parsing (unit tests with mock executor) ── + + +@pytest.mark.asyncio +async def test_shell_json_envelope_parsed(mock_skill1): + """Shell JSON envelope is correctly unpacked by run_async.""" + import json + + envelope = json.dumps({ + "__shell_result__": True, + "stdout": "hello from shell\n", + "stderr": "", + "returncode": 0, + }) + executor = _make_mock_executor(stdout=envelope) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["status"] == "success" + assert result["stdout"] == "hello from shell\n" + assert result["stderr"] == "" + + +@pytest.mark.asyncio +async def test_shell_json_envelope_nonzero_returncode(mock_skill1): + """Non-zero returncode in shell envelope sets stderr.""" + import json + + envelope = json.dumps({ + "__shell_result__": True, + "stdout": "", + "stderr": "", + "returncode": 2, + }) + executor = _make_mock_executor(stdout=envelope) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["status"] == "error" + assert "Exit code 2" in result["stderr"] + + +@pytest.mark.asyncio +async def test_shell_json_envelope_with_stderr(mock_skill1): + """Shell envelope with both stdout and stderr reports warning.""" + import json + + envelope = json.dumps({ + "__shell_result__": True, + "stdout": "data\n", + "stderr": "deprecation warning\n", + "returncode": 0, + }) + executor = _make_mock_executor(stdout=envelope) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["status"] == "warning" + assert result["stdout"] == "data\n" + assert result["stderr"] == "deprecation warning\n" + + +@pytest.mark.asyncio +async def test_shell_json_envelope_timeout(mock_skill1): + """Shell envelope from TimeoutExpired reports error status.""" + import json + + envelope = json.dumps({ + "__shell_result__": True, + "stdout": "partial output\n", + "stderr": "Timed out after 300s", + "returncode": -1, + }) + executor = _make_mock_executor(stdout=envelope) + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["status"] == "error" + assert result["stdout"] == "partial output\n" + assert "Timed out" in result["stderr"] + + +@pytest.mark.asyncio +async def test_shell_non_json_stdout_passthrough(mock_skill1): + """Non-JSON shell stdout is passed through without parsing.""" + executor = _make_mock_executor(stdout="plain text output\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={"skill_name": "skill1", "script_path": "setup.sh"}, + tool_context=ctx, + ) + assert result["status"] == "success" + assert result["stdout"] == "plain text output\n" + + +# ── input_files packaging ── + + +@pytest.mark.asyncio +async def test_execute_script_input_files_packaged(mock_skill1): + """Verify references, assets, and scripts are packaged inside the wrapper code.""" + executor = _make_mock_executor(stdout="ok\n") + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + await tool.run_async( + args={"skill_name": "skill1", "script_path": "run.py"}, + tool_context=ctx, + ) + + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + + # input_files is no longer populated; it's serialized inside the script + assert code_input.input_files is None or len(code_input.input_files) == 0 + + # Ensure the extracted literal contains our fake files + assert "references/ref1.md" in code_input.code + assert "assets/asset1.txt" in code_input.code + assert "scripts/setup.sh" in code_input.code + assert "scripts/run.py" in code_input.code + assert "scripts/build.rb" in code_input.code + + # Verify content mappings exist in the string + assert "'references/ref1.md': 'ref content 1'" in code_input.code + assert "'assets/asset1.txt': 'asset content 1'" in code_input.code + + +# ── Integration: shell non-zero exit ── + + +@pytest.mark.asyncio +async def test_integration_shell_nonzero_exit(): + """Real executor: shell script with non-zero exit via JSON envelope.""" + script = models.Script(src="exit 42") + skill = _make_skill_with_script("test_skill", "fail.sh", script) + toolset = _make_real_executor_toolset([skill]) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "test_skill", + "script_path": "fail.sh", + }, + tool_context=ctx, + ) + assert result["status"] == "error" + assert "42" in result["stderr"] + + +# ── Finding 1: system instruction references correct tool name ── + + +def test_system_instruction_references_run_skill_script(): + """System instruction must reference the actual tool name.""" + assert "run_skill_script" in skill_toolset.DEFAULT_SKILL_SYSTEM_INSTRUCTION + assert ( + "execute_skill_script" + not in skill_toolset.DEFAULT_SKILL_SYSTEM_INSTRUCTION + ) + + +# ── Finding 2: empty files are mounted (not silently dropped) ── + + +@pytest.mark.asyncio +async def test_execute_script_empty_files_mounted(): + """Verify empty files are included in wrapper code, not dropped.""" + skill = mock.create_autospec(models.Skill, instance=True) + skill.name = "skill_empty" + skill.resources = mock.MagicMock( + spec=[ + "get_reference", + "get_asset", + "get_script", + "list_references", + "list_assets", + "list_scripts", + ] + ) + skill.resources.get_reference.side_effect = ( + lambda n: "" if n == "empty.md" else None + ) + skill.resources.get_asset.side_effect = ( + lambda n: "" if n == "empty.cfg" else None + ) + skill.resources.get_script.side_effect = ( + lambda n: models.Script(src="") if n == "run.py" else None + ) + skill.resources.list_references.return_value = ["empty.md"] + skill.resources.list_assets.return_value = ["empty.cfg"] + skill.resources.list_scripts.return_value = ["run.py"] + + executor = _make_mock_executor(stdout="ok\n") + toolset = skill_toolset.SkillToolset([skill], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + await tool.run_async( + args={"skill_name": "skill_empty", "script_path": "run.py"}, + tool_context=ctx, + ) + + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + assert "'references/empty.md': ''" in code_input.code + assert "'assets/empty.cfg': ''" in code_input.code + assert "'scripts/run.py': ''" in code_input.code + + +# ── Finding 3: invalid args type returns clear error ── + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "bad_args", + [ + "not a dict", + ["a", "list"], + 42, + True, + ], +) +async def test_execute_script_invalid_args_type(mock_skill1, bad_args): + """Non-dict args should return INVALID_ARGS_TYPE, not crash.""" + executor = _make_mock_executor() + toolset = skill_toolset.SkillToolset([mock_skill1], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + result = await tool.run_async( + args={ + "skill_name": "skill1", + "script_path": "run.py", + "args": bad_args, + }, + tool_context=ctx, + ) + assert result["error_code"] == "INVALID_ARGS_TYPE" + executor.execute_code.assert_not_called() + + +# ── Finding 4: binary file content is handled in wrapper ── + + +@pytest.mark.asyncio +async def test_execute_script_binary_content_packaged(): + """Verify binary asset content uses 'wb' mode in wrapper code.""" + skill = mock.create_autospec(models.Skill, instance=True) + skill.name = "skill_bin" + skill.resources = mock.MagicMock( + spec=[ + "get_reference", + "get_asset", + "get_script", + "list_references", + "list_assets", + "list_scripts", + ] + ) + skill.resources.get_reference.side_effect = ( + lambda n: b"\x00\x01\x02" if n == "data.bin" else None + ) + skill.resources.get_asset.return_value = None + skill.resources.get_script.side_effect = lambda n: ( + models.Script(src="print('ok')") if n == "run.py" else None + ) + skill.resources.list_references.return_value = ["data.bin"] + skill.resources.list_assets.return_value = [] + skill.resources.list_scripts.return_value = ["run.py"] + + executor = _make_mock_executor(stdout="ok\n") + toolset = skill_toolset.SkillToolset([skill], code_executor=executor) + tool = skill_toolset.RunSkillScriptTool(toolset) + ctx = _make_tool_context_with_agent() + await tool.run_async( + args={"skill_name": "skill_bin", "script_path": "run.py"}, + tool_context=ctx, + ) + + call_args = executor.execute_code.call_args + code_input = call_args[0][1] + # Binary content should appear as bytes literal + assert "b'\\x00\\x01\\x02'" in code_input.code + # Wrapper code handles binary with 'wb' mode + assert "'wb' if isinstance(content, bytes)" in code_input.code