""" Agent Performance Metrics Service. Collects and exposes metrics for agent performance monitoring. """ import time import asyncio from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from dataclasses import dataclass, field from collections import defaultdict, deque import statistics from prometheus_client import ( Counter, Histogram, Gauge, Summary, CollectorRegistry, generate_latest ) from src.core import get_logger from src.services.cache_service import cache_result logger = get_logger("agent.metrics") # Prometheus metrics registry registry = CollectorRegistry() # Agent metrics agent_requests_total = Counter( 'agent_requests_total', 'Total number of agent requests', ['agent_name', 'action', 'status'], registry=registry ) agent_request_duration = Histogram( 'agent_request_duration_seconds', 'Agent request duration in seconds', ['agent_name', 'action'], buckets=(0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0), registry=registry ) agent_active_requests = Gauge( 'agent_active_requests', 'Number of active agent requests', ['agent_name'], registry=registry ) agent_error_rate = Gauge( 'agent_error_rate', 'Agent error rate (last 5 minutes)', ['agent_name'], registry=registry ) agent_memory_usage = Gauge( 'agent_memory_usage_bytes', 'Agent memory usage in bytes', ['agent_name'], registry=registry ) agent_reflection_iterations = Histogram( 'agent_reflection_iterations', 'Number of reflection iterations per request', ['agent_name'], buckets=(0, 1, 2, 3, 4, 5, 10), registry=registry ) agent_quality_score = Histogram( 'agent_quality_score', 'Agent response quality score', ['agent_name', 'action'], buckets=(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0), registry=registry ) @dataclass class AgentMetrics: """Detailed metrics for a specific agent.""" agent_name: str total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 total_duration_seconds: float = 0.0 response_times: deque = field(default_factory=lambda: deque(maxlen=1000)) error_times: deque = field(default_factory=lambda: deque(maxlen=1000)) actions_count: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) last_error: Optional[str] = None last_success_time: Optional[datetime] = None last_failure_time: Optional[datetime] = None quality_scores: deque = field(default_factory=lambda: deque(maxlen=100)) reflection_counts: deque = field(default_factory=lambda: deque(maxlen=100)) memory_samples: deque = field(default_factory=lambda: deque(maxlen=60)) class AgentMetricsService: """Service for collecting and managing agent performance metrics.""" def __init__(self): self.logger = logger self._agent_metrics: Dict[str, AgentMetrics] = {} self._start_time = datetime.utcnow() self._lock = asyncio.Lock() def _get_or_create_metrics(self, agent_name: str) -> AgentMetrics: """Get or create metrics for an agent.""" if agent_name not in self._agent_metrics: self._agent_metrics[agent_name] = AgentMetrics(agent_name=agent_name) return self._agent_metrics[agent_name] async def record_request_start(self, agent_name: str, action: str) -> str: """Record the start of an agent request.""" request_id = f"{agent_name}_{action}_{time.time()}" # Increment active requests agent_active_requests.labels(agent_name=agent_name).inc() return request_id async def record_request_end( self, request_id: str, agent_name: str, action: str, duration: float, success: bool, error: Optional[str] = None, quality_score: Optional[float] = None, reflection_iterations: int = 0 ): """Record the end of an agent request.""" 
        async with self._lock:
            metrics = self._get_or_create_metrics(agent_name)

            # Update counters
            metrics.total_requests += 1
            if success:
                metrics.successful_requests += 1
                metrics.last_success_time = datetime.utcnow()
                status = "success"
            else:
                metrics.failed_requests += 1
                metrics.last_failure_time = datetime.utcnow()
                metrics.last_error = error
                metrics.error_times.append(datetime.utcnow())
                status = "failure"

            # Update duration metrics
            metrics.total_duration_seconds += duration
            metrics.response_times.append(duration)

            # Update action count
            metrics.actions_count[action] += 1

            # Update quality metrics
            if quality_score is not None:
                metrics.quality_scores.append(quality_score)
                agent_quality_score.labels(
                    agent_name=agent_name,
                    action=action
                ).observe(quality_score)

            # Update reflection metrics
            metrics.reflection_counts.append(reflection_iterations)
            agent_reflection_iterations.labels(agent_name=agent_name).observe(reflection_iterations)

            # Update Prometheus metrics
            agent_requests_total.labels(
                agent_name=agent_name,
                action=action,
                status=status
            ).inc()
            agent_request_duration.labels(
                agent_name=agent_name,
                action=action
            ).observe(duration)

            # Decrement active requests
            agent_active_requests.labels(agent_name=agent_name).dec()

            # Update error rate (last 5 minutes)
            error_rate = self._calculate_error_rate(metrics)
            agent_error_rate.labels(agent_name=agent_name).set(error_rate)

    def _calculate_error_rate(self, metrics: AgentMetrics) -> float:
        """Calculate error rate for the last 5 minutes."""
        cutoff_time = datetime.utcnow() - timedelta(minutes=5)
        recent_errors = sum(1 for t in metrics.error_times if t > cutoff_time)

        # Calculate total requests in the same period
        if metrics.total_requests == 0:
            return 0.0

        # Estimate requests in the window (simplified); a zero duration total
        # means every recorded request falls inside the window, so avoid
        # dividing by zero and treat the ratio as 1.0
        if metrics.total_duration_seconds > 0:
            window_ratio = min(1.0, 300 / metrics.total_duration_seconds)  # 5 minutes
        else:
            window_ratio = 1.0
        estimated_requests = max(1, int(metrics.total_requests * window_ratio))

        return min(1.0, recent_errors / estimated_requests)

    async def record_memory_usage(self, agent_name: str, memory_bytes: int):
        """Record agent memory usage."""
        async with self._lock:
            metrics = self._get_or_create_metrics(agent_name)
            metrics.memory_samples.append(memory_bytes)

            # Update Prometheus metric
            agent_memory_usage.labels(agent_name=agent_name).set(memory_bytes)

    @cache_result(prefix="agent_stats", ttl=30)
    async def get_agent_stats(self, agent_name: str) -> Dict[str, Any]:
        """Get comprehensive stats for a specific agent."""
        async with self._lock:
            metrics = self._agent_metrics.get(agent_name)
            if not metrics:
                return {
                    "agent_name": agent_name,
                    "status": "no_data"
                }

            response_times = list(metrics.response_times)
            quality_scores = list(metrics.quality_scores)
            reflection_counts = list(metrics.reflection_counts)

            return {
                "agent_name": agent_name,
                "total_requests": metrics.total_requests,
                "successful_requests": metrics.successful_requests,
                "failed_requests": metrics.failed_requests,
                "success_rate": metrics.successful_requests / metrics.total_requests if metrics.total_requests > 0 else 0,
                "error_rate": self._calculate_error_rate(metrics),
                "response_time": {
                    "mean": statistics.mean(response_times) if response_times else 0,
                    "median": statistics.median(response_times) if response_times else 0,
                    "p95": self._percentile(response_times, 95) if response_times else 0,
                    "p99": self._percentile(response_times, 99) if response_times else 0,
                    "min": min(response_times) if response_times else 0,
                    "max": max(response_times) if response_times else 0
                },
                "quality": {
                    "mean": statistics.mean(quality_scores) if quality_scores else 0,
                    "median": statistics.median(quality_scores) if quality_scores else 0,
                    "min": min(quality_scores) if quality_scores else 0,
                    "max": max(quality_scores) if quality_scores else 0
                },
                "reflection": {
                    "mean_iterations": statistics.mean(reflection_counts) if reflection_counts else 0,
                    "max_iterations": max(reflection_counts) if reflection_counts else 0
                },
                "actions": dict(metrics.actions_count),
                "last_error": metrics.last_error,
                "last_success_time": metrics.last_success_time.isoformat() if metrics.last_success_time else None,
                "last_failure_time": metrics.last_failure_time.isoformat() if metrics.last_failure_time else None,
                "memory_usage": {
                    "current": metrics.memory_samples[-1] if metrics.memory_samples else 0,
                    "mean": statistics.mean(metrics.memory_samples) if metrics.memory_samples else 0,
                    "max": max(metrics.memory_samples) if metrics.memory_samples else 0
                }
            }

    async def get_all_agents_summary(self) -> Dict[str, Any]:
        """Get summary stats for all agents."""
        async with self._lock:
            summary = {
                "total_agents": len(self._agent_metrics),
                "total_requests": sum(m.total_requests for m in self._agent_metrics.values()),
                "total_successful": sum(m.successful_requests for m in self._agent_metrics.values()),
                "total_failed": sum(m.failed_requests for m in self._agent_metrics.values()),
                "uptime_seconds": (datetime.utcnow() - self._start_time).total_seconds(),
                "agents": {}
            }

            for agent_name, metrics in self._agent_metrics.items():
                response_times = list(metrics.response_times)
                summary["agents"][agent_name] = {
                    "requests": metrics.total_requests,
                    "success_rate": metrics.successful_requests / metrics.total_requests if metrics.total_requests > 0 else 0,
                    "avg_response_time": statistics.mean(response_times) if response_times else 0,
                    "error_rate": self._calculate_error_rate(metrics)
                }

            return summary

    def _percentile(self, data: List[float], percentile: float) -> float:
        """Calculate percentile of data."""
        if not data:
            return 0
        sorted_data = sorted(data)
        index = int(len(sorted_data) * (percentile / 100))
        if index >= len(sorted_data):
            return sorted_data[-1]
        return sorted_data[index]

    def get_prometheus_metrics(self) -> bytes:
        """Get Prometheus metrics in text format."""
        return generate_latest(registry)

    async def reset_metrics(self, agent_name: Optional[str] = None):
        """Reset metrics for specific agent or all agents."""
        async with self._lock:
            if agent_name:
                if agent_name in self._agent_metrics:
                    self._agent_metrics[agent_name] = AgentMetrics(agent_name=agent_name)
            else:
                self._agent_metrics.clear()
                self._start_time = datetime.utcnow()


# Global metrics service instance
agent_metrics_service = AgentMetricsService()


class MetricsCollector:
    """Context manager for collecting agent metrics."""

    def __init__(
        self,
        agent_name: str,
        action: str,
        metrics_service: Optional[AgentMetricsService] = None
    ):
        self.agent_name = agent_name
        self.action = action
        self.metrics_service = metrics_service or agent_metrics_service
        self.start_time = None
        self.request_id = None
        self.quality_score = None
        self.reflection_iterations = 0

    async def __aenter__(self):
        """Start metrics collection."""
        self.start_time = time.time()
        self.request_id = await self.metrics_service.record_request_start(
            self.agent_name, self.action
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """End metrics collection."""
        duration = time.time() - self.start_time
        success = exc_type is None
        error = str(exc_val) if exc_val else None

        await self.metrics_service.record_request_end(
            request_id=self.request_id,
            agent_name=self.agent_name,
            action=self.action,
            duration=duration,
            success=success,
            error=error,
            quality_score=self.quality_score,
            reflection_iterations=self.reflection_iterations
        )

        # Don't suppress exceptions
        return False

    def set_quality_score(self, score: float):
        """Set the quality score for the response."""
        self.quality_score = score

    def increment_reflection(self):
        """Increment reflection iteration count."""
        self.reflection_iterations += 1


async def collect_system_metrics():
    """Collect system-wide agent metrics periodically."""
    while True:
        try:
            # Collect memory metrics for active agents
            # This would integrate with the agent pool to get actual memory usage
            await asyncio.sleep(60)  # Collect every minute
        except Exception as e:
            logger.error(f"Error collecting system metrics: {e}")
            await asyncio.sleep(60)
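

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the service itself): MetricsCollector
# wraps a single agent call as an async context manager. The agent name,
# action, and stand-in "work" below are hypothetical placeholders chosen only
# to show the collector's API; swap in a real agent invocation in practice.
# ---------------------------------------------------------------------------
async def _example_usage():
    """Demonstrate MetricsCollector around a fake agent call."""
    async with MetricsCollector("example_agent", "summarize") as collector:
        await asyncio.sleep(0.01)          # stand-in for real agent work
        collector.increment_reflection()   # one reflection pass happened
        collector.set_quality_score(0.85)  # score produced by some evaluator

    # The request is now visible in the Prometheus exposition text
    print(agent_metrics_service.get_prometheus_metrics().decode())


if __name__ == "__main__":
    asyncio.run(_example_usage())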