chore: create an initial ADK release analyzer agent to find the doc updates needed between releases

PiperOrigin-RevId: 805030050
This commit is contained in:
Xuan Yang
2025-09-09 12:59:32 -07:00
committed by Copybara-Service
parent 8174a29c6d
commit e3422c616d
5 changed files with 691 additions and 0 deletions
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -0,0 +1,121 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from adk_release_analyzer.settings import CODE_OWNER
from adk_release_analyzer.settings import CODE_REPO
from adk_release_analyzer.settings import DOC_OWNER
from adk_release_analyzer.settings import DOC_REPO
from adk_release_analyzer.settings import IS_INTERACTIVE
from adk_release_analyzer.settings import LOCAL_REPOS_DIR_PATH
from adk_release_analyzer.tools import clone_or_pull_repo
from adk_release_analyzer.tools import create_issue
from adk_release_analyzer.tools import get_changed_files_between_releases
from adk_release_analyzer.tools import list_directory_contents
from adk_release_analyzer.tools import list_releases
from adk_release_analyzer.tools import read_local_git_repo_file_content
from adk_release_analyzer.tools import search_local_git_repo
from google.adk import Agent
# Prompt fragment spliced into the agent instruction: in interactive mode the
# agent must confirm with the user before filing the issue; otherwise it files
# the issue without waiting for approval.
APPROVAL_INSTRUCTION = (
    "Ask for user approval or confirmation for creating or updating the"
    " issue."
    if IS_INTERACTIVE
    else "**Do not** wait or ask for user approval or confirmation for"
    " creating or updating the issue."
)
# The root agent: compares the code changes between two ADK Python releases
# against the ADK docs repository and files a GitHub issue describing the
# documentation updates that are needed.
# Fixes several typos in the system instruction ("expecially", "referncing",
# "Explaination", "the followings") that shipped in the functional prompt text.
root_agent = Agent(
    model="gemini-2.5-pro",
    name="adk_release_analyzer",
    description=(
        "Analyze the changes between two ADK releases and generate instructions"
        " about how to update the ADK docs."
    ),
    instruction=f"""
# 1. Identity
You are a helper bot that checks if ADK docs in Github Repository {DOC_REPO} owned by {DOC_OWNER}
should be updated based on the changes in the ADK Python codebase in Github Repository {CODE_REPO} owned by {CODE_OWNER}.

You are very familiar with Github, especially how to search for files in a Github repository using git grep.

# 2. Responsibilities
Your core responsibility includes:
- Find all the code changes between the two ADK releases.
- Find **all** the related docs files in ADK Docs repository under the "/docs/" directory.
- Compare the code changes with the docs files and analyze the differences.
- Write the instructions about how to update the ADK docs in markdown format and create a Github issue in the Github Repository {DOC_REPO} with the instructions.

# 3. Workflow
1. Always call the `clone_or_pull_repo` tool to make sure the ADK docs and codebase repos exist in the local folder {LOCAL_REPOS_DIR_PATH}/repo_name and are the latest version.
2. Find the code changes between the two ADK releases.
  - You should call the `get_changed_files_between_releases` tool to find all the code changes between the two ADK releases.
  - You can call the `list_releases` tool to find the release tags.
3. Understand the code changes between the two ADK releases.
  - You should focus on the main ADK Python codebase, ignore the changes in tests or other auxiliary files.
4. Come up with a list of regex search patterns to search for related docs files.
5. Use the `search_local_git_repo` tool to search for related docs files using the regex patterns.
  - You should look into all the related docs files, not only the most relevant one.
  - Prefer searching from the root directory of the ADK Docs repository (i.e. /docs/), unless you are certain that the file is in a specific directory.
6. Read the found docs files using the `read_local_git_repo_file_content` tool to find all the docs to update.
  - You should read all the found docs files and check if they are up to date.
7. Compare the code changes and docs files, and analyze the differences.
  - You should not only check the code snippets in the docs, but also the text contents.
8. Write the instructions about how to update the ADK docs in a markdown format.
  - For **each** recommended change, reference the code changes.
  - For **each** recommended change, follow the format of the following template:
    ```
    1. **Highlighted summary of the change**.

      Details of the change.

      **Current state**:
      Current content in the doc

      **Proposed Change**:
      Proposed change to the doc.

      **Reasoning**:
      Explanation of why this change is necessary.

      **Reference**:
      Reference to the code change.
    ```
  - When referencing doc file, use the full relative path of the doc file in the ADK Docs repository (e.g. docs/sessions/memory.md).
9. Create or recommend to create a Github issue in the Github Repository {DOC_REPO} with the instructions using the `create_issue` tool.
  - The title of the issue should be "Found docs updates needed from ADK python release <start_tag> to <end_tag>", where start_tag and end_tag are the release tags.
  - The body of the issue should be the instructions about how to update the ADK docs.
  - **{APPROVAL_INSTRUCTION}**

# 4. Guidelines & Rules
- **File Paths:** Always use absolute paths when calling the tools to read files, list directories, or search the codebase.
- **Tool Call Parallelism:** Execute multiple independent tool calls in parallel when feasible (i.e. searching the codebase).
- **Explanation:** Provide concise explanations for your actions and reasoning for each step.

# 5. Output
Present the following in an easy to read format as the final output to the user.
- The actions you took and the reasoning
- The summary of the differences found
""",
    tools=[
        list_releases,
        get_changed_files_between_releases,
        clone_or_pull_repo,
        list_directory_contents,
        search_local_git_repo,
        read_local_git_repo_file_content,
        create_issue,
    ],
)
@@ -0,0 +1,33 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from dotenv import load_dotenv

# Load variables from a local .env file; override=True lets the .env file win
# over values already present in the process environment.
load_dotenv(override=True)

# Base URL of the GitHub REST API.
GITHUB_BASE_URL = "https://api.github.com"

# Token used to authenticate every GitHub API request made by this agent's
# tools. Required: the module refuses to import without it.
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if not GITHUB_TOKEN:
  raise ValueError("GITHUB_TOKEN environment variable not set")

# GitHub owners and repository names for the ADK docs and ADK Python repos.
DOC_OWNER = os.getenv("DOC_OWNER", "google")
CODE_OWNER = os.getenv("CODE_OWNER", "google")
DOC_REPO = os.getenv("DOC_REPO", "adk-docs")
CODE_REPO = os.getenv("CODE_REPO", "adk-python")

# Directory under which the repositories are cloned locally.
LOCAL_REPOS_DIR_PATH = os.getenv("LOCAL_REPOS_DIR_PATH", "/tmp")

# Whether the agent should ask the user for confirmation before creating or
# updating a GitHub issue. Defaults to interactive ("1").
IS_INTERACTIVE = os.getenv("INTERACTIVE", "1").lower() in ["true", "1"]
@@ -0,0 +1,452 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from adk_release_analyzer.settings import GITHUB_BASE_URL
from adk_release_analyzer.utils import error_response
from adk_release_analyzer.utils import get_paginated_request
from adk_release_analyzer.utils import get_request
from adk_release_analyzer.utils import patch_request
from adk_release_analyzer.utils import post_request
import requests
def list_releases(repo_owner: str, repo_name: str) -> Dict[str, Any]:
  """Fetches every release of a GitHub repository.

  Pagination is followed so the complete release history is returned. Each
  entry is reduced to its ID, tag name, creation time, and publication time.

  Args:
    repo_owner: The name of the repository owner.
    repo_name: The name of the repository.

  Returns:
    A dictionary containing the status and a list of releases.
  """
  # per_page=100 keeps the number of paginated API round-trips low.
  releases_url = (
      f"{GITHUB_BASE_URL}/repos/{repo_owner}/{repo_name}/releases?per_page=100"
  )
  try:
    raw_releases = get_paginated_request(releases_url)
    # Keep only the fields the agent needs.
    wanted_keys = ("id", "tag_name", "created_at", "published_at")
    formatted = [
        {key: release.get(key) for key in wanted_keys}
        for release in raw_releases
    ]
    return {"status": "success", "releases": formatted}
  except requests.exceptions.HTTPError as e:
    return error_response(f"HTTP Error: {e}")
  except requests.exceptions.RequestException as e:
    return error_response(f"Request Error: {e}")
def get_changed_files_between_releases(
    repo_owner: str, repo_name: str, start_tag: str, end_tag: str
) -> Dict[str, Any]:
  """Gets changed files and their modifications between two release tags.

  Args:
    repo_owner: The name of the repository owner.
    repo_name: The name of the repository.
    start_tag: The older tag (base) for the comparison.
    end_tag: The newer tag (head) for the comparison.

  Returns:
    A dictionary containing the status and a list of changed files.
    Each file includes its name, status (added, removed, modified),
    and the patch/diff content.
  """
  # GitHub's compare endpoint expects the 'basehead' segment as 'base...head'.
  compare_url = f"{GITHUB_BASE_URL}/repos/{repo_owner}/{repo_name}/compare/{start_tag}...{end_tag}"
  try:
    comparison = get_request(compare_url)
    # The comparison payload lists the changed files under 'files'; trim each
    # entry down to the fields the agent consumes.
    copied_keys = ("status", "additions", "deletions", "changes")
    formatted = [
        {
            "relative_path": entry.get("filename"),
            **{key: entry.get(key) for key in copied_keys},
            # The diff content; some entries (e.g. binary files) carry none.
            "patch": entry.get("patch", "No patch available."),
        }
        for entry in comparison.get("files", [])
    ]
    return {"status": "success", "changed_files": formatted}
  except requests.exceptions.HTTPError as e:
    return error_response(f"HTTP Error: {e}")
  except requests.exceptions.RequestException as e:
    return error_response(f"Request Error: {e}")
def clone_or_pull_repo(
    repo_owner: str,
    repo_name: str,
    local_path: str,
) -> Dict[str, Any]:
  """Clones or refreshes a GitHub repository at a local path.

  A missing path triggers a fresh clone; an existing Git checkout is updated
  via `git pull`; any other existing path is reported as an error.

  Args:
    repo_owner: The username or organization that owns the repository.
    repo_name: The name of the repository.
    local_path: The local directory path where the repository should be cloned
      or updated.

  Returns:
    A dictionary indicating the status of the operation, output message, and
    the head commit hash.
  """
  remote_url = f"https://github.com/{repo_owner}/{repo_name}.git"
  try:
    if not os.path.exists(local_path):
      print(f"Cloning from {repo_owner}/{repo_name} into '{local_path}'...")
      try:
        output = _get_clone(remote_url, local_path)
      except subprocess.CalledProcessError as e:
        return error_response(f"git clone failed: {e.stderr}")
    elif os.path.isdir(os.path.join(local_path, ".git")):
      print(f"Repository exists at '{local_path}'. Pulling latest changes...")
      try:
        output = _get_pull(local_path)
      except subprocess.CalledProcessError as e:
        return error_response(f"git pull failed: {e.stderr}")
    else:
      # The path exists but has no .git directory — refuse to touch it.
      return error_response(
          f"Path '{local_path}' exists but is not a Git repository."
      )
    head_commit_sha = _find_head_commit_sha(local_path)
  except FileNotFoundError:
    return error_response("Error: 'git' command not found. Is Git installed?")
  except subprocess.TimeoutExpired as e:
    return error_response(f"Command timeout: {e}")
  except (subprocess.CalledProcessError, OSError, ValueError) as e:
    return error_response(f"An unexpected error occurred: {e}")
  return {
      "status": "success",
      "output": output,
      "head_commit_sha": head_commit_sha,
  }
def read_local_git_repo_file_content(file_path: str) -> Dict[str, Any]:
  """Reads the content of a specified file in a local Git repository.

  The returned content is prefixed with 1-based line numbers so exact
  locations in the file can be referenced.

  Args:
    file_path: The full, absolute path to the file.

  Returns:
    A dictionary containing the status, content of the file, and the head
    commit hash.
  """
  print(f"Attempting to read file from path: {file_path}")
  dir_path = os.path.dirname(file_path)
  try:
    # Resolve the commit the content corresponds to. This was previously
    # unguarded and crashed the tool (check=True raises) when the path was not
    # inside a Git repository, the directory did not exist, or git was
    # missing; now those cases return an error dict like every other tool.
    head_commit_sha = _find_head_commit_sha(dir_path)
  except FileNotFoundError:
    return error_response("Error: 'git' command not found. Is Git installed?")
  except (subprocess.CalledProcessError, OSError) as e:
    return error_response(f"An unexpected error occurred: {e}")
  try:
    # Open and read the file content
    with open(file_path, "r", encoding="utf-8") as f:
      content = f.read()
    # Add line numbers to the content
    lines = content.splitlines()
    numbered_lines = [f"{i + 1}: {line}" for i, line in enumerate(lines)]
    numbered_content = "\n".join(numbered_lines)
    return {
        "status": "success",
        "file_path": file_path,
        "content": numbered_content,
        "head_commit_sha": head_commit_sha,
    }
  except FileNotFoundError:
    return error_response(f"Error: File not found at {file_path}")
  except IOError as e:
    return error_response(f"An unexpected error occurred: {e}")
def list_directory_contents(directory_path: str) -> Dict[str, Any]:
  """Walks a directory tree and maps each subdirectory to its entries.

  Hidden files and directories (names starting with '.') are skipped, both in
  the output and during traversal.

  Args:
    directory_path: The full, absolute path to the directory.

  Returns:
    A dictionary containing the status and a map where keys are directory
    paths relative to the initial directory_path, and values are lists of
    their contents.
    Returns an error message if the directory cannot be accessed.
  """
  print(
      f"Attempting to recursively list contents of directory: {directory_path}"
  )
  if not os.path.isdir(directory_path):
    return error_response(f"Error: Directory not found at {directory_path}")
  contents_by_dir = {}
  try:
    for root, dirs, files in os.walk(directory_path):
      # Prune hidden directories in place so os.walk never descends into them,
      # and drop hidden files from the listing.
      dirs[:] = [name for name in dirs if not name.startswith(".")]
      visible_files = [name for name in files if not name.startswith(".")]
      rel_root = os.path.relpath(root, directory_path)
      contents_by_dir[rel_root] = dirs + visible_files
  except (IOError, OSError) as e:
    return error_response(f"An unexpected error occurred: {e}")
  return {
      "status": "success",
      "directory_path": directory_path,
      "directory_map": contents_by_dir,
  }
def search_local_git_repo(
    directory_path: str,
    pattern: str,
    extensions: Optional[List[str]],
    ignored_dirs: Optional[List[str]],
) -> Dict[str, Any]:
  """Runs `git grep` over a local checkout and returns the matching lines.

  Args:
    directory_path: The absolute path to the local Git repository.
    pattern: The search pattern (can be a simple string or regex for git
      grep).
    extensions: The list of file extensions to search, e.g. ["py", "md"]. If
      None, all extensions will be searched.
    ignored_dirs: The list of directories to ignore, e.g. ["tests"]. If None,
      no directories will be ignored.

  Returns:
    A dictionary containing the status, and a list of match details (relative
    file path to the directory_path, line number, content).
  """
  print(
      f"Attempting to search for pattern: {pattern} in directory:"
      f" {directory_path}, with extensions: {extensions}"
  )
  try:
    result = _git_grep(directory_path, pattern, extensions, ignored_dirs)
    # git grep exits with 1 when nothing matched; anything above 1 is a
    # genuine failure.
    if result.returncode > 1:
      return error_response(f"git grep failed: {result.stderr}")
    matches = []
    if result.stdout:
      # Each output line has the shape 'path:lineno:content'.
      for line in result.stdout.strip().split("\n"):
        try:
          rel_path, line_no, text = line.split(":", 2)
          matches.append({
              "file_path": rel_path,
              "line_number": int(line_no),
              "line_content": text.strip(),
          })
        except ValueError:
          return error_response(
              f"Error: Failed to parse line: {line} from git grep output."
          )
    return {
        "status": "success",
        "matches": matches,
    }
  except FileNotFoundError:
    return error_response(f"Directory not found: {directory_path}")
  except subprocess.CalledProcessError as e:
    return error_response(f"git grep failed: {e.stderr}")
  except (IOError, OSError, ValueError) as e:
    return error_response(f"An unexpected error occurred: {e}")
def get_issue(
    repo_owner: str, repo_name: str, issue_number: int
) -> Dict[str, Any]:
  """Fetches a single GitHub issue by its number.

  Args:
    repo_owner: The name of the repository owner.
    repo_name: The name of the repository.
    issue_number: issue number of the Github issue.

  Returns:
    The status of this request, with the issue details when successful.
  """
  issue_url = (
      f"{GITHUB_BASE_URL}/repos/{repo_owner}/{repo_name}/issues/{issue_number}"
  )
  try:
    issue_data = get_request(issue_url)
    return {"status": "success", "issue": issue_data}
  except requests.exceptions.RequestException as e:
    return error_response(f"Error: {e}")
def create_issue(
    repo_owner: str,
    repo_name: str,
    title: str,
    body: str,
) -> Dict[str, Any]:
  """Opens a new GitHub issue in the specified repository.

  Args:
    repo_owner: The name of the repository owner.
    repo_name: The name of the repository.
    title: The title of the issue.
    body: The body of the issue.

  Returns:
    The status of this request, with the issue details when successful.
  """
  issues_url = f"{GITHUB_BASE_URL}/repos/{repo_owner}/{repo_name}/issues"
  # Every issue filed by this tool carries the 'docs updates' label.
  request_body = {"title": title, "body": body, "labels": ["docs updates"]}
  try:
    created = post_request(issues_url, request_body)
    return {"status": "success", "issue": created}
  except requests.exceptions.RequestException as e:
    return error_response(f"Error: {e}")
def update_issue(
    repo_owner: str,
    repo_name: str,
    issue_number: int,
    title: str,
    body: str,
) -> Dict[str, Any]:
  """Rewrites the title and body of an existing GitHub issue.

  Args:
    repo_owner: The name of the repository owner.
    repo_name: The name of the repository.
    issue_number: The number of the issue to update.
    title: The title of the issue.
    body: The body of the issue.

  Returns:
    The status of this request, with the issue details when successful.
  """
  issue_url = (
      f"{GITHUB_BASE_URL}/repos/{repo_owner}/{repo_name}/issues/{issue_number}"
  )
  try:
    updated = patch_request(issue_url, {"title": title, "body": body})
    return {"status": "success", "issue": updated}
  except requests.exceptions.RequestException as e:
    return error_response(f"Error: {e}")
def _find_head_commit_sha(repo_path: str) -> str:
  """Returns the SHA of HEAD for the Git repository containing repo_path."""
  result = subprocess.run(
      ["git", "rev-parse", "HEAD"],
      cwd=repo_path,
      capture_output=True,
      text=True,
      check=True,  # Raises CalledProcessError when repo_path is not a repo.
  )
  return result.stdout.strip()
def _get_pull(repo_path: str) -> str:
  """Runs `git pull` in repo_path and returns git's trimmed stdout."""
  result = subprocess.run(
      ["git", "pull"],
      cwd=repo_path,
      capture_output=True,
      text=True,
      check=True,  # Raises CalledProcessError if the pull fails.
  )
  return result.stdout.strip()
def _get_clone(repo_url: str, repo_path: str) -> str:
  """Clones repo_url into repo_path and returns git's trimmed stdout."""
  result = subprocess.run(
      ["git", "clone", repo_url, repo_path],
      capture_output=True,
      text=True,
      check=True,  # Raises CalledProcessError if the clone fails.
  )
  return result.stdout.strip()
def _git_grep(
    repo_path: str,
    pattern: str,
    extensions: Optional[List[str]] = None,
    ignored_dirs: Optional[List[str]] = None,
) -> subprocess.CompletedProcess[Any]:
  """Invokes 'git grep' inside repo_path and returns the completed process.

  The search reports line numbers (-n), skips binary files (-I), treats the
  pattern as an extended regex (-E), and ignores case.
  """
  command = ["git", "grep", "-n", "-I", "-E", "--ignore-case", "-e", pattern]
  # Pathspecs restrict the search to the requested extensions and exclude the
  # ignored directories.
  limits: List[str] = []
  if extensions:
    limits += [f"*.{ext}" for ext in extensions]
  if ignored_dirs:
    limits += [f":(exclude){d}" for d in ignored_dirs]
  if limits:
    command += ["--", *limits]
  return subprocess.run(
      command,
      cwd=repo_path,
      capture_output=True,
      text=True,
      check=False,  # Don't raise error on non-zero exit code (1 means no match)
  )
@@ -0,0 +1,72 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from typing import Dict
from typing import List
from adk_release_analyzer.settings import GITHUB_TOKEN
import requests
# Default headers attached to every GitHub API call made by this module:
# token-based authentication plus the GitHub REST v3 JSON media type.
HEADERS = {
    "Authorization": f"token {GITHUB_TOKEN}",
    "Accept": "application/vnd.github.v3+json",
}
def error_response(error_message: str) -> Dict[str, Any]:
  """Builds the standard error payload returned by the tool functions."""
  return dict(status="error", error_message=error_message)
def get_request(
    url: str,
    headers: dict[str, Any] | None = None,
    params: dict[str, Any] | None = None,
) -> Dict[str, Any]:
  """Executes a GET request and returns the JSON-decoded response body.

  Args:
    url: The endpoint to call.
    headers: Request headers; defaults to the module-level HEADERS.
    params: Optional query parameters.

  Returns:
    The JSON-decoded response payload.
  """
  response = requests.get(
      url,
      headers=HEADERS if headers is None else headers,
      params={} if params is None else params,
      timeout=60,
  )
  # Surface HTTP errors (4xx/5xx) as exceptions for the caller to handle.
  response.raise_for_status()
  return response.json()
def get_paginated_request(
    url: str, headers: dict[str, Any] | None = None
) -> List[Dict[str, Any]]:
  """Executes GET requests and follows 'next' pagination links to fetch all results."""
  request_headers = HEADERS if headers is None else headers
  collected: List[Dict[str, Any]] = []
  next_url = url
  while next_url:
    response = requests.get(next_url, headers=request_headers, timeout=60)
    response.raise_for_status()
    collected.extend(response.json())
    # GitHub advertises the next page in the Link response header, which
    # requests parses into response.links; absence ends the loop.
    next_url = response.links.get("next", {}).get("url")
  return collected
def post_request(url: str, payload: Any) -> Dict[str, Any]:
  """POSTs payload as JSON to url and returns the decoded response body."""
  resp = requests.post(url, headers=HEADERS, json=payload, timeout=60)
  resp.raise_for_status()  # Raise on 4xx/5xx so callers can handle failures.
  return resp.json()
def patch_request(url: str, payload: Any) -> Dict[str, Any]:
  """PATCHes payload as JSON to url and returns the decoded response body."""
  resp = requests.patch(url, headers=HEADERS, json=payload, timeout=60)
  resp.raise_for_status()  # Raise on 4xx/5xx so callers can handle failures.
  return resp.json()