feat: Add GkeCodeExecutor for sandboxed code execution on GKE #non-breaking

Merge https://github.com/google/adk-python/pull/1629

close https://github.com/google/adk-python/issues/2170

### Summary

This PR introduces `GkeCodeExecutor`, a new code executor that provides a secure, scalable way to run LLM-generated code. It serves as a robust alternative to local or standard containerized executors by leveraging the **GKE Sandbox** environment, which uses gVisor for workload isolation.

For each code execution request, it dynamically creates an ephemeral Kubernetes Job with a hardened Pod configuration, offering significant security benefits and ensuring that each code execution runs in a clean, isolated environment.

### Key Features of GkeCodeExecutor

* **Dynamic Job Creation**: Uses the Kubernetes `batch/v1` API to create a new Job for each code snippet.
* **Secure Code Mounting**: Injects code into the Pod via a temporary `ConfigMap`, which is mounted to a read-only file.
* **gVisor Sandboxing**: Enforces execution within a `gvisor` runtime for kernel-level isolation.
* **Hardened Security Context**: Pods run as non-root with all Linux capabilities dropped and a read-only root filesystem.
* **Resource Management**: Applies configurable CPU and memory limits to prevent abuse.
* **Automatic Cleanup**: Uses the `ttl_seconds_after_finished` feature on Jobs for robust, automatic garbage collection of completed Pods and Jobs.
* **Node Scheduling**: The executor uses Kubernetes `tolerations` in its Pod specification. This allows the k8s scheduler to place the execution Pod onto a **_pre-configured_** gVisor-enabled node.
* **Module Integration**: `GkeCodeExecutor` is registered in `code_executors/__init__.py`, making it available for use by agents. The `ImportError` handling checks for the required `kubernetes` SDK and points users at the right extra to install.
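The gVisor scheduling described above comes down to two fields on the Pod spec: `runtimeClassName` and a toleration matching the taint GKE places on Sandbox node pools. A minimal sketch of that fragment, written as a plain Python dict for illustration (the field names follow the Kubernetes API; the executor itself builds typed `V1PodSpec` objects instead):

```python
def gvisor_pod_spec(image: str = "python:3.11-slim") -> dict:
  """Builds a minimal Pod spec fragment that targets gVisor-enabled nodes."""
  return {
      "restartPolicy": "Never",
      # Ask the kubelet to run this Pod under the gVisor runtime class.
      "runtimeClassName": "gvisor",
      # Tolerate the taint on GKE Sandbox node pools so the scheduler
      # may place the Pod onto a pre-configured gVisor node.
      "tolerations": [{
          "key": "sandbox.gke.io/runtime",
          "operator": "Equal",
          "value": "gvisor",
          "effect": "NoSchedule",
      }],
      "containers": [{
          "name": "code-runner",
          "image": image,
          "command": ["python3", "/app/code.py"],
      }],
  }


spec = gvisor_pod_spec()
print(spec["runtimeClassName"])  # gvisor
```

Without the toleration, Pods requesting the `gvisor` runtime class would stay unschedulable on a cluster whose sandbox nodes are tainted.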

### Execution Flow

1.  The agent invokes `GkeCodeExecutor` with the LLM-generated code.
2.  `GkeCodeExecutor.execute_code` creates a temporary `ConfigMap` holding the code, then creates a Kubernetes `Job` to run it.
3.  The Job runs a standard `python:3.11-slim` container (the image is pulled once per node and cached) and mounts the ConfigMap as `/app/code.py`.
4.  The `GkeCodeExecutor` watches the Job to completion, fetches the `stdout`/`stderr` logs from the container, returns a `CodeExecutionResult` to the `LlmAgent`, and ensures all temporary resources are deleted.
5.  The calling agent formats the result and provides a final response to the user. If the result contains an error, it retries up to `error_retry_attempts` times.
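The retry behavior in step 5 belongs to the agent/base-executor layer, not to `GkeCodeExecutor` itself. A hedged sketch of the loop (the `run_with_retries` helper and the stub executor below are illustrative, not the real `BaseCodeExecutor` API; in the real flow the LLM revises the code between attempts):

```python
from dataclasses import dataclass


@dataclass
class CodeExecutionResult:
  """Simplified stand-in for the ADK result type."""
  stdout: str = ""
  stderr: str = ""


def run_with_retries(execute, code: str,
                     error_retry_attempts: int = 2) -> CodeExecutionResult:
  """Invokes `execute` and retries while the result carries an error."""
  result = execute(code)
  for _ in range(error_retry_attempts):
    if not result.stderr:
      break
    # In the real agent loop, the LLM rewrites the code before retrying.
    result = execute(code)
  return result


# A stub executor that fails once, then succeeds.
calls = {"n": 0}


def flaky(code: str) -> CodeExecutionResult:
  calls["n"] += 1
  if calls["n"] == 1:
    return CodeExecutionResult(stderr="NameError: name 'x' is not defined")
  return CodeExecutionResult(stdout="42")


print(run_with_retries(flaky, "print(x)").stdout)  # 42
```

Capping retries matters here because every attempt spins up a fresh Job and Pod, so an unbounded loop would keep consuming cluster resources.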

PiperOrigin-RevId: 804511467
This commit is contained in:
Google Team Member
2025-09-08 11:14:09 -07:00
committed by Copybara-Service
parent e63fe0c0eb
commit 72ff9c64a2
6 changed files with 691 additions and 0 deletions
@@ -0,0 +1,49 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A Python coding agent using the GkeCodeExecutor for secure execution."""
from google.adk.agents import LlmAgent
from google.adk.code_executors import GkeCodeExecutor
def gke_agent_system_instruction():
"""Returns: The system instruction for the GKE-based coding agent."""
return """You are a helpful and capable AI agent that can write and execute Python code to answer questions and perform tasks.
When a user asks a question, follow these steps:
1. Analyze the request.
2. Write a complete, self-contained Python script to accomplish the task.
3. Your code will be executed in a secure, sandboxed environment.
4. Return the full and complete output from the code execution, including any text, results, or error messages."""
gke_executor = GkeCodeExecutor(
# This must match the namespace in your deployment_rbac.yaml where the
# agent's ServiceAccount and Role have permissions.
namespace="agent-sandbox",
# Setting an explicit timeout prevents a stuck job from running forever.
timeout_seconds=600,
)
root_agent = LlmAgent(
name="gke_coding_agent",
model="gemini-2.0-flash",
description=(
"A general-purpose agent that executes Python code in a secure GKE"
" Sandbox."
),
instruction=gke_agent_system_instruction(),
code_executor=gke_executor,
)
@@ -0,0 +1,50 @@
apiVersion: v1
kind: Namespace
metadata:
name: agent-sandbox
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: adk-agent-sa
namespace: agent-sandbox
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: adk-agent-role
namespace: agent-sandbox
rules:
- apiGroups: ["batch"]
resources: ["jobs"]
# create: Needed for _batch_v1.create_namespaced_job().
# watch: Needed for watch.stream(self._batch_v1.list_namespaced_job, ...) to wait for completion
# list/get: Required for the watch to initialize and to get job details.
verbs: ["create", "get", "watch", "list", "delete"]
- apiGroups: [""]
resources: ["configmaps"]
  # create: Needed to mount the agent's code into the Job's Pod.
  # delete: Needed for cleanup in the finally block.
verbs: ["create", "get", "list", "delete"]
- apiGroups: [""]
resources: ["pods"]
  # list: Needed to find the Job's Pod via _core_v1.list_namespaced_pod(label_selector=...).
verbs: ["get", "list", "delete"]
- apiGroups: [""]
# get: Needed for _core_v1.read_namespaced_pod_log() to get the code execution results and logs.
resources: ["pods/log"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: adk-agent-binding
namespace: agent-sandbox
subjects:
- kind: ServiceAccount
name: adk-agent-sa
namespace: agent-sandbox
roleRef:
kind: Role
name: adk-agent-role
apiGroup: rbac.authorization.k8s.io
@@ -110,6 +110,7 @@ test = [
"langchain-community>=0.3.17",
"langgraph>=0.2.60, <= 0.4.10", # For LangGraphAgent
"litellm>=1.75.5, <2.0.0", # For LiteLLM tests
"kubernetes>=29.0.0", # For GkeCodeExecutor
"llama-index-readers-file>=0.4.0", # For retrieval tests
"openai>=1.100.2", # For LiteLLM
"pytest-asyncio>=0.25.0",
@@ -139,6 +140,7 @@ extensions = [
"docker>=7.0.0", # For ContainerCodeExecutor
"langgraph>=0.2.60", # For LangGraphAgent
"litellm>=1.75.5", # For LiteLlm class. Currently has OpenAI limitations. TODO: once LiteLlm fix it
"kubernetes>=29.0.0", # For GkeCodeExecutor
"llama-index-readers-file>=0.4.0", # For retrieval using LlamaIndex.
"llama-index-embeddings-google-genai>=0.3.0",# For files retrieval using LlamaIndex.
"lxml>=5.3.0", # For load_web_page tool.
@@ -28,6 +28,7 @@ __all__ = [
'UnsafeLocalCodeExecutor',
'VertexAiCodeExecutor',
'ContainerCodeExecutor',
'GkeCodeExecutor',
]
@@ -52,4 +53,14 @@ def __getattr__(name: str):
'ContainerCodeExecutor requires additional dependencies. '
'Please install with: pip install "google-adk[extensions]"'
) from e
elif name == 'GkeCodeExecutor':
try:
from .gke_code_executor import GkeCodeExecutor
return GkeCodeExecutor
except ImportError as e:
raise ImportError(
'GkeCodeExecutor requires additional dependencies. '
'Please install with: pip install "google-adk[extensions]"'
) from e
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
@@ -0,0 +1,352 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import uuid
import kubernetes as k8s
from kubernetes.watch import Watch
from ..agents.invocation_context import InvocationContext
from .base_code_executor import BaseCodeExecutor
from .code_execution_utils import CodeExecutionInput
from .code_execution_utils import CodeExecutionResult
# Expose these for tests to monkeypatch.
client = k8s.client
config = k8s.config
ApiException = k8s.client.exceptions.ApiException
logger = logging.getLogger("google_adk." + __name__)
class GkeCodeExecutor(BaseCodeExecutor):
"""Executes Python code in a secure gVisor-sandboxed Pod on GKE.
This executor securely runs code by dynamically creating a Kubernetes Job for
each execution request. The user's code is mounted via a ConfigMap, and the
Pod is hardened with a strict security context and resource limits.
Key Features:
- Sandboxed execution using the gVisor runtime.
- Ephemeral, per-execution environments using Kubernetes Jobs.
- Secure-by-default Pod configuration (non-root, no privileges).
- Automatic garbage collection of completed Jobs and Pods via TTL.
- Efficient, event-driven waiting using the Kubernetes watch API.
RBAC Permissions:
This executor requires a ServiceAccount with specific RBAC permissions. The
Role granted to the ServiceAccount must include rules to manage Jobs,
ConfigMaps, and Pod logs. Below is a minimal set of required permissions:
rules:
# For creating/deleting code ConfigMaps and patching ownerReferences
- apiGroups: [""] # Core API Group
resources: ["configmaps"]
verbs: ["create", "delete", "get", "patch"]
# For watching Job completion status
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["get", "list", "watch", "create", "delete"]
# For retrieving logs from the completed Job's Pod
- apiGroups: [""] # Core API Group
resources: ["pods", "pods/log"]
verbs: ["get", "list"]
"""
namespace: str = "default"
image: str = "python:3.11-slim"
timeout_seconds: int = 300
cpu_requested: str = "200m"
mem_requested: str = "256Mi"
# The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core.
cpu_limit: str = "500m"
mem_limit: str = "512Mi"
kubeconfig_path: str | None = None
kubeconfig_context: str | None = None
_batch_v1: k8s.client.BatchV1Api
_core_v1: k8s.client.CoreV1Api
def __init__(
self,
kubeconfig_path: str | None = None,
kubeconfig_context: str | None = None,
**data,
):
"""Initializes the executor and the Kubernetes API clients.
This constructor supports multiple authentication methods:
1. Explicitly via a kubeconfig file path and context.
2. Automatically via in-cluster service account (when running in GKE).
3. Automatically via the default local kubeconfig file (~/.kube/config).
"""
super().__init__(**data)
self.kubeconfig_path = kubeconfig_path
self.kubeconfig_context = kubeconfig_context
if self.kubeconfig_path:
try:
logger.info(f"Using explicit kubeconfig from '{self.kubeconfig_path}'.")
config.load_kube_config(
config_file=self.kubeconfig_path, context=self.kubeconfig_context
)
except config.ConfigException as e:
logger.error(
f"Failed to load explicit kubeconfig from {self.kubeconfig_path}",
exc_info=True,
)
raise RuntimeError(
"Failed to configure Kubernetes client from provided path."
) from e
else:
try:
config.load_incluster_config()
logger.info("Using in-cluster Kubernetes configuration.")
except config.ConfigException:
try:
logger.info(
"In-cluster config not found. Falling back to default local"
" kubeconfig."
)
config.load_kube_config()
except config.ConfigException as e:
logger.error(
"Could not configure Kubernetes client automatically.",
exc_info=True,
)
raise RuntimeError(
"Failed to find any valid Kubernetes configuration."
) from e
self._batch_v1 = client.BatchV1Api()
self._core_v1 = client.CoreV1Api()
def execute_code(
self,
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
) -> CodeExecutionResult:
"""Orchestrates the secure execution of a code snippet on GKE."""
job_name = f"adk-exec-{uuid.uuid4().hex[:10]}"
configmap_name = f"code-src-{job_name}"
try:
# The execution process:
# 1. Create a ConfigMap to mount LLM-generated code into the Pod.
# 2. Create a Job that runs the code from the ConfigMap.
# 3. Set the Job as the ConfigMap's owner for automatic cleanup.
self._create_code_configmap(configmap_name, code_execution_input.code)
job_manifest = self._create_job_manifest(
job_name, configmap_name, invocation_context
)
created_job = self._batch_v1.create_namespaced_job(
body=job_manifest, namespace=self.namespace
)
self._add_owner_reference(created_job, configmap_name)
logger.info(
f"Submitted Job '{job_name}' to namespace '{self.namespace}'."
)
return self._watch_job_completion(job_name)
except ApiException as e:
logger.error(
"A Kubernetes API error occurred during job"
f" '{job_name}': {e.reason}",
exc_info=True,
)
return CodeExecutionResult(stderr=f"Kubernetes API error: {e.reason}")
except TimeoutError as e:
logger.error(e, exc_info=True)
logs = self._get_pod_logs(job_name)
stderr = f"Executor timed out: {e}\n\nPod Logs:\n{logs}"
return CodeExecutionResult(stderr=stderr)
except Exception as e:
logger.error(
f"An unexpected error occurred during job '{job_name}': {e}",
exc_info=True,
)
return CodeExecutionResult(
stderr=f"An unexpected executor error occurred: {e}"
)
def _create_job_manifest(
self,
job_name: str,
configmap_name: str,
invocation_context: InvocationContext,
) -> k8s.client.V1Job:
"""Creates the complete V1Job object with security best practices."""
# Define the container that will run the code.
container = k8s.client.V1Container(
name="code-runner",
image=self.image,
command=["python3", "/app/code.py"],
volume_mounts=[
k8s.client.V1VolumeMount(name="code-volume", mount_path="/app")
],
# Enforce a strict security context.
security_context=k8s.client.V1SecurityContext(
run_as_non_root=True,
run_as_user=1001,
allow_privilege_escalation=False,
read_only_root_filesystem=True,
capabilities=k8s.client.V1Capabilities(drop=["ALL"]),
),
# Set resource limits to prevent abuse.
resources=k8s.client.V1ResourceRequirements(
requests={"cpu": self.cpu_requested, "memory": self.mem_requested},
limits={"cpu": self.cpu_limit, "memory": self.mem_limit},
),
)
# Use tolerations to request a gVisor node.
pod_spec = k8s.client.V1PodSpec(
restart_policy="Never",
containers=[container],
volumes=[
k8s.client.V1Volume(
name="code-volume",
config_map=k8s.client.V1ConfigMapVolumeSource(
name=configmap_name
),
)
],
runtime_class_name="gvisor", # Request the gVisor runtime.
tolerations=[
k8s.client.V1Toleration(
key="sandbox.gke.io/runtime",
operator="Equal",
value="gvisor",
effect="NoSchedule",
)
],
)
job_spec = k8s.client.V1JobSpec(
template=k8s.client.V1PodTemplateSpec(spec=pod_spec),
backoff_limit=0, # Do not retry the Job on failure.
# Kubernetes TTL controller will handle Job/Pod cleanup.
ttl_seconds_after_finished=600, # Garbage collect after 10 minutes.
)
# Assemble and return the final Job object.
annotations = {
"adk.agent.google.com/invocation-id": invocation_context.invocation_id
}
return k8s.client.V1Job(
api_version="batch/v1",
kind="Job",
metadata=k8s.client.V1ObjectMeta(
name=job_name, annotations=annotations
),
spec=job_spec,
)
def _watch_job_completion(self, job_name: str) -> CodeExecutionResult:
"""Uses the watch API to efficiently wait for job completion."""
watch = Watch()
try:
for event in watch.stream(
self._batch_v1.list_namespaced_job,
namespace=self.namespace,
field_selector=f"metadata.name={job_name}",
timeout_seconds=self.timeout_seconds,
):
job = event["object"]
if job.status.succeeded:
watch.stop()
logger.info(f"Job '{job_name}' succeeded.")
logs = self._get_pod_logs(job_name)
return CodeExecutionResult(stdout=logs)
if job.status.failed:
watch.stop()
logger.error(f"Job '{job_name}' failed.")
logs = self._get_pod_logs(job_name)
return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}")
# If the loop finishes without returning, the watch timed out.
raise TimeoutError(
f"Job '{job_name}' did not complete within {self.timeout_seconds}s."
)
finally:
watch.stop()
def _get_pod_logs(self, job_name: str) -> str:
"""Retrieves logs from the pod created by the specified job.
Raises:
RuntimeError: If the pod cannot be found or logs cannot be fetched.
"""
try:
pods = self._core_v1.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"job-name={job_name}",
limit=1,
)
if not pods.items:
raise RuntimeError(
f"Could not find Pod for Job '{job_name}' to retrieve logs."
)
pod_name = pods.items[0].metadata.name
return self._core_v1.read_namespaced_pod_log(
name=pod_name, namespace=self.namespace
)
except ApiException as e:
raise RuntimeError(
f"API error retrieving logs for job '{job_name}': {e.reason}"
) from e
def _create_code_configmap(self, name: str, code: str) -> None:
"""Creates a ConfigMap to hold the Python code."""
body = k8s.client.V1ConfigMap(
metadata=k8s.client.V1ObjectMeta(name=name), data={"code.py": code}
)
self._core_v1.create_namespaced_config_map(
namespace=self.namespace, body=body
)
def _add_owner_reference(
self, owner_job: k8s.client.V1Job, configmap_name: str
) -> None:
"""Patches the ConfigMap to be owned by the Job for auto-cleanup."""
owner_reference = k8s.client.V1OwnerReference(
api_version=owner_job.api_version,
kind=owner_job.kind,
name=owner_job.metadata.name,
uid=owner_job.metadata.uid,
controller=True,
)
patch_body = {"metadata": {"ownerReferences": [owner_reference.to_dict()]}}
try:
self._core_v1.patch_namespaced_config_map(
name=configmap_name,
namespace=self.namespace,
body=patch_body,
)
logger.info(
f"Set Job '{owner_job.metadata.name}' as owner of ConfigMap"
f" '{configmap_name}'."
)
except ApiException as e:
logger.warning(
f"Failed to set ownerReference on ConfigMap '{configmap_name}'. "
f"Manual cleanup is required. Reason: {e.reason}"
)
@@ -0,0 +1,227 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import MagicMock
from unittest.mock import patch
from google.adk.agents.invocation_context import InvocationContext
from google.adk.code_executors.code_execution_utils import CodeExecutionInput
from google.adk.code_executors.gke_code_executor import GkeCodeExecutor
from kubernetes import client
from kubernetes import config
from kubernetes.client.rest import ApiException
import pytest
@pytest.fixture
def mock_invocation_context() -> InvocationContext:
"""Fixture for a mock InvocationContext."""
mock = MagicMock(spec=InvocationContext)
mock.invocation_id = "test-invocation-123"
return mock
@pytest.fixture(autouse=True)
def mock_k8s_config():
"""Fixture for auto-mocking Kubernetes config loading."""
with patch(
"google.adk.code_executors.gke_code_executor.config"
) as mock_config:
# Simulate fallback from in-cluster to kubeconfig
mock_config.ConfigException = config.ConfigException
mock_config.load_incluster_config.side_effect = config.ConfigException
yield mock_config
@pytest.fixture
def mock_k8s_clients():
"""Fixture for mock Kubernetes API clients."""
with patch(
"google.adk.code_executors.gke_code_executor.client"
) as mock_client_class:
mock_batch_v1 = MagicMock(spec=client.BatchV1Api)
mock_core_v1 = MagicMock(spec=client.CoreV1Api)
mock_client_class.BatchV1Api.return_value = mock_batch_v1
mock_client_class.CoreV1Api.return_value = mock_core_v1
yield {
"batch_v1": mock_batch_v1,
"core_v1": mock_core_v1,
}
class TestGkeCodeExecutor:
"""Unit tests for the GkeCodeExecutor."""
def test_init_defaults(self):
"""Tests that the executor initializes with correct default values."""
executor = GkeCodeExecutor()
assert executor.namespace == "default"
assert executor.image == "python:3.11-slim"
assert executor.timeout_seconds == 300
assert executor.cpu_requested == "200m"
assert executor.mem_limit == "512Mi"
def test_init_with_overrides(self):
"""Tests that class attributes can be overridden at instantiation."""
executor = GkeCodeExecutor(
namespace="test-ns",
image="custom-python:latest",
timeout_seconds=60,
cpu_limit="1000m",
)
assert executor.namespace == "test-ns"
assert executor.image == "custom-python:latest"
assert executor.timeout_seconds == 60
assert executor.cpu_limit == "1000m"
@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_success(
self,
mock_watch,
mock_k8s_clients,
mock_invocation_context,
):
"""Tests the happy path for successful code execution."""
# Setup Mocks
mock_job = MagicMock()
mock_job.status.succeeded = True
mock_job.status.failed = None
mock_watch.return_value.stream.return_value = [{"object": mock_job}]
mock_pod_list = MagicMock()
mock_pod_list.items = [MagicMock()]
mock_pod_list.items[0].metadata.name = "test-pod-name"
mock_k8s_clients["core_v1"].list_namespaced_pod.return_value = mock_pod_list
mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = (
"hello world"
)
# Execute
executor = GkeCodeExecutor()
code_input = CodeExecutionInput(code='print("hello world")')
result = executor.execute_code(mock_invocation_context, code_input)
# Assert
assert result.stdout == "hello world"
assert result.stderr == ""
mock_k8s_clients[
"core_v1"
].create_namespaced_config_map.assert_called_once()
mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once()
mock_k8s_clients["core_v1"].patch_namespaced_config_map.assert_called_once()
mock_k8s_clients["core_v1"].read_namespaced_pod_log.assert_called_once()
@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_job_failed(
self,
mock_watch,
mock_k8s_clients,
mock_invocation_context,
):
"""Tests the path where the Kubernetes Job fails."""
mock_job = MagicMock()
mock_job.status.succeeded = None
mock_job.status.failed = True
mock_watch.return_value.stream.return_value = [{"object": mock_job}]
mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = (
"Traceback...\nValueError: failure"
)
executor = GkeCodeExecutor()
result = executor.execute_code(
mock_invocation_context, CodeExecutionInput(code="fail")
)
assert result.stdout == ""
assert "Job failed. Logs:" in result.stderr
assert "ValueError: failure" in result.stderr
def test_execute_code_api_exception(
self, mock_k8s_clients, mock_invocation_context
):
"""Tests handling of an ApiException from the K8s client."""
mock_k8s_clients["core_v1"].create_namespaced_config_map.side_effect = (
ApiException(reason="Test API Error")
)
executor = GkeCodeExecutor()
result = executor.execute_code(
mock_invocation_context, CodeExecutionInput(code="...")
)
assert result.stdout == ""
assert "Kubernetes API error: Test API Error" in result.stderr
@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_timeout(
self,
mock_watch,
mock_k8s_clients,
mock_invocation_context,
):
"""Tests the case where the job watch times out."""
mock_watch.return_value.stream.return_value = (
[]
) # Empty stream simulates timeout
mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = (
"Still running..."
)
executor = GkeCodeExecutor(timeout_seconds=1)
result = executor.execute_code(
mock_invocation_context, CodeExecutionInput(code="...")
)
assert result.stdout == ""
assert "Executor timed out" in result.stderr
assert "did not complete within 1s" in result.stderr
assert "Pod Logs:\nStill running..." in result.stderr
def test_create_job_manifest_structure(self, mock_invocation_context):
"""Tests the correctness of the generated Job manifest."""
executor = GkeCodeExecutor(namespace="test-ns", image="test-img:v1")
job = executor._create_job_manifest(
"test-job", "test-cm", mock_invocation_context
)
# Check top-level properties
assert isinstance(job, client.V1Job)
assert job.api_version == "batch/v1"
assert job.kind == "Job"
assert job.metadata.name == "test-job"
assert job.spec.backoff_limit == 0
assert job.spec.ttl_seconds_after_finished == 600
# Check pod template properties
pod_spec = job.spec.template.spec
assert pod_spec.restart_policy == "Never"
assert pod_spec.runtime_class_name == "gvisor"
assert len(pod_spec.tolerations) == 1
assert pod_spec.tolerations[0].value == "gvisor"
assert len(pod_spec.volumes) == 1
assert pod_spec.volumes[0].name == "code-volume"
assert pod_spec.volumes[0].config_map.name == "test-cm"
# Check container properties
container = pod_spec.containers[0]
assert container.name == "code-runner"
assert container.image == "test-img:v1"
assert container.command == ["python3", "/app/code.py"]
# Check security context
sec_context = container.security_context
assert sec_context.run_as_non_root is True
assert sec_context.run_as_user == 1001
assert sec_context.allow_privilege_escalation is False
assert sec_context.read_only_root_filesystem is True
assert sec_context.capabilities.drop == ["ALL"]