chore: Enhance a2a event converter

a. fix function call long running id matching logic
b. fix error code conversion logic
c. add input required and auth required status conversion logic
d. add a2a Task/Message to ADK event converter
f. set task id and context id from input argument

PiperOrigin-RevId: 775860563
This commit is contained in:
Xiang (Sean) Zhou
2025-06-25 15:31:42 -07:00
committed by Copybara-Service
parent 832a633351
commit a71dbdf9e2
2 changed files with 917 additions and 70 deletions
+255 -33
View File
@@ -14,7 +14,8 @@
from __future__ import annotations
import datetime
from datetime import datetime
from datetime import timezone
import logging
from typing import Any
from typing import Dict
@@ -26,18 +27,24 @@ from a2a.server.events import Event as A2AEvent
from a2a.types import Artifact
from a2a.types import DataPart
from a2a.types import Message
from a2a.types import Part as A2APart
from a2a.types import Role
from a2a.types import Task
from a2a.types import TaskArtifactUpdateEvent
from a2a.types import TaskState
from a2a.types import TaskStatus
from a2a.types import TaskStatusUpdateEvent
from a2a.types import TextPart
from google.genai import types as genai_types
from ...agents.invocation_context import InvocationContext
from ...events.event import Event
from ...flows.llm_flows.functions import REQUEST_EUC_FUNCTION_CALL_NAME
from ...utils.feature_decorator import working_in_progress
from .part_converter import A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY
from .part_converter import A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
from .part_converter import A2A_DATA_PART_METADATA_TYPE_KEY
from .part_converter import convert_a2a_part_to_genai_part
from .part_converter import convert_genai_part_to_a2a_part
from .utils import _get_adk_metadata_key
@@ -143,6 +150,8 @@ def _convert_artifact_to_a2a_events(
invocation_context: InvocationContext,
filename: str,
version: int,
task_id: Optional[str] = None,
context_id: Optional[str] = None,
) -> TaskArtifactUpdateEvent:
"""Converts a new artifact version to an A2A TaskArtifactUpdateEvent.
@@ -151,6 +160,7 @@ def _convert_artifact_to_a2a_events(
invocation_context: The invocation context.
filename: The name of the artifact file.
version: The version number of the artifact.
task_id: Optional task ID to use for generated events. If not provided, new UUIDs will be generated.
Returns:
A TaskArtifactUpdateEvent representing the artifact update.
@@ -186,9 +196,9 @@ def _convert_artifact_to_a2a_events(
)
return TaskArtifactUpdateEvent(
taskId=str(uuid.uuid4()),
taskId=task_id,
append=False,
contextId=invocation_context.session.id,
contextId=context_id,
lastChunk=True,
artifact=Artifact(
artifactId=artifact_id,
@@ -210,7 +220,7 @@ def _convert_artifact_to_a2a_events(
raise RuntimeError(f"Artifact conversion failed: {e}") from e
def _process_long_running_tool(a2a_part, event: Event) -> None:
def _process_long_running_tool(a2a_part: A2APart, event: Event) -> None:
"""Processes long-running tool metadata for an A2A part.
Args:
@@ -220,18 +230,173 @@ def _process_long_running_tool(a2a_part, event: Event) -> None:
if (
isinstance(a2a_part.root, DataPart)
and event.long_running_tool_ids
and a2a_part.root.metadata
and a2a_part.root.metadata.get(
_get_adk_metadata_key(A2A_DATA_PART_METADATA_TYPE_KEY)
)
== A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
and a2a_part.root.metadata.get("id") in event.long_running_tool_ids
and a2a_part.root.data.get("id") in event.long_running_tool_ids
):
a2a_part.root.metadata[_get_adk_metadata_key("is_long_running")] = True
a2a_part.root.metadata[
_get_adk_metadata_key(A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY)
] = True
@working_in_progress
def convert_event_to_a2a_status_message(
event: Event, invocation_context: InvocationContext
def convert_a2a_task_to_event(
a2a_task: Task,
author: Optional[str] = None,
invocation_context: Optional[InvocationContext] = None,
) -> Event:
"""Converts an A2A task to an ADK event.
Args:
a2a_task: The A2A task to convert. Must not be None.
author: The author of the event. Defaults to "a2a agent" if not provided.
invocation_context: The invocation context containing session information.
If provided, the branch will be set from the context.
Returns:
An ADK Event object representing the converted task.
Raises:
ValueError: If a2a_task is None.
RuntimeError: If conversion of the underlying message fails.
"""
if a2a_task is None:
raise ValueError("A2A task cannot be None")
try:
# Extract message from task status or history
message = None
if a2a_task.status and a2a_task.status.message:
message = a2a_task.status.message
elif a2a_task.history:
message = a2a_task.history[-1]
# Convert message if available
if message:
try:
return convert_a2a_message_to_event(message, author, invocation_context)
except Exception as e:
logger.error("Failed to convert A2A task message to event: %s", e)
raise RuntimeError(f"Failed to convert task message: {e}") from e
# Create minimal event if no message is available
return Event(
invocation_id=(
invocation_context.invocation_id
if invocation_context
else str(uuid.uuid4())
),
author=author or "a2a agent",
branch=invocation_context.branch if invocation_context else None,
)
except Exception as e:
logger.error("Failed to convert A2A task to event: %s", e)
raise
@working_in_progress
def convert_a2a_message_to_event(
a2a_message: Message,
author: Optional[str] = None,
invocation_context: Optional[InvocationContext] = None,
) -> Event:
"""Converts an A2A message to an ADK event.
Args:
a2a_message: The A2A message to convert. Must not be None.
author: The author of the event. Defaults to "a2a agent" if not provided.
invocation_context: The invocation context containing session information.
If provided, the branch will be set from the context.
Returns:
An ADK Event object with converted content and long-running tool metadata.
Raises:
ValueError: If a2a_message is None.
RuntimeError: If conversion of message parts fails.
"""
if a2a_message is None:
raise ValueError("A2A message cannot be None")
if not a2a_message.parts:
logger.warning(
"A2A message has no parts, creating event with empty content"
)
return Event(
invocation_id=(
invocation_context.invocation_id
if invocation_context
else str(uuid.uuid4())
),
author=author or "a2a agent",
branch=invocation_context.branch if invocation_context else None,
content=genai_types.Content(role="model", parts=[]),
)
try:
parts = []
long_running_tool_ids = set()
for a2a_part in a2a_message.parts:
try:
part = convert_a2a_part_to_genai_part(a2a_part)
if part is None:
logger.warning("Failed to convert A2A part, skipping: %s", a2a_part)
continue
# Check for long-running tools
if (
a2a_part.root.metadata
and a2a_part.root.metadata.get(
_get_adk_metadata_key(
A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY
)
)
is True
):
long_running_tool_ids.add(part.function_call.id)
parts.append(part)
except Exception as e:
logger.error("Failed to convert A2A part: %s, error: %s", a2a_part, e)
# Continue processing other parts instead of failing completely
continue
if not parts:
logger.warning(
"No parts could be converted from A2A message %s", a2a_message
)
return Event(
invocation_id=(
invocation_context.invocation_id
if invocation_context
else str(uuid.uuid4())
),
author=author or "a2a agent",
branch=invocation_context.branch if invocation_context else None,
long_running_tool_ids=long_running_tool_ids
if long_running_tool_ids
else None,
content=genai_types.Content(
role="model",
parts=parts,
),
)
except Exception as e:
logger.error("Failed to convert A2A message to event: %s", e)
raise RuntimeError(f"Failed to convert message: {e}") from e
@working_in_progress
def convert_event_to_a2a_message(
event: Event, invocation_context: InvocationContext, role: Role = Role.agent
) -> Optional[Message]:
"""Converts an ADK event to an A2A message.
@@ -262,9 +427,7 @@ def convert_event_to_a2a_status_message(
_process_long_running_tool(a2a_part, event)
if a2a_parts:
return Message(
messageId=str(uuid.uuid4()), role=Role.agent, parts=a2a_parts
)
return Message(messageId=str(uuid.uuid4()), role=role, parts=a2a_parts)
except Exception as e:
logger.error("Failed to convert event to status message: %s", e)
@@ -274,38 +437,57 @@ def convert_event_to_a2a_status_message(
def _create_error_status_event(
event: Event, invocation_context: InvocationContext
event: Event,
invocation_context: InvocationContext,
task_id: Optional[str] = None,
context_id: Optional[str] = None,
) -> TaskStatusUpdateEvent:
"""Creates a TaskStatusUpdateEvent for error scenarios.
Args:
event: The ADK event containing error information.
invocation_context: The invocation context.
task_id: Optional task ID to use for generated events.
context_id: Optional Context ID to use for generated events.
Returns:
A TaskStatusUpdateEvent with FAILED state.
"""
error_message = getattr(event, "error_message", None) or DEFAULT_ERROR_MESSAGE
# Get context metadata and add error code
event_metadata = _get_context_metadata(event, invocation_context)
if event.error_code:
event_metadata[_get_adk_metadata_key("error_code")] = str(event.error_code)
return TaskStatusUpdateEvent(
taskId=str(uuid.uuid4()),
contextId=invocation_context.session.id,
final=False,
metadata=_get_context_metadata(event, invocation_context),
taskId=task_id,
contextId=context_id,
metadata=event_metadata,
status=TaskStatus(
state=TaskState.failed,
message=Message(
messageId=str(uuid.uuid4()),
role=Role.agent,
parts=[TextPart(text=error_message)],
metadata={
_get_adk_metadata_key("error_code"): str(event.error_code)
}
if event.error_code
else {},
),
timestamp=datetime.datetime.now().isoformat(),
timestamp=datetime.now(timezone.utc).isoformat(),
),
final=False,
)
def _create_running_status_event(
message: Message, invocation_context: InvocationContext, event: Event
def _create_status_update_event(
message: Message,
invocation_context: InvocationContext,
event: Event,
task_id: Optional[str] = None,
context_id: Optional[str] = None,
) -> TaskStatusUpdateEvent:
"""Creates a TaskStatusUpdateEvent for running scenarios.
@@ -313,32 +495,70 @@ def _create_running_status_event(
message: The A2A message to include.
invocation_context: The invocation context.
event: The ADK event.
task_id: Optional task ID to use for generated events.
context_id: Optional Context ID to use for generated events.
Returns:
A TaskStatusUpdateEvent with RUNNING state.
"""
status = TaskStatus(
state=TaskState.working,
message=message,
timestamp=datetime.now(timezone.utc).isoformat(),
)
if any(
part.root.metadata.get(
_get_adk_metadata_key(A2A_DATA_PART_METADATA_TYPE_KEY)
)
== A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
and part.root.metadata.get(
_get_adk_metadata_key(A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY)
)
is True
and part.root.data.get("name") == REQUEST_EUC_FUNCTION_CALL_NAME
for part in message.parts
if part.root.metadata
):
status.state = TaskState.auth_required
elif any(
part.root.metadata.get(
_get_adk_metadata_key(A2A_DATA_PART_METADATA_TYPE_KEY)
)
== A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
and part.root.metadata.get(
_get_adk_metadata_key(A2A_DATA_PART_METADATA_IS_LONG_RUNNING_KEY)
)
is True
for part in message.parts
if part.root.metadata
):
status.state = TaskState.input_required
return TaskStatusUpdateEvent(
taskId=str(uuid.uuid4()),
contextId=invocation_context.session.id,
final=False,
status=TaskStatus(
state=TaskState.working,
message=message,
timestamp=datetime.datetime.now().isoformat(),
),
taskId=task_id,
contextId=context_id,
status=status,
metadata=_get_context_metadata(event, invocation_context),
final=False,
)
@working_in_progress
def convert_event_to_a2a_events(
event: Event, invocation_context: InvocationContext
event: Event,
invocation_context: InvocationContext,
task_id: Optional[str] = None,
context_id: Optional[str] = None,
) -> List[A2AEvent]:
"""Converts a GenAI event to a list of A2A events.
Args:
event: The ADK event to convert.
invocation_context: The invocation context.
task_id: Optional task ID to use for generated events.
context_id: Optional Context ID to use for generated events.
Returns:
A list of A2A events representing the converted ADK event.
@@ -358,20 +578,22 @@ def convert_event_to_a2a_events(
if event.actions.artifact_delta:
for filename, version in event.actions.artifact_delta.items():
artifact_event = _convert_artifact_to_a2a_events(
event, invocation_context, filename, version
event, invocation_context, filename, version, task_id, context_id
)
a2a_events.append(artifact_event)
# Handle error scenarios
if event.error_code:
error_event = _create_error_status_event(event, invocation_context)
error_event = _create_error_status_event(
event, invocation_context, task_id, context_id
)
a2a_events.append(error_event)
# Handle regular message content
message = convert_event_to_a2a_status_message(event, invocation_context)
message = convert_event_to_a2a_message(event, invocation_context)
if message:
running_event = _create_running_status_event(
message, invocation_context, event
running_event = _create_status_update_event(
message, invocation_context, event, task_id, context_id
)
a2a_events.append(running_event)
File diff suppressed because it is too large Load Diff