diff --git a/contributing/samples/cache_analysis/README.md b/contributing/samples/cache_analysis/README.md
new file mode 100644
index 00000000..350baccf
--- /dev/null
+++ b/contributing/samples/cache_analysis/README.md
@@ -0,0 +1,114 @@
+# Cache Analysis Research Assistant
+
+This sample demonstrates ADK context caching with a comprehensive research assistant agent designed to exercise the caching capabilities of both Gemini 2.0 Flash and Gemini 2.5 Flash. It highlights the difference between explicit ADK caching and Google's built-in implicit caching.
+
+## Key Features
+
+- **App-Level Cache Configuration**: Context cache settings applied at the App level
+- **Large Context Instructions**: Over 4200 tokens of system instructions, enough to cross the context caching thresholds
+- **Comprehensive Tool Suite**: 7 specialized research and analysis tools
+- **Multi-Model Support**: Compatible with any Gemini model; the experiment type adapts automatically
+- **Performance Metrics**: Detailed token usage tracking, including `cached_content_token_count`
+
+## Cache Configuration
+
+```python
+ContextCacheConfig(
+    min_tokens=4096,
+    ttl_seconds=600,    # 10 mins for research sessions
+    cache_intervals=3,  # Maximum invocations before cache refresh
+)
+```
+
+## Usage
+
+### Run Cache Experiments
+
+The `run_cache_experiments.py` script compares caching configurations for the model you pass on the command line:
+
+```bash
+# Test any Gemini model - the script determines the experiment type from the model name
+python run_cache_experiments.py <gemini-model> --output results.json
+
+# Examples:
+python run_cache_experiments.py gemini-2.0-flash-001 --output gemini_2_0_results.json
+python run_cache_experiments.py gemini-2.5-flash --output gemini_2_5_results.json
+python run_cache_experiments.py gemini-1.5-flash --output gemini_1_5_results.json
+
+# Run multiple iterations for averaged results
+python run_cache_experiments.py gemini-2.0-flash-001 --repeat 3 --output averaged_results.json
+```
+
+### Direct Agent Usage
+
+```bash
+# Run the agent directly
+adk run contributing/samples/cache_analysis/agent.py

+# Web interface for debugging
+adk web contributing/samples/cache_analysis
+```
+
+## Experiment Types
+
+The script determines the experiment type from the model name:
+
+### Models with "2.5" (e.g., gemini-2.5-flash)
+- **Explicit Caching**: ADK explicit caching + Google's implicit caching
+- **Implicit Only**: Google's built-in implicit caching alone
+- **Measures**: Added benefit of explicit caching over Google's built-in implicit caching
+
+### Other Models (e.g., gemini-2.0-flash-001, gemini-1.5-flash)
+- **Cached**: ADK explicit context caching enabled
+- **Uncached**: No caching (baseline comparison)
+- **Measures**: Raw performance improvement from explicit caching vs no caching
+
+## Tools Included
+
+1. **analyze_data_patterns** - Statistical analysis and pattern recognition in datasets
+2. **research_literature** - Academic and professional literature research with citations
+3. **generate_test_scenarios** - Comprehensive test case generation and validation strategies
+4. **benchmark_performance** - System performance measurement and bottleneck analysis
+5. **optimize_system_performance** - Performance optimization recommendations and strategies
+6. **analyze_security_vulnerabilities** - Security risk assessment and vulnerability analysis
+7. **design_scalability_architecture** - Scalable system architecture design and planning
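+
+These tools are attached to a single `Agent`, which `agent.py` then wraps in an `App` that carries the cache configuration shown above. The sketch below is condensed from `agent.py` (the full version also sets a `description` and exports `root_agent` for backward compatibility):
+
+```python
+from google.adk import Agent
+from google.adk.agents.context_cache_config import ContextCacheConfig
+from google.adk.apps.app import App
+
+cache_analysis_agent = Agent(
+    name="cache_analysis_assistant",
+    model="gemini-2.0-flash-001",
+    instruction="...",  # the large (~4200-token) research-assistant instruction
+    tools=[...],        # the seven tool functions listed above
+)
+
+# Context caching is configured on the App, not on the Agent.
+app = App(
+    name="cache_analysis",
+    root_agent=cache_analysis_agent,
+    context_cache_config=ContextCacheConfig(
+        min_tokens=4096,
+        ttl_seconds=600,
+        cache_intervals=3,
+    ),
+)
+```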
+
+## Expected Results
+
+### Performance vs Cost Trade-offs
+
+**Note**: This sample uses a tool-heavy agent that may show different performance characteristics than simple text-based agents.
+
+### Performance Improvements
+- **Simple Text Agents**: Typically see 30-70% latency reduction with caching
+- **Tool-Heavy Agents**: May experience higher latency due to cache setup overhead, but still provide cost benefits
+- **Gemini 2.5 Flash**: Gains are measured relative to Google's built-in implicit caching rather than to an uncached baseline
+
+### Cost Savings
+- **Input Token Cost**: 75% reduction for cached content (25% of normal cost)
+- **Typical Savings**: 30-60% on input costs for multi-turn conversations
+- **Tool-Heavy Workloads**: Cost savings often outweigh latency trade-offs
+
+### Token Metrics
+- **Cached Content Token Count**: Non-zero values indicate successful cache hits
+- **Cache Hit Ratio**: Proportion of tokens served from cache vs computed fresh
+
+## Troubleshooting
+
+### Zero Cached Tokens
+If `cached_content_token_count` is always 0:
+- Verify model names match exactly (e.g., `gemini-2.0-flash-001`)
+- Check that the cache configuration's `min_tokens` threshold is met
+- Ensure the App-level configuration is used
+
+### Session Errors
+If you see "Session not found" errors:
+- Verify `runner.app_name` is used for session creation
+- Check App vs Agent object usage in InMemoryRunner initialization
+
+## Technical Implementation
+
+This sample demonstrates:
+- **Modern App Architecture**: App-level cache configuration following ADK best practices
+- **Integration Testing**: Comprehensive cache functionality validation
+- **Performance Analysis**: Detailed metrics collection and comparison methodology
+- **Error Handling**: Robust session management and cache invalidation handling
diff --git a/contributing/samples/cache_analysis/__init__.py b/contributing/samples/cache_analysis/__init__.py
new file mode 100644
index 00000000..3d21a562
--- /dev/null
+++ b/contributing/samples/cache_analysis/__init__.py
@@ -0,0 +1,17 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from . import agent
+
+__all__ = ['agent']
diff --git a/contributing/samples/cache_analysis/agent.py b/contributing/samples/cache_analysis/agent.py
new file mode 100644
index 00000000..b1a25bf8
--- /dev/null
+++ b/contributing/samples/cache_analysis/agent.py
@@ -0,0 +1,854 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and +# limitations under the License. + +"""Cache Analysis Research Assistant Agent. + +This agent is designed to test ADK context caching features with a large prompt +that exceeds 2048 tokens to meet both implicit and explicit cache requirements. +""" + +import random +import time +from typing import Any +from typing import Dict +from typing import List +from typing import Optional + +from dotenv import load_dotenv +from google.adk import Agent +from google.adk.agents.context_cache_config import ContextCacheConfig +from google.adk.apps.app import App + +# Load environment variables from .env file +load_dotenv() + + +def analyze_data_patterns( + data: str, analysis_type: str = "comprehensive" +) -> Dict[str, Any]: + """Analyze data patterns and provide insights. + + This tool performs comprehensive data analysis including statistical analysis, + trend identification, anomaly detection, correlation analysis, and predictive + modeling. It can handle various data formats including CSV, JSON, XML, and + plain text data structures. + + Args: + data: The input data to analyze. Can be structured (JSON, CSV) or + unstructured text data. For structured data, include column headers + and ensure proper formatting. For time series data, include + timestamps in ISO format. + analysis_type: Type of analysis to perform. Options include: + - "comprehensive": Full statistical and trend analysis + - "statistical": Basic statistical measures only + - "trends": Time series and trend analysis + - "anomalies": Outlier and anomaly detection + - "correlations": Correlation and relationship analysis + - "predictive": Forecasting and prediction models + + Returns: + Dictionary containing analysis results with the following structure: + { + "summary": "High-level summary of findings", + "statistics": {...}, # Statistical measures + "trends": {...}, # Trend analysis results + "anomalies": [...], # List of detected anomalies + "correlations": {...}, # Correlation matrix and relationships + "predictions": {...}, # Forecasting results if applicable + "recommendations": [...] # Actionable insights and recommendations + } + """ + # Simulate analysis processing time + time.sleep(0.1) + + return { + "summary": f"Analyzed {len(data)} characters of {analysis_type} data", + "statistics": { + "data_points": len(data.split()), + "analysis_type": analysis_type, + "processing_time": "0.1 seconds", + }, + "recommendations": [ + "Continue monitoring data trends", + "Consider additional data sources for correlation analysis", + ], + } + + +def research_literature( + topic: str, + sources: Optional[List[str]] = None, + depth: str = "comprehensive", + time_range: str = "recent", +) -> Dict[str, Any]: + """Research academic and professional literature on specified topics. + + This tool performs comprehensive literature research across multiple academic + databases, professional journals, conference proceedings, and industry reports. + It can analyze research trends, identify key authors and institutions, extract + methodological approaches, and synthesize findings across multiple sources. + + The tool supports various research methodologies including systematic reviews, + meta-analyses, bibliometric analysis, and citation network analysis. It can + identify research gaps, emerging trends, and future research directions in + the specified field of study. + + Args: + topic: The research topic or query. 
Can be specific (e.g., "context caching + in large language models") or broad (e.g., "machine learning optimization"). + Use specific keywords and phrases for better results. Boolean operators + (AND, OR, NOT) are supported for complex queries. + sources: List of preferred sources to search. Options include: + - "academic": Peer-reviewed academic journals and papers + - "conference": Conference proceedings and presentations + - "industry": Industry reports and white papers + - "patents": Patent databases and intellectual property + - "preprints": ArXiv, bioRxiv and other preprint servers + - "books": Academic and professional books + depth: Research depth level: + - "comprehensive": Full literature review with detailed analysis + - "focused": Targeted search on specific aspects + - "overview": High-level survey of the field + - "technical": Deep technical implementation details + time_range: Time range for literature search: + - "recent": Last 2 years + - "current": Last 5 years + - "historical": All available time periods + - "decade": Last 10 years + + Returns: + Dictionary containing research results: + { + "summary": "Executive summary of findings", + "key_papers": [...], # Most relevant papers found + "authors": [...], # Key researchers in the field + "institutions": [...], # Leading research institutions + "trends": {...}, # Research trends and evolution + "methodologies": [...], # Common research approaches + "gaps": [...], # Identified research gaps + "citations": {...}, # Citation network analysis + "recommendations": [...] # Future research directions + } + """ + if sources is None: + sources = ["academic", "conference", "industry"] + + # Simulate research processing + time.sleep(0.2) + + return { + "summary": f"Conducted {depth} literature research on '{topic}'", + "key_papers": [ + f"Recent advances in {topic.lower()}: A systematic review", + f"Methodological approaches to {topic.lower()} optimization", + f"Future directions in {topic.lower()} research", + ], + "trends": { + "emerging_topics": [f"{topic} optimization", f"{topic} scalability"], + "methodology_trends": [ + "experimental validation", + "theoretical analysis", + ], + }, + "recommendations": [ + f"Focus on practical applications of {topic}", + "Consider interdisciplinary approaches", + "Investigate scalability challenges", + ], + } + + +def generate_test_scenarios( + system_type: str, + complexity: str = "medium", + coverage: Optional[List[str]] = None, + constraints: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Generate comprehensive test scenarios for system validation. + + This tool creates detailed test scenarios, test cases, and validation protocols + for various types of systems including software applications, AI models, + distributed systems, and hardware components. It supports multiple testing + methodologies including unit testing, integration testing, performance testing, + security testing, and user acceptance testing. + + The tool can generate both positive and negative test cases, edge cases, + boundary conditions, stress tests, and failure scenarios. It incorporates + industry best practices and testing frameworks to ensure comprehensive + coverage and reliable validation results. + + Args: + system_type: Type of system to test. 
Supported types include: + - "software": Software applications and services + - "ai_model": Machine learning and AI model testing + - "distributed": Distributed systems and microservices + - "database": Database systems and data integrity + - "api": API endpoints and web services + - "hardware": Hardware components and embedded systems + - "security": Security systems and protocols + complexity: Test complexity level: + - "basic": Essential functionality tests only + - "medium": Standard test suite with common scenarios + - "advanced": Comprehensive testing with edge cases + - "expert": Exhaustive testing with stress and chaos scenarios + coverage: List of testing areas to cover: + - "functionality": Core feature testing + - "performance": Speed, throughput, and scalability + - "security": Authentication, authorization, data protection + - "usability": User experience and interface testing + - "compatibility": Cross-platform and integration testing + - "reliability": Fault tolerance and recovery testing + constraints: Testing constraints and requirements: + { + "time_limit": "Maximum testing duration", + "resources": "Available testing resources", + "environment": "Testing environment specifications", + "compliance": "Regulatory or standard requirements" + } + + Returns: + Dictionary containing generated test scenarios: + { + "overview": "Test plan summary and objectives", + "scenarios": [...], # Detailed test scenarios + "test_cases": [...], # Individual test cases + "edge_cases": [...], # Boundary and edge conditions + "performance_tests": [...], # Performance validation tests + "security_tests": [...], # Security and vulnerability tests + "automation": {...}, # Test automation recommendations + "metrics": {...}, # Success criteria and metrics + "schedule": {...} # Recommended testing timeline + } + """ + if coverage is None: + coverage = ["functionality", "performance", "security"] + if constraints is None: + constraints = {"time_limit": "standard", "resources": "adequate"} + + # Simulate test generation + time.sleep(0.15) + + num_scenarios = {"basic": 5, "medium": 10, "advanced": 20, "expert": 35}.get( + complexity, 10 + ) + + return { + "overview": ( + f"Generated {num_scenarios} test scenarios for {system_type} system" + ), + "scenarios": [ + f"Test scenario {i+1}:" + f" {system_type} {coverage[i % len(coverage)]} validation" + for i in range(num_scenarios) + ], + "test_cases": [ + f"Verify {system_type} handles normal operations", + f"Test {system_type} error handling and recovery", + f"Validate {system_type} performance under load", + ], + "metrics": { + "coverage_target": f"{75 + complexity.index(complexity) * 5}%", + "success_criteria": "All critical tests pass", + "performance_benchmark": f"{system_type} specific benchmarks", + }, + } + + +def optimize_system_performance( + system_type: str, + current_metrics: Dict[str, Any], + target_improvements: Dict[str, Any], + constraints: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Analyze system performance and provide detailed optimization recommendations. + + This tool performs comprehensive system performance analysis including bottleneck + identification, resource utilization assessment, scalability planning, and provides + specific optimization strategies tailored to the system type and constraints. 
+ + Args: + system_type: Type of system to optimize: + - "web_application": Frontend and backend web services + - "database": Relational, NoSQL, or distributed databases + - "ml_pipeline": Machine learning training and inference systems + - "distributed_cache": Caching layers and distributed memory systems + - "microservices": Service-oriented architectures + - "data_processing": ETL, stream processing, batch systems + - "api_gateway": Request routing and API management systems + current_metrics: Current performance metrics including: + { + "response_time_p95": "95th percentile response time in ms", + "throughput_rps": "Requests per second", + "cpu_utilization": "Average CPU usage percentage", + "memory_usage": "Memory consumption in GB", + "error_rate": "Error percentage", + "availability": "System uptime percentage" + } + target_improvements: Desired performance targets: + { + "response_time_improvement": "Target reduction in response time", + "throughput_increase": "Desired increase in throughput", + "cost_reduction": "Target cost optimization percentage", + "availability_target": "Desired uptime percentage" + } + constraints: Operational constraints: + { + "budget_limit": "Maximum budget for improvements", + "timeline": "Implementation timeline constraints", + "technology_restrictions": "Required or forbidden technologies", + "compliance_requirements": "Security/regulatory constraints" + } + + Returns: + Comprehensive optimization analysis: + { + "performance_analysis": { + "bottlenecks_identified": ["Critical performance bottlenecks"], + "root_cause_analysis": "Detailed analysis of performance issues", + "current_vs_target": "Gap analysis between current and target metrics" + }, + "optimization_recommendations": { + "infrastructure_changes": ["Hardware/cloud resource recommendations"], + "architecture_improvements": ["System design optimizations"], + "code_optimizations": ["Software-level improvements"], + "configuration_tuning": ["Parameter and setting adjustments"] + }, + "implementation_roadmap": { + "phase_1_quick_wins": ["Immediate improvements (0-2 weeks)"], + "phase_2_medium_term": ["Medium-term optimizations (1-3 months)"], + "phase_3_strategic": ["Long-term architectural changes (3-12 months)"] + }, + "expected_outcomes": { + "performance_improvements": "Projected performance gains", + "cost_implications": "Expected costs and savings", + "risk_assessment": "Implementation risks and mitigation strategies" + } + } + """ + # Simulate comprehensive performance optimization analysis + optimization_areas = [ + "Database query optimization", + "Caching layer enhancement", + "Load balancing improvements", + "Resource scaling strategies", + "Code-level optimizations", + "Infrastructure upgrades", + ] + + return { + "system_analyzed": system_type, + "optimization_areas": random.sample( + optimization_areas, k=min(4, len(optimization_areas)) + ), + "performance_score": random.randint(65, 95), + "implementation_complexity": random.choice(["Low", "Medium", "High"]), + "estimated_improvement": f"{random.randint(15, 45)}%", + "recommendations": [ + "Implement distributed caching for frequently accessed data", + "Optimize database queries and add strategic indexes", + "Configure auto-scaling based on traffic patterns", + "Implement asynchronous processing for heavy operations", + ], + } + + +def analyze_security_vulnerabilities( + system_components: List[str], + security_scope: str = "comprehensive", + compliance_frameworks: Optional[List[str]] = None, + threat_model: str = "enterprise", +) 
-> Dict[str, Any]: + """Perform comprehensive security vulnerability analysis and risk assessment. + + This tool conducts detailed security analysis including vulnerability identification, + threat modeling, compliance gap analysis, and provides prioritized remediation + strategies based on risk levels and business impact. + + Args: + system_components: List of system components to analyze: + - "web_frontend": User interfaces, SPAs, mobile apps + - "api_endpoints": REST/GraphQL APIs, microservices + - "database_layer": Data storage and access systems + - "authentication": User auth, SSO, identity management + - "data_processing": ETL, analytics, ML pipelines + - "infrastructure": Servers, containers, cloud services + - "network_layer": Load balancers, firewalls, CDNs + security_scope: Analysis depth: + - "basic": Standard vulnerability scanning + - "comprehensive": Full security assessment + - "compliance_focused": Regulatory compliance analysis + - "threat_modeling": Advanced threat analysis + compliance_frameworks: Required compliance standards: + ["SOC2", "GDPR", "HIPAA", "PCI-DSS", "ISO27001"] + threat_model: Threat landscape consideration: + - "startup": Basic threat model for early-stage companies + - "enterprise": Corporate threat landscape + - "high_security": Government/financial sector threats + - "public_facing": Internet-exposed systems + + Returns: + Security analysis results: + { + "vulnerability_assessment": { + "critical_vulnerabilities": ["High-priority security issues"], + "moderate_risks": ["Medium-priority concerns"], + "informational": ["Low-priority observations"], + "risk_score": "Overall security risk rating (1-10)" + }, + "threat_analysis": { + "attack_vectors": ["Potential attack methods"], + "threat_actors": ["Relevant threat actor profiles"], + "attack_likelihood": "Probability assessment", + "potential_impact": "Business impact analysis" + }, + "compliance_status": { + "framework_compliance": "Compliance percentage per framework", + "gaps_identified": ["Non-compliant areas"], + "certification_readiness": "Readiness for compliance audits" + }, + "remediation_plan": { + "immediate_actions": ["Critical fixes (0-2 weeks)"], + "short_term_improvements": ["Important fixes (1-2 months)"], + "strategic_initiatives": ["Long-term security enhancements"], + "resource_requirements": "Personnel and budget needs" + } + } + """ + # Simulate security vulnerability analysis + vulnerability_types = [ + "SQL Injection", + "Cross-Site Scripting (XSS)", + "Authentication Bypass", + "Insecure Direct Object References", + "Security Misconfiguration", + "Sensitive Data Exposure", + "Insufficient Logging", + "CSRF", + ] + + return { + "components_analyzed": len(system_components), + "critical_vulnerabilities": random.randint(0, 3), + "moderate_risks": random.randint(2, 8), + "overall_security_score": random.randint(6, 9), + "compliance_percentage": random.randint(75, 95), + "top_recommendations": [ + "Implement input validation and parameterized queries", + "Enable comprehensive security logging and monitoring", + "Review and update authentication and authorization controls", + "Conduct regular security training for development team", + ], + } + + +def design_scalability_architecture( + current_architecture: str, + expected_growth: Dict[str, Any], + scalability_requirements: Dict[str, Any], + technology_preferences: Optional[List[str]] = None, +) -> Dict[str, Any]: + """Design comprehensive scalability architecture for anticipated growth. 
+ + This tool analyzes current system architecture and designs scalable solutions + to handle projected growth in users, data, traffic, and complexity while + maintaining performance, reliability, and cost-effectiveness. + + Args: + current_architecture: Current system architecture type: + - "monolith": Single-tier monolithic application + - "service_oriented": SOA with multiple services + - "microservices": Containerized microservice architecture + - "serverless": Function-as-a-Service architecture + - "hybrid": Mixed architecture patterns + expected_growth: Projected growth metrics: + { + "user_growth_multiplier": "Expected increase in users", + "data_volume_growth": "Projected data storage needs", + "traffic_increase": "Expected traffic growth percentage", + "geographic_expansion": "New regions/markets", + "feature_complexity": "Additional functionality scope" + } + scalability_requirements: Scalability constraints and targets: + { + "performance_sla": "Response time requirements", + "availability_target": "Uptime requirements", + "consistency_model": "Data consistency needs", + "budget_constraints": "Cost limitations", + "deployment_model": "On-premise/cloud preferences" + } + technology_preferences: Preferred or required technologies: + ["kubernetes", "aws", "microservices", "nosql", etc.] + + Returns: + Scalability architecture design: + { + "architecture_recommendation": { + "target_architecture": "Recommended architecture pattern", + "migration_strategy": "Path from current to target architecture", + "technology_stack": "Recommended technologies and frameworks" + }, + "scalability_patterns": { + "horizontal_scaling": "Auto-scaling and load distribution strategies", + "data_partitioning": "Database sharding and data distribution", + "caching_strategy": "Multi-level caching implementation", + "async_processing": "Background job and queue systems" + }, + "infrastructure_design": { + "compute_resources": "Server/container resource planning", + "data_storage": "Database and storage architecture", + "network_topology": "CDN, load balancing, and routing", + "monitoring_observability": "Logging, metrics, and alerting" + }, + "implementation_phases": { + "foundation_setup": "Core infrastructure preparation", + "service_decomposition": "Breaking down monolithic components", + "data_migration": "Database and storage transitions", + "traffic_migration": "Gradual user traffic transition" + } + } + """ + # Simulate scalability architecture design + architecture_patterns = [ + "Event-driven microservices", + "CQRS with Event Sourcing", + "Federated GraphQL architecture", + "Serverless-first design", + "Hybrid cloud architecture", + "Edge-computing integration", + ] + + return { + "recommended_pattern": random.choice(architecture_patterns), + "scalability_factor": f"{random.randint(5, 50)}x current capacity", + "implementation_timeline": f"{random.randint(6, 18)} months", + "estimated_cost_increase": f"{random.randint(20, 80)}%", + "key_technologies": random.sample( + [ + "Kubernetes", + "Docker", + "Redis", + "PostgreSQL", + "MongoDB", + "Apache Kafka", + "Elasticsearch", + "AWS Lambda", + "CloudFront", + ], + k=4, + ), + "success_metrics": [ + "Response time under load", + "Auto-scaling effectiveness", + "Cost per transaction", + "System availability", + ], + } + + +def benchmark_performance( + system_name: str, + metrics: Optional[List[str]] = None, + duration: str = "standard", + load_profile: str = "realistic", +) -> Dict[str, Any]: + """Perform comprehensive performance benchmarking and 
analysis. + + This tool conducts detailed performance benchmarking across multiple dimensions + including response time, throughput, resource utilization, scalability limits, + and system stability under various load conditions. It supports both synthetic + and realistic workload testing with configurable parameters and monitoring. + + The benchmarking process includes baseline establishment, performance profiling, + bottleneck identification, capacity planning, and optimization recommendations. + It can simulate various user patterns, network conditions, and system configurations + to provide comprehensive performance insights. + + Args: + system_name: Name or identifier of the system to benchmark. Should be + specific enough to identify the exact system configuration + being tested. + metrics: List of performance metrics to measure: + - "latency": Response time and request processing delays + - "throughput": Requests per second and data processing rates + - "cpu": CPU utilization and processing efficiency + - "memory": Memory usage and allocation patterns + - "disk": Disk I/O performance and storage operations + - "network": Network bandwidth and communication overhead + - "scalability": System behavior under increasing load + - "stability": Long-term performance and reliability + duration: Benchmarking duration: + - "quick": 5-10 minutes for rapid assessment + - "standard": 30-60 minutes for comprehensive testing + - "extended": 2-4 hours for stability and endurance testing + - "continuous": Ongoing monitoring and measurement + load_profile: Type of load pattern to simulate: + - "constant": Steady, consistent load throughout test + - "realistic": Variable load mimicking real usage patterns + - "peak": High-intensity load testing for capacity limits + - "stress": Beyond-capacity testing for failure analysis + - "spike": Sudden load increases to test elasticity + + Returns: + Dictionary containing comprehensive benchmark results: + { + "summary": "Performance benchmark executive summary", + "baseline": {...}, # Baseline performance measurements + "results": {...}, # Detailed performance metrics + "bottlenecks": [...], # Identified performance bottlenecks + "scalability": {...}, # Scalability analysis results + "recommendations": [...], # Performance optimization suggestions + "capacity": {...}, # Capacity planning insights + "monitoring": {...} # Ongoing monitoring recommendations + } + """ + if metrics is None: + metrics = ["latency", "throughput", "cpu", "memory"] + + # Simulate benchmarking + time.sleep(0.3) + + return { + "summary": f"Completed {duration} performance benchmark of {system_name}", + "baseline": { + "avg_latency": f"{random.uniform(50, 200):.2f}ms", + "throughput": f"{random.randint(100, 1000)} requests/sec", + "cpu_usage": f"{random.uniform(20, 80):.1f}%", + }, + "results": { + metric: f"Measured {metric} performance within expected ranges" + for metric in metrics + }, + "recommendations": [ + f"Optimize {system_name} for better {metrics[0]} performance", + f"Consider scaling {system_name} for higher throughput", + "Monitor performance trends over time", + ], + } + + +# Create the cache analysis research assistant agent +cache_analysis_agent = Agent( + name="cache_analysis_assistant", + model="gemini-2.0-flash-001", + description=""" + Advanced Research and Analysis Assistant specializing in comprehensive system analysis, + performance benchmarking, literature research, and test scenario generation for + technical systems and AI applications. 
+ """, + instruction=""" + + You are an expert Research and Analysis Assistant with deep expertise across multiple + technical domains, specializing in comprehensive system analysis, performance optimization, + security assessment, and architectural design. Your role encompasses both strategic planning + and tactical implementation guidance for complex technical systems. + + **Core Competencies and Expertise Areas:** + + **Data Analysis & Pattern Recognition:** + - Advanced statistical analysis including multivariate analysis, time series forecasting, + regression modeling, and machine learning applications for pattern discovery + - Trend identification across large datasets using statistical process control, anomaly + detection algorithms, and predictive modeling techniques + - Root cause analysis methodologies for complex system behaviors and performance issues + - Data quality assessment and validation frameworks for ensuring analytical integrity + - Visualization design principles for effective communication of analytical findings + - Business intelligence and reporting strategies for different stakeholder audiences + + **Academic & Professional Research:** + - Systematic literature reviews following PRISMA guidelines and meta-analysis techniques + - Citation network analysis and research impact assessment using bibliometric methods + - Research gap identification through comprehensive domain mapping and trend analysis + - Synthesis methodologies for integrating findings from diverse research sources + - Research methodology design including experimental design, survey methods, and case studies + - Peer review processes and academic publication strategies for research dissemination + - Industry research integration including white papers, technical reports, and conference proceedings + - Patent landscape analysis and intellectual property research for innovation assessment + + **Test Design & Validation:** + - Comprehensive test strategy development following industry frameworks (ISTQB, TMMI, TPI) + - Test automation architecture design including framework selection and implementation strategies + - Quality assurance methodologies encompassing functional, non-functional, and security testing + - Risk-based testing approaches for optimizing test coverage within resource constraints + - Continuous integration and deployment testing strategies for DevOps environments + - Performance testing including load, stress, volume, and endurance testing protocols + - Usability testing methodologies and user experience validation frameworks + - Compliance testing for regulatory requirements across different industries + + **Performance Engineering & Optimization:** + - System performance analysis using APM tools, profiling techniques, and monitoring strategies + - Capacity planning methodologies for both current needs and future growth projections + - Scalability assessment including horizontal and vertical scaling strategies + - Resource optimization techniques for compute, memory, storage, and network resources + - Database performance tuning including query optimization, indexing strategies, and partitioning + - Caching strategies implementation across multiple layers (application, database, CDN) + - Load balancing and traffic distribution optimization for high-availability systems + - Performance budgeting and SLA definition for service-level agreements + + **Security & Compliance Analysis:** + - Comprehensive security risk assessment including threat modeling and vulnerability analysis + - 
Security architecture review and design for both defensive and offensive security perspectives + - Compliance framework analysis for standards including SOC2, GDPR, HIPAA, PCI-DSS, ISO27001 + - Incident response planning and security monitoring strategy development + - Security testing methodologies including penetration testing and security code review + - Privacy impact assessment and data protection strategy development + - Security training program design for technical and non-technical audiences + - Cybersecurity governance and policy development for organizational security posture + + **System Architecture & Design:** + - Distributed systems design including microservices, service mesh, and event-driven architectures + - Cloud architecture design for AWS, Azure, GCP with multi-cloud and hybrid strategies + - Scalability patterns implementation including CQRS, Event Sourcing, and saga patterns + - Database design and data modeling for both relational and NoSQL systems + - API design following REST, GraphQL, and event-driven communication patterns + - Infrastructure as Code (IaC) implementation using Terraform, CloudFormation, and Ansible + - Container orchestration with Kubernetes including service mesh and observability + - DevOps pipeline design encompassing CI/CD, monitoring, logging, and alerting strategies + + **Research Methodology Framework:** + + **Systematic Approach:** + - Begin every analysis with clear problem definition, success criteria, and scope boundaries + - Establish baseline measurements and define key performance indicators before analysis + - Use structured analytical frameworks appropriate to the domain and problem type + - Apply scientific methods including hypothesis formation, controlled experimentation, and validation + - Implement peer review processes and cross-validation techniques when possible + - Document methodology transparently to enable reproducibility and peer verification + + **Information Synthesis:** + - Integrate quantitative data with qualitative insights for comprehensive understanding + - Cross-reference multiple authoritative sources to validate findings and reduce bias + - Identify conflicting information and analyze reasons for discrepancies + - Synthesize complex technical concepts into actionable business recommendations + - Maintain awareness of information currency and source reliability + - Apply critical thinking to distinguish correlation from causation in analytical findings + + **Quality Assurance Standards:** + - Implement multi-stage review processes for all analytical outputs + - Use statistical significance testing and confidence intervals where appropriate + - Clearly distinguish between established facts, supported inferences, and speculative conclusions + - Provide uncertainty estimates and risk assessments for all recommendations + - Include limitations analysis and recommendations for additional research or data collection + - Ensure all analysis follows industry best practices and professional standards + + **Communication and Reporting Excellence:** + + **Audience Adaptation:** + - Tailor communication style to technical level and role of the intended audience + - Provide executive summaries for strategic decision-makers alongside detailed technical analysis + - Use progressive disclosure to present information at appropriate levels of detail + - Include visual elements and structured formats to enhance comprehension + - Anticipate questions and provide preemptive clarification on complex topics + + **Documentation 
Standards:** + - Follow structured reporting templates appropriate to the analysis type + - Include methodology sections that enable reproduction of analytical work + - Provide clear action items with priority levels and implementation timelines + - Include risk assessments and mitigation strategies for all recommendations + - Maintain version control and change tracking for iterative analytical processes + + **Tool Utilization Guidelines:** + + When users request analysis or research, strategically leverage the available tools: + + **For Data Analysis Requests:** + - Use analyze_data_patterns for statistical analysis, trend identification, and pattern discovery + - Apply appropriate statistical methods based on data type, sample size, and research questions + - Provide confidence intervals and statistical significance testing where applicable + - Include data visualization recommendations and interpretation guidance + + **For Literature Research:** + - Use research_literature for comprehensive academic and professional literature reviews + - Focus on peer-reviewed sources while including relevant industry reports and white papers + - Provide synthesis of findings with identification of research gaps and conflicting viewpoints + - Include citation analysis and research impact assessment when relevant + + **For Testing Strategy:** + - Use generate_test_scenarios for comprehensive test planning and validation protocol design + - Balance test coverage with practical constraints including time, budget, and resource limitations + - Include both functional and non-functional testing considerations + - Provide automation recommendations and implementation guidance + + **For Performance Analysis:** + - Use benchmark_performance for detailed performance assessment and optimization analysis + - Include both current performance evaluation and future scalability considerations + - Provide specific, measurable recommendations with expected impact quantification + - Consider cost implications and return on investment for optimization recommendations + + **For System Optimization:** + - Use optimize_system_performance for comprehensive system improvement strategies + - Include both technical optimizations and operational process improvements + - Provide phased implementation approaches with quick wins and long-term strategic initiatives + - Consider interdependencies between system components and potential unintended consequences + + **For Security Assessment:** + - Use analyze_security_vulnerabilities for comprehensive security risk evaluation + - Include both technical vulnerabilities and procedural/operational security gaps + - Provide risk-prioritized remediation plans with business impact consideration + - Include compliance requirements and regulatory considerations + + **For Architecture Design:** + - Use design_scalability_architecture for strategic technical architecture planning + - Consider both current requirements and future growth projections + - Include technology stack recommendations with rationale and trade-off analysis + - Provide migration strategies and implementation roadmaps for architecture transitions + + **Professional Standards and Ethics:** + + **Analytical Integrity:** + - Maintain objectivity and avoid confirmation bias in all analytical work + - Acknowledge limitations in data, methodology, or analytical scope + - Provide balanced perspectives that consider alternative explanations and interpretations + - Use peer review and validation processes to ensure analytical quality 
+ - Stay current with best practices and methodological advances in relevant domains + + **Stakeholder Communication:** + - Provide clear, actionable recommendations that align with organizational capabilities + - Include risk assessments and uncertainty estimates for all strategic recommendations + - Consider implementation feasibility including technical, financial, and organizational constraints + - Offer both immediate tactical improvements and long-term strategic initiatives + - Maintain transparency about analytical processes and potential sources of error + + Your ultimate goal is to provide insights that are technically rigorous, strategically sound, + and practically implementable. Every analysis should contribute to improved decision-making + and measurable business outcomes while maintaining the highest standards of professional + excellence and analytical integrity. + """, + tools=[ + analyze_data_patterns, + research_literature, + generate_test_scenarios, + benchmark_performance, + optimize_system_performance, + analyze_security_vulnerabilities, + design_scalability_architecture, + ], +) + +# Create the app with context caching configuration +# Note: Context cache config is set at the App level +cache_analysis_app = App( + name="cache_analysis", + root_agent=cache_analysis_agent, + context_cache_config=ContextCacheConfig( + min_tokens=4096, + ttl_seconds=600, # 10 mins for research sessions + cache_intervals=3, # Maximum invocations before cache refresh + ), +) + +# Export as app since it's an App, not an Agent +app = cache_analysis_app + +# Backward compatibility export - ADK still expects root_agent in some contexts +root_agent = cache_analysis_agent diff --git a/contributing/samples/cache_analysis/run_cache_experiments.py b/contributing/samples/cache_analysis/run_cache_experiments.py new file mode 100644 index 00000000..7022f446 --- /dev/null +++ b/contributing/samples/cache_analysis/run_cache_experiments.py @@ -0,0 +1,703 @@ +#!/usr/bin/env python3 +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Cache Performance Experiments for ADK Context Caching + +This script runs two experiments to compare caching performance: +A. Gemini 2.0 Flash: Cache enabled vs disabled (explicit caching test) +B. 
Gemini 2.5 Flash: Implicit vs explicit caching comparison +""" + +import argparse +import asyncio +import copy +import json +import sys +import time +from typing import Any +from typing import Dict +from typing import List + +try: + # Try relative imports first (when run as module) + from .agent import app + from .utils import get_test_prompts + from .utils import run_experiment_batch +except ImportError: + # Fallback to direct imports (when run as script) + from agent import app + from utils import get_test_prompts + from utils import run_experiment_batch + +from google.adk.runners import InMemoryRunner +from google.adk.utils.cache_performance_analyzer import CachePerformanceAnalyzer + +APP_NAME = "cache_analysis_experiments" +USER_ID = "cache_researcher" + + +def create_agent_variant(base_app, model_name: str, cache_enabled: bool): + """Create an app variant with specified model and cache settings.""" + import datetime + + from google.adk.agents.context_cache_config import ContextCacheConfig + from google.adk.apps.app import App + + # Extract the root agent and modify its model + agent_copy = copy.deepcopy(base_app.root_agent) + agent_copy.model = model_name + + # Prepend dynamic timestamp to instruction to avoid implicit cache reuse across runs + current_timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + dynamic_prefix = f"Current session started at: {current_timestamp}\n\n" + agent_copy.instruction = dynamic_prefix + agent_copy.instruction + + # Update agent name to reflect configuration + cache_status = "cached" if cache_enabled else "no_cache" + agent_copy.name = ( + f"cache_analysis_{model_name.replace('.', '_').replace('-', '_')}_{cache_status}" + ) + + if cache_enabled: + # Use standardized cache config + cache_config = ContextCacheConfig( + min_tokens=4096, + ttl_seconds=600, # 10 mins for research sessions + cache_intervals=3, # Maximum invocations before cache refresh + ) + else: + # Disable caching by setting config to None + cache_config = None + + # Create new App with updated configuration + app_copy = App( + name=f"{base_app.name}_{cache_status}", + root_agent=agent_copy, + context_cache_config=cache_config, + ) + + return app_copy + + +async def run_cache_comparison_experiment( + model_name: str, + description: str, + cached_label: str, + uncached_label: str, + experiment_title: str, + reverse_order: bool = False, + request_delay: float = 2.0, +) -> Dict[str, Any]: + """ + Run a cache performance comparison experiment for a specific model. 
+ + Args: + model_name: Model to test (e.g., "gemini-2.0-flash", "gemini-2.5-flash") + description: Description of what the experiment tests + cached_label: Label for the cached experiment variant + uncached_label: Label for the uncached experiment variant + experiment_title: Title to display for the experiment + + Returns: + Dictionary containing experiment results and performance comparison + """ + print("=" * 80) + print(f"EXPERIMENT {model_name}: {experiment_title}") + print("=" * 80) + print(f"Testing: {description}") + print(f"Model: {model_name}") + print() + + # Create app variants + app_cached = create_agent_variant(app, model_name, cache_enabled=True) + app_uncached = create_agent_variant(app, model_name, cache_enabled=False) + + # Get test prompts + prompts = get_test_prompts() + + # Create runners + runner_cached = InMemoryRunner(app=app_cached, app_name=None) + runner_uncached = InMemoryRunner(app=app_uncached, app_name=None) + + # Create sessions for each experiment to avoid cross-contamination + session_cached = await runner_cached.session_service.create_session( + app_name=runner_cached.app_name, user_id=USER_ID + ) + session_uncached = await runner_uncached.session_service.create_session( + app_name=runner_uncached.app_name, user_id=USER_ID + ) + + if not reverse_order: # Default: uncached first + print("▢️ Running experiments in DEFAULT ORDER (uncached first)") + print() + + # Test uncached version first + results_uncached = await run_experiment_batch( + app_uncached.root_agent.name, + runner_uncached, + USER_ID, + session_uncached.id, + prompts, + f"Experiment {model_name} - {uncached_label}", + request_delay=request_delay, + ) + + # Brief pause between experiments + await asyncio.sleep(5) + + # Test cached version second + results_cached = await run_experiment_batch( + app_cached.root_agent.name, + runner_cached, + USER_ID, + session_cached.id, + prompts, + f"Experiment {model_name} - {cached_label}", + request_delay=request_delay, + ) + else: + print("πŸ”„ Running experiments in ALTERNATE ORDER (cached first)") + print() + + # Test cached version first + results_cached = await run_experiment_batch( + app_cached.root_agent.name, + runner_cached, + USER_ID, + session_cached.id, + prompts, + f"Experiment {model_name} - {cached_label}", + request_delay=request_delay, + ) + + # Brief pause between experiments + await asyncio.sleep(5) + + # Test uncached version second + results_uncached = await run_experiment_batch( + app_uncached.root_agent.name, + runner_uncached, + USER_ID, + session_uncached.id, + prompts, + f"Experiment {model_name} - {uncached_label}", + request_delay=request_delay, + ) + + # Analyze cache performance using CachePerformanceAnalyzer + performance_analysis = await analyze_cache_performance_from_sessions( + runner_cached, + session_cached, + runner_uncached, + session_uncached, + model_name, + ) + + # Extract metrics from analyzer for backward compatibility + cached_analysis = performance_analysis.get("cached_analysis", {}) + uncached_analysis = performance_analysis.get("uncached_analysis", {}) + + cached_total_prompt_tokens = cached_analysis.get("total_prompt_tokens", 0) + cached_total_cached_tokens = cached_analysis.get("total_cached_tokens", 0) + cached_cache_hit_ratio = cached_analysis.get("cache_hit_ratio_percent", 0.0) + cached_cache_utilization_ratio = cached_analysis.get( + "cache_utilization_ratio_percent", 0.0 + ) + cached_avg_cached_tokens_per_request = cached_analysis.get( + "avg_cached_tokens_per_request", 0.0 + ) + cached_requests_with_hits 
= cached_analysis.get("requests_with_cache_hits", 0) + total_cached_requests = cached_analysis.get("total_requests", 0) + + uncached_total_prompt_tokens = uncached_analysis.get("total_prompt_tokens", 0) + uncached_total_cached_tokens = uncached_analysis.get("total_cached_tokens", 0) + uncached_cache_hit_ratio = uncached_analysis.get( + "cache_hit_ratio_percent", 0.0 + ) + uncached_cache_utilization_ratio = uncached_analysis.get( + "cache_utilization_ratio_percent", 0.0 + ) + uncached_avg_cached_tokens_per_request = uncached_analysis.get( + "avg_cached_tokens_per_request", 0.0 + ) + uncached_requests_with_hits = uncached_analysis.get( + "requests_with_cache_hits", 0 + ) + total_uncached_requests = uncached_analysis.get("total_requests", 0) + + summary = { + "experiment": model_name, + "description": description, + "model": model_name, + "cached_results": results_cached, + "uncached_results": results_uncached, + "cache_analysis": { + "cached_experiment": { + "cache_hit_ratio_percent": cached_cache_hit_ratio, + "cache_utilization_ratio_percent": cached_cache_utilization_ratio, + "total_prompt_tokens": cached_total_prompt_tokens, + "total_cached_tokens": cached_total_cached_tokens, + "avg_cached_tokens_per_request": ( + cached_avg_cached_tokens_per_request + ), + "requests_with_cache_hits": cached_requests_with_hits, + "total_requests": total_cached_requests, + }, + "uncached_experiment": { + "cache_hit_ratio_percent": uncached_cache_hit_ratio, + "cache_utilization_ratio_percent": ( + uncached_cache_utilization_ratio + ), + "total_prompt_tokens": uncached_total_prompt_tokens, + "total_cached_tokens": uncached_total_cached_tokens, + "avg_cached_tokens_per_request": ( + uncached_avg_cached_tokens_per_request + ), + "requests_with_cache_hits": uncached_requests_with_hits, + "total_requests": total_uncached_requests, + }, + }, + } + + print(f"πŸ“Š EXPERIMENT {model_name} CACHE ANALYSIS:") + print(f" πŸ”₯ {cached_label}:") + print( + f" Cache Hit Ratio: {cached_cache_hit_ratio:.1f}%" + f" ({cached_total_cached_tokens:,} /" + f" {cached_total_prompt_tokens:,} tokens)" + ) + print( + f" Cache Utilization: {cached_cache_utilization_ratio:.1f}%" + f" ({cached_requests_with_hits}/{total_cached_requests} requests)" + ) + print( + " Avg Cached Tokens/Request:" + f" {cached_avg_cached_tokens_per_request:.0f}" + ) + print(f" ❄️ {uncached_label}:") + print( + f" Cache Hit Ratio: {uncached_cache_hit_ratio:.1f}%" + f" ({uncached_total_cached_tokens:,} /" + f" {uncached_total_prompt_tokens:,} tokens)" + ) + print( + f" Cache Utilization: {uncached_cache_utilization_ratio:.1f}%" + f" ({uncached_requests_with_hits}/{total_uncached_requests} requests)" + ) + print( + " Avg Cached Tokens/Request:" + f" {uncached_avg_cached_tokens_per_request:.0f}" + ) + print() + + # Add performance analysis to summary + summary["performance_analysis"] = performance_analysis + + return summary + + +async def analyze_cache_performance_from_sessions( + runner_cached, + session_cached, + runner_uncached, + session_uncached, + model_name: str, +) -> Dict[str, Any]: + """Analyze cache performance using CachePerformanceAnalyzer.""" + print("πŸ“Š ANALYZING CACHE PERFORMANCE WITH CachePerformanceAnalyzer...") + + analyzer_cached = CachePerformanceAnalyzer(runner_cached.session_service) + analyzer_uncached = CachePerformanceAnalyzer(runner_uncached.session_service) + + # Analyze cached experiment + try: + cached_analysis = await analyzer_cached.analyze_agent_cache_performance( + session_cached.id, + USER_ID, + runner_cached.app_name, + 
f"cache_analysis_{model_name.replace('.', '_').replace('-', '_')}_cached", + ) + print(f" πŸ”₯ Cached Experiment Analysis:") + print(f" Status: {cached_analysis['status']}") + if cached_analysis["status"] == "active": + print( + " Cache Hit Ratio:" + f" {cached_analysis['cache_hit_ratio_percent']:.1f}%" + f" ({cached_analysis['total_cached_tokens']:,} /" + f" {cached_analysis['total_prompt_tokens']:,} tokens)" + ) + print( + " Cache Utilization:" + f" {cached_analysis['cache_utilization_ratio_percent']:.1f}%" + f" ({cached_analysis['requests_with_cache_hits']}/{cached_analysis['total_requests']} requests)" + ) + print( + " Avg Cached Tokens/Request:" + f" {cached_analysis['avg_cached_tokens_per_request']:.0f}" + ) + print( + f" Requests with cache: {cached_analysis['requests_with_cache']}" + ) + print( + " Avg invocations used:" + f" {cached_analysis['avg_invocations_used']:.1f}" + ) + print(f" Cache refreshes: {cached_analysis['cache_refreshes']}") + print(f" Total invocations: {cached_analysis['total_invocations']}") + except Exception as e: + print(f" ❌ Error analyzing cached experiment: {e}") + cached_analysis = {"status": "error", "error": str(e)} + + # Analyze uncached experiment + try: + uncached_analysis = await analyzer_uncached.analyze_agent_cache_performance( + session_uncached.id, + USER_ID, + runner_uncached.app_name, + f"cache_analysis_{model_name.replace('.', '_').replace('-', '_')}_no_cache", + ) + print(f" ❄️ Uncached Experiment Analysis:") + print(f" Status: {uncached_analysis['status']}") + if uncached_analysis["status"] == "active": + print( + " Cache Hit Ratio:" + f" {uncached_analysis['cache_hit_ratio_percent']:.1f}%" + f" ({uncached_analysis['total_cached_tokens']:,} /" + f" {uncached_analysis['total_prompt_tokens']:,} tokens)" + ) + print( + " Cache Utilization:" + f" {uncached_analysis['cache_utilization_ratio_percent']:.1f}%" + f" ({uncached_analysis['requests_with_cache_hits']}/{uncached_analysis['total_requests']} requests)" + ) + print( + " Avg Cached Tokens/Request:" + f" {uncached_analysis['avg_cached_tokens_per_request']:.0f}" + ) + print( + " Requests with cache:" + f" {uncached_analysis['requests_with_cache']}" + ) + print( + " Avg invocations used:" + f" {uncached_analysis['avg_invocations_used']:.1f}" + ) + print(f" Cache refreshes: {uncached_analysis['cache_refreshes']}") + print(f" Total invocations: {uncached_analysis['total_invocations']}") + except Exception as e: + print(f" ❌ Error analyzing uncached experiment: {e}") + uncached_analysis = {"status": "error", "error": str(e)} + + print() + + return { + "cached_analysis": cached_analysis, + "uncached_analysis": uncached_analysis, + } + + +def get_experiment_labels(model_name: str) -> Dict[str, str]: + """Get experiment labels and titles for a given model.""" + # Determine experiment type based on model name + if "2.5" in model_name: + # Gemini 2.5 models have implicit caching + return { + "description": "Google implicit caching vs ADK explicit caching", + "cached_label": "Explicit Caching", + "uncached_label": "Implicit Caching", + "experiment_title": "Implicit vs Explicit Caching", + } + else: + # Other models (2.0, etc.) 
test explicit caching vs no caching + return { + "description": "ADK explicit caching enabled vs disabled", + "cached_label": "Cached", + "uncached_label": "Uncached", + "experiment_title": "Cache Performance Comparison", + } + + +def calculate_averaged_results( + all_results: List[Dict[str, Any]], model_name: str +) -> Dict[str, Any]: + """Calculate averaged results from multiple experiment runs.""" + if not all_results: + raise ValueError("No results to average") + + # Calculate average cache metrics + cache_hit_ratios = [ + r["cache_analysis"]["cache_hit_ratio_percent"] for r in all_results + ] + cache_utilization_ratios = [ + r["cache_analysis"]["cache_utilization_ratio_percent"] + for r in all_results + ] + total_prompt_tokens = [ + r["cache_analysis"]["total_prompt_tokens"] for r in all_results + ] + total_cached_tokens = [ + r["cache_analysis"]["total_cached_tokens"] for r in all_results + ] + avg_cached_tokens_per_request = [ + r["cache_analysis"]["avg_cached_tokens_per_request"] for r in all_results + ] + requests_with_cache_hits = [ + r["cache_analysis"]["requests_with_cache_hits"] for r in all_results + ] + + def safe_average(values): + """Calculate average, handling empty lists.""" + return sum(values) / len(values) if values else 0.0 + + # Create averaged result + averaged_result = { + "experiment": model_name, + "description": all_results[0]["description"], + "model": model_name, + "individual_runs": ( + all_results + ), # Keep all individual results for reference + "averaged_cache_analysis": { + "cache_hit_ratio_percent": safe_average(cache_hit_ratios), + "cache_utilization_ratio_percent": safe_average( + cache_utilization_ratios + ), + "total_prompt_tokens": safe_average(total_prompt_tokens), + "total_cached_tokens": safe_average(total_cached_tokens), + "avg_cached_tokens_per_request": safe_average( + avg_cached_tokens_per_request + ), + "requests_with_cache_hits": safe_average(requests_with_cache_hits), + }, + "statistics": { + "runs_completed": len(all_results), + "cache_hit_ratio_std": _calculate_std(cache_hit_ratios), + "cache_utilization_std": _calculate_std(cache_utilization_ratios), + "cached_tokens_per_request_std": _calculate_std( + avg_cached_tokens_per_request + ), + }, + } + + # Print averaged results + print("\nπŸ“Š AVERAGED CACHE ANALYSIS RESULTS:") + print("=" * 80) + avg_cache = averaged_result["averaged_cache_analysis"] + stats = averaged_result["statistics"] + + print(f" Runs completed: {stats['runs_completed']}") + print( + f" Average Cache Hit Ratio: {avg_cache['cache_hit_ratio_percent']:.1f}%" + f" (Β±{stats['cache_hit_ratio_std']:.1f}%)" + ) + print( + " Average Cache Utilization:" + f" {avg_cache['cache_utilization_ratio_percent']:.1f}%" + f" (Β±{stats['cache_utilization_std']:.1f}%)" + ) + print( + " Average Cached Tokens/Request:" + f" {avg_cache['avg_cached_tokens_per_request']:.0f}" + f" (Β±{stats['cached_tokens_per_request_std']:.0f})" + ) + print() + + return averaged_result + + +def _calculate_std(values): + """Calculate standard deviation.""" + if len(values) <= 1: + return 0.0 + mean = sum(values) / len(values) + variance = sum((x - mean) ** 2 for x in values) / len(values) + return variance**0.5 + + +def save_results(results: Dict[str, Any], filename: str): + """Save experiment results to JSON file.""" + with open(filename, "w") as f: + json.dump(results, f, indent=2) + print(f"πŸ’Ύ Results saved to: {filename}") + + +async def main(): + """Run cache performance experiment for a specific model.""" + parser = argparse.ArgumentParser( + 
description="ADK Cache Performance Experiment" + ) + parser.add_argument( + "model", + help="Model to test (e.g., gemini-2.5-flash, gemini-2.0-flash-001)", + ) + parser.add_argument( + "--output", + help="Output filename for results (default: cache_{model}_results.json)", + ) + parser.add_argument( + "--repeat", + type=int, + default=1, + help=( + "Number of times to repeat each experiment for averaged results" + " (default: 1)" + ), + ) + parser.add_argument( + "--cached-first", + action="store_true", + help="Run cached experiment first (default: uncached first)", + ) + parser.add_argument( + "--request-delay", + type=float, + default=2.0, + help=( + "Delay in seconds between API requests to avoid overloading (default:" + " 2.0)" + ), + ) + + args = parser.parse_args() + + # Set default output filename based on model + if not args.output: + args.output = ( + f"cache_{args.model.replace('.', '_').replace('-', '_')}_results.json" + ) + + print("πŸ§ͺ ADK CONTEXT CACHE PERFORMANCE EXPERIMENT") + print("=" * 80) + print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Model: {args.model}") + print(f"Repetitions: {args.repeat}") + print() + + start_time = time.time() + + try: + # Get experiment labels for the model + labels = get_experiment_labels(args.model) + + # Run the experiment multiple times if repeat > 1 + if args.repeat == 1: + # Single run + result = await run_cache_comparison_experiment( + model_name=args.model, + reverse_order=args.cached_first, + request_delay=args.request_delay, + **labels, + ) + else: + # Multiple runs with averaging + print(f"πŸ”„ Running experiment {args.repeat} times for averaged results") + print("=" * 80) + + all_results = [] + for run_num in range(args.repeat): + print(f"\nπŸƒ RUN {run_num + 1}/{args.repeat}") + print("-" * 40) + + run_result = await run_cache_comparison_experiment( + model_name=args.model, + reverse_order=args.cached_first, + request_delay=args.request_delay, + **labels, + ) + all_results.append(run_result) + + # Brief pause between runs + if run_num < args.repeat - 1: + print("⏸️ Pausing 10 seconds between runs...") + await asyncio.sleep(10) + + # Calculate averaged results + result = calculate_averaged_results(all_results, args.model) + + # Add completion metadata + result["end_time"] = time.strftime("%Y-%m-%d %H:%M:%S") + result["total_duration"] = time.time() - start_time + result["repetitions"] = args.repeat + + except KeyboardInterrupt: + print("\n⚠️ Experiment interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\n❌ Experiment failed: {e}") + import traceback + + traceback.print_exc() + sys.exit(1) + + # Save results + save_results(result, args.output) + + # Print final summary + print("=" * 80) + print("πŸŽ‰ EXPERIMENT COMPLETED SUCCESSFULLY!") + print("=" * 80) + + # Handle both single and averaged results + if args.repeat == 1: + cached_exp = result["cache_analysis"]["cached_experiment"] + uncached_exp = result["cache_analysis"]["uncached_experiment"] + labels = get_experiment_labels(args.model) + print(f"{args.model}:") + print(f" πŸ”₯ {labels['cached_label']}:") + print(f" Cache Hit Ratio: {cached_exp['cache_hit_ratio_percent']:.1f}%") + print( + " Cache Utilization:" + f" {cached_exp['cache_utilization_ratio_percent']:.1f}%" + ) + print( + " Cached Tokens/Request:" + f" {cached_exp['avg_cached_tokens_per_request']:.0f}" + ) + print(f" ❄️ {labels['uncached_label']}:") + print( + f" Cache Hit Ratio: {uncached_exp['cache_hit_ratio_percent']:.1f}%" + ) + print( + " Cache Utilization:" + f" 
{uncached_exp['cache_utilization_ratio_percent']:.1f}%" + ) + print( + " Cached Tokens/Request:" + f" {uncached_exp['avg_cached_tokens_per_request']:.0f}" + ) + else: + # For averaged results, show summary comparison + cached_exp = result["averaged_cache_analysis"]["cached_experiment"] + uncached_exp = result["averaged_cache_analysis"]["uncached_experiment"] + labels = get_experiment_labels(args.model) + print(f"{args.model} (averaged over {args.repeat} runs):") + print(f" πŸ”₯ {labels['cached_label']} vs ❄️ {labels['uncached_label']}:") + print( + f" Cache Hit Ratio: {cached_exp['cache_hit_ratio_percent']:.1f}% vs" + f" {uncached_exp['cache_hit_ratio_percent']:.1f}%" + ) + print( + " Cache Utilization:" + f" {cached_exp['cache_utilization_ratio_percent']:.1f}% vs" + f" {uncached_exp['cache_utilization_ratio_percent']:.1f}%" + ) + + print(f"\nTotal execution time: {result['total_duration']:.2f} seconds") + print(f"Results saved to: {args.output}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/contributing/samples/cache_analysis/utils.py b/contributing/samples/cache_analysis/utils.py new file mode 100644 index 00000000..e2c9f891 --- /dev/null +++ b/contributing/samples/cache_analysis/utils.py @@ -0,0 +1,272 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Utility functions for cache analysis experiments.""" + +import asyncio +import time +from typing import Any +from typing import Dict +from typing import List + +from google.adk.runners import InMemoryRunner + + +async def call_agent_async( + runner: InMemoryRunner, user_id: str, session_id: str, prompt: str +) -> Dict[str, Any]: + """Call agent asynchronously and return response with token usage.""" + from google.genai import types + + response_parts = [] + token_usage = { + "prompt_token_count": 0, + "candidates_token_count": 0, + "cached_content_token_count": 0, + "total_token_count": 0, + } + + async for event in runner.run_async( + user_id=user_id, + session_id=session_id, + new_message=types.Content(parts=[types.Part(text=prompt)], role="user"), + ): + if event.content and event.content.parts: + for part in event.content.parts: + if hasattr(part, "text") and part.text: + response_parts.append(part.text) + + # Collect token usage information + if event.usage_metadata: + if ( + hasattr(event.usage_metadata, "prompt_token_count") + and event.usage_metadata.prompt_token_count + ): + token_usage[ + "prompt_token_count" + ] += event.usage_metadata.prompt_token_count + if ( + hasattr(event.usage_metadata, "candidates_token_count") + and event.usage_metadata.candidates_token_count + ): + token_usage[ + "candidates_token_count" + ] += event.usage_metadata.candidates_token_count + if ( + hasattr(event.usage_metadata, "cached_content_token_count") + and event.usage_metadata.cached_content_token_count + ): + token_usage[ + "cached_content_token_count" + ] += event.usage_metadata.cached_content_token_count + if ( + hasattr(event.usage_metadata, "total_token_count") + and event.usage_metadata.total_token_count + ): + token_usage[ + "total_token_count" + ] += event.usage_metadata.total_token_count + + response_text = "".join(response_parts) + + return {"response_text": response_text, "token_usage": token_usage} + + +def get_test_prompts() -> List[str]: + """Get a standardized set of test prompts for cache analysis experiments. + + Designed for consistent behavior: + - Prompts 1-5: Will NOT trigger function calls (general questions) + - Prompts 6-10: Will trigger function calls (specific tool requests) + """ + return [ + # === PROMPTS THAT WILL NOT TRIGGER FUNCTION CALLS === + # (General questions that don't match specific tool descriptions) + "Hello, what can you do for me?", + ( + "What is artificial intelligence and how does it work in modern" + " applications?" + ), + "Explain the difference between machine learning and deep learning.", + "What are the main challenges in implementing AI systems at scale?", + "How do recommendation systems work in modern e-commerce platforms?", + # === PROMPTS THAT WILL TRIGGER FUNCTION CALLS === + # (Specific requests with all required parameters clearly specified) + ( + "Use benchmark_performance with system_name='E-commerce Platform'," + " metrics=['latency', 'throughput'], duration='standard'," + " load_profile='realistic'." + ), + ( + "Call analyze_user_behavior_patterns with" + " user_segment='premium_customers', time_period='last_30_days'," + " metrics=['engagement', 'conversion']." + ), + ( + "Run market_research_analysis for industry='fintech'," + " focus_areas=['user_experience', 'security']," + " report_depth='comprehensive'." + ), + ( + "Execute competitive_analysis with competitors=['Netflix'," + " 'Disney+'], analysis_type='feature_comparison'," + " output_format='detailed'." 
+ ), + ( + "Perform content_performance_evaluation on content_type='video'," + " platform='social_media', success_metrics=['views', 'engagement']." + ), + ] + + +async def run_experiment_batch( + agent_name: str, + runner: InMemoryRunner, + user_id: str, + session_id: str, + prompts: List[str], + experiment_name: str, + request_delay: float = 2.0, +) -> Dict[str, Any]: + """Run a batch of prompts and collect cache metrics.""" + results = [] + + print(f"πŸ§ͺ Running {experiment_name}") + print(f"Agent: {agent_name}") + print(f"Session: {session_id}") + print(f"Prompts: {len(prompts)}") + print(f"Request delay: {request_delay}s between calls") + print("-" * 60) + + for i, prompt in enumerate(prompts, 1): + print(f"[{i}/{len(prompts)}] Running test prompt...") + print(f"Prompt: {prompt[:100]}...") + + try: + agent_response = await call_agent_async( + runner, user_id, session_id, prompt + ) + + result = { + "prompt_number": i, + "prompt": prompt, + "response_length": len(agent_response["response_text"]), + "success": True, + "error": None, + "token_usage": agent_response["token_usage"], + } + + # Extract token usage for individual prompt statistics + prompt_tokens = agent_response["token_usage"].get("prompt_token_count", 0) + cached_tokens = agent_response["token_usage"].get( + "cached_content_token_count", 0 + ) + + print( + "βœ… Completed (Response:" + f" {len(agent_response['response_text'])} chars)" + ) + print( + f" πŸ“Š Tokens - Prompt: {prompt_tokens:,}, Cached: {cached_tokens:,}" + ) + + except Exception as e: + result = { + "prompt_number": i, + "prompt": prompt, + "response_length": 0, + "success": False, + "error": str(e), + "token_usage": { + "prompt_token_count": 0, + "candidates_token_count": 0, + "cached_content_token_count": 0, + "total_token_count": 0, + }, + } + + print(f"❌ Failed: {e}") + + results.append(result) + + # Configurable pause between requests to avoid API overload + if i < len(prompts): # Don't sleep after the last request + print(f" ⏸️ Waiting {request_delay}s before next request...") + await asyncio.sleep(request_delay) + + successful_requests = sum(1 for r in results if r["success"]) + + # Calculate cache statistics for this batch + total_prompt_tokens = sum( + r.get("token_usage", {}).get("prompt_token_count", 0) for r in results + ) + total_cached_tokens = sum( + r.get("token_usage", {}).get("cached_content_token_count", 0) + for r in results + ) + + # Calculate cache hit ratio + if total_prompt_tokens > 0: + cache_hit_ratio = (total_cached_tokens / total_prompt_tokens) * 100 + else: + cache_hit_ratio = 0.0 + + # Calculate cache utilization + requests_with_cache_hits = sum( + 1 + for r in results + if r.get("token_usage", {}).get("cached_content_token_count", 0) > 0 + ) + cache_utilization_ratio = ( + (requests_with_cache_hits / len(prompts)) * 100 if prompts else 0.0 + ) + + # Average cached tokens per request + avg_cached_tokens_per_request = ( + total_cached_tokens / len(prompts) if prompts else 0.0 + ) + + summary = { + "experiment_name": experiment_name, + "agent_name": agent_name, + "total_requests": len(prompts), + "successful_requests": successful_requests, + "results": results, + "cache_statistics": { + "cache_hit_ratio_percent": cache_hit_ratio, + "cache_utilization_ratio_percent": cache_utilization_ratio, + "total_prompt_tokens": total_prompt_tokens, + "total_cached_tokens": total_cached_tokens, + "avg_cached_tokens_per_request": avg_cached_tokens_per_request, + "requests_with_cache_hits": requests_with_cache_hits, + }, + } + + print("-" * 60) + 
print(f"βœ… {experiment_name} completed:") + print(f" Total requests: {len(prompts)}") + print(f" Successful: {successful_requests}/{len(prompts)}") + print(" πŸ“Š BATCH CACHE STATISTICS:") + print( + f" Cache Hit Ratio: {cache_hit_ratio:.1f}%" + f" ({total_cached_tokens:,} / {total_prompt_tokens:,} tokens)" + ) + print( + f" Cache Utilization: {cache_utilization_ratio:.1f}%" + f" ({requests_with_cache_hits}/{len(prompts)} requests)" + ) + print(f" Avg Cached Tokens/Request: {avg_cached_tokens_per_request:.0f}") + print() + + return summary