""" Agent Memory Integration Service Integrates all agents with the Nanã memory system for persistent knowledge sharing and context preservation. """ import asyncio from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Set from enum import Enum import hashlib from src.agents.nana import ( ContextMemoryAgent, EpisodicMemory, SemanticMemory, ConversationMemory, MemoryImportance ) from src.agents.deodoro import AgentMessage, AgentContext, BaseAgent from src.core import get_logger from src.core.exceptions import MemoryError logger = get_logger(__name__) class MemoryIntegrationType(Enum): """Types of memory integration for agents.""" READ_ONLY = "read_only" # Agent can only read memories WRITE_ONLY = "write_only" # Agent can only write memories READ_WRITE = "read_write" # Agent can read and write memories SELECTIVE = "selective" # Agent has selective access based on tags class AgentMemoryIntegration: """ Service to integrate agents with the Nanã memory system. This service acts as a bridge between agents and the memory system, providing: - Automatic memory storage for agent results - Context retrieval for informed decision making - Cross-agent knowledge sharing - Memory-based learning and improvement """ def __init__( self, memory_agent: ContextMemoryAgent, auto_store: bool = True, auto_retrieve: bool = True ): """ Initialize memory integration service. Args: memory_agent: The Nanã memory agent instance auto_store: Automatically store agent results auto_retrieve: Automatically retrieve relevant context """ self.memory_agent = memory_agent self.auto_store = auto_store self.auto_retrieve = auto_retrieve # Agent memory configurations self.agent_configs: Dict[str, Dict[str, Any]] = self._initialize_agent_configs() # Memory access tracking self.access_log: List[Dict[str, Any]] = [] # Cache for frequently accessed memories self.memory_cache: Dict[str, Any] = {} self.cache_ttl = 300 # 5 minutes def _initialize_agent_configs(self) -> Dict[str, Dict[str, Any]]: """Initialize memory configurations for each agent.""" return { # Master agent has full access "abaporu": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["investigation", "coordination", "results"], "importance_threshold": MemoryImportance.LOW, "auto_store_results": True }, # Investigative agents store findings "zumbi": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["anomaly", "fraud", "investigation"], "importance_threshold": MemoryImportance.MEDIUM, "auto_store_results": True }, "anita": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["pattern", "analysis", "trend"], "importance_threshold": MemoryImportance.MEDIUM, "auto_store_results": True }, "oxossi": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["fraud", "evidence", "high_risk"], "importance_threshold": MemoryImportance.HIGH, "auto_store_results": True }, # Reporting agents read memories "tiradentes": { "integration_type": MemoryIntegrationType.READ_ONLY, "tags": ["report", "summary"], "importance_threshold": MemoryImportance.LOW, "auto_store_results": False }, "machado": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["document", "text_analysis", "compliance"], "importance_threshold": MemoryImportance.MEDIUM, "auto_store_results": True }, # Analysis agents "bonifacio": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["policy", "effectiveness", "impact"], "importance_threshold": MemoryImportance.MEDIUM, "auto_store_results": True }, "dandara": { 
"integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["equity", "social_justice", "inclusion"], "importance_threshold": MemoryImportance.MEDIUM, "auto_store_results": True }, "lampiao": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["regional", "geographic", "inequality"], "importance_threshold": MemoryImportance.MEDIUM, "auto_store_results": True }, # Support agents "ayrton_senna": { "integration_type": MemoryIntegrationType.READ_ONLY, "tags": ["routing", "performance"], "importance_threshold": MemoryImportance.LOW, "auto_store_results": False }, "oscar_niemeyer": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["visualization", "aggregation", "metrics"], "importance_threshold": MemoryImportance.LOW, "auto_store_results": True }, "ceuci": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["prediction", "forecast", "analysis"], "importance_threshold": MemoryImportance.MEDIUM, "auto_store_results": True }, "maria_quiteria": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["security", "audit", "compliance"], "importance_threshold": MemoryImportance.HIGH, "auto_store_results": True }, "obaluaie": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["corruption", "systemic", "alert"], "importance_threshold": MemoryImportance.HIGH, "auto_store_results": True }, "drummond": { "integration_type": MemoryIntegrationType.READ_WRITE, "tags": ["communication", "message", "notification"], "importance_threshold": MemoryImportance.LOW, "auto_store_results": False } } async def integrate_agent(self, agent: BaseAgent) -> None: """ Integrate an agent with the memory system. This wraps the agent's process method to automatically handle memory operations. """ agent_id = agent.agent_id.lower() if agent_id not in self.agent_configs: logger.warning(f"No memory configuration for agent {agent_id}") return # Store original process method original_process = agent.process # Create memory-aware process method async def memory_aware_process(message: AgentMessage, context: AgentContext) -> Any: config = self.agent_configs[agent_id] # Retrieve relevant memories before processing if self.auto_retrieve and config["integration_type"] in [ MemoryIntegrationType.READ_ONLY, MemoryIntegrationType.READ_WRITE, MemoryIntegrationType.SELECTIVE ]: memories = await self.retrieve_relevant_memories( agent_id=agent_id, query=message.content, context=context, tags=config["tags"] ) # Inject memories into context if memories: context.metadata["retrieved_memories"] = memories logger.info(f"Retrieved {len(memories)} memories for {agent_id}") # Process with original method result = await original_process(message, context) # Store result in memory if configured if (self.auto_store and config["auto_store_results"] and config["integration_type"] in [ MemoryIntegrationType.WRITE_ONLY, MemoryIntegrationType.READ_WRITE ] and result.success): # Determine importance based on result importance = self._determine_importance(agent_id, result) if importance.value >= config["importance_threshold"].value: await self.store_agent_result( agent_id=agent_id, message=message, context=context, result=result, importance=importance, tags=config["tags"] ) return result # Replace process method agent.process = memory_aware_process logger.info(f"Successfully integrated {agent_id} with memory system") async def retrieve_relevant_memories( self, agent_id: str, query: str, context: AgentContext, tags: List[str], limit: int = 10 ) -> List[Dict[str, Any]]: """Retrieve relevant memories for 
an agent.""" try: # Check cache first cache_key = self._generate_cache_key(agent_id, query, tags) if cache_key in self.memory_cache: cached = self.memory_cache[cache_key] if datetime.utcnow() - cached["timestamp"] < timedelta(seconds=self.cache_ttl): return cached["memories"] # Retrieve from memory agent memories = [] # Get episodic memories episodic = await self.memory_agent.retrieve_episodic( investigation_id=context.investigation_id, limit=limit // 2 ) memories.extend(episodic) # Get semantic memories by tags for tag in tags: semantic = await self.memory_agent.retrieve_by_tag( tag=tag, limit=limit // len(tags) ) memories.extend(semantic) # Get similar memories by query similar = await self.memory_agent.retrieve_similar( query=query, limit=limit // 2 ) memories.extend(similar) # Deduplicate and sort by relevance unique_memories = self._deduplicate_memories(memories) sorted_memories = sorted( unique_memories, key=lambda m: m.get("relevance", 0), reverse=True )[:limit] # Cache results self.memory_cache[cache_key] = { "memories": sorted_memories, "timestamp": datetime.utcnow() } # Log access self.access_log.append({ "agent_id": agent_id, "timestamp": datetime.utcnow(), "query": query, "memories_retrieved": len(sorted_memories), "tags": tags }) return sorted_memories except Exception as e: logger.error(f"Error retrieving memories for {agent_id}: {str(e)}") return [] async def store_agent_result( self, agent_id: str, message: AgentMessage, context: AgentContext, result: Any, importance: MemoryImportance, tags: List[str] ) -> bool: """Store agent result in memory.""" try: # Create episodic memory memory_id = f"{agent_id}_{context.investigation_id}_{datetime.utcnow().timestamp()}" episodic_memory = EpisodicMemory( id=memory_id, content={ "agent": agent_id, "message": message.content, "result": result.data if hasattr(result, 'data') else str(result) }, importance=importance, tags=tags + [agent_id], investigation_id=context.investigation_id, user_id=context.user_id, session_id=context.session_id, query=message.content, result=result.data if hasattr(result, 'data') else {"result": str(result)}, context=context.metadata ) # Store in memory agent await self.memory_agent.store_episodic( memory=episodic_memory, context=context ) # Extract and store semantic knowledge if agent_id in ["zumbi", "anita", "oxossi", "bonifacio"]: await self._extract_semantic_knowledge( agent_id=agent_id, result=result, tags=tags, context=context ) logger.info(f"Stored result from {agent_id} with importance {importance.value}") return True except Exception as e: logger.error(f"Error storing result from {agent_id}: {str(e)}") return False async def _extract_semantic_knowledge( self, agent_id: str, result: Any, tags: List[str], context: AgentContext ) -> None: """Extract semantic knowledge from agent results.""" try: knowledge_items = [] # Extract patterns from Anita if agent_id == "anita" and hasattr(result, 'data'): patterns = result.data.get("patterns", []) for pattern in patterns: knowledge_items.append({ "concept": f"pattern_{pattern.get('type', 'unknown')}", "description": pattern.get("description", ""), "confidence": pattern.get("confidence", 0.5), "evidence": [pattern.get("evidence", "")] }) # Extract fraud indicators from Oxossi elif agent_id == "oxossi" and hasattr(result, 'data'): fraud_analysis = result.data.get("fraud_analysis", {}) patterns = fraud_analysis.get("patterns", []) for pattern in patterns: knowledge_items.append({ "concept": f"fraud_{pattern.get('fraud_type', 'unknown')}", "description": 
f"{pattern.get('fraud_type', 'Unknown')} fraud pattern detected", "confidence": pattern.get("confidence", 0.5), "evidence": [str(ind) for ind in pattern.get("indicators", [])] }) # Extract anomalies from Zumbi elif agent_id == "zumbi" and hasattr(result, 'data'): anomalies = result.data.get("anomalies", []) for anomaly in anomalies: knowledge_items.append({ "concept": f"anomaly_{anomaly.get('type', 'unknown')}", "description": anomaly.get("description", ""), "confidence": anomaly.get("confidence", 0.5), "evidence": [anomaly.get("evidence", "")] }) # Store semantic memories for item in knowledge_items: semantic_memory = SemanticMemory( id=f"semantic_{agent_id}_{item['concept']}_{datetime.utcnow().timestamp()}", content=item, concept=item["concept"], relationships=[agent_id] + tags, evidence=item["evidence"], confidence=item["confidence"], importance=MemoryImportance.MEDIUM, tags=tags + [agent_id, "knowledge"] ) await self.memory_agent.store_semantic( memory=semantic_memory, context=context ) if knowledge_items: logger.info(f"Extracted {len(knowledge_items)} semantic knowledge items from {agent_id}") except Exception as e: logger.error(f"Error extracting semantic knowledge from {agent_id}: {str(e)}") def _determine_importance(self, agent_id: str, result: Any) -> MemoryImportance: """Determine the importance of a result for memory storage.""" # High importance for critical findings if agent_id in ["oxossi", "maria_quiteria", "obaluaie"]: if hasattr(result, 'data'): # Check for high-severity findings if "risk_level" in result.data and result.data["risk_level"] in ["HIGH", "CRITICAL"]: return MemoryImportance.HIGH if "severity" in result.data and result.data["severity"] in ["high", "critical"]: return MemoryImportance.HIGH # Medium importance for analytical findings if agent_id in ["zumbi", "anita", "bonifacio", "dandara"]: if hasattr(result, 'data'): # Check for significant findings if result.data.get("anomalies", []) or result.data.get("patterns", []): return MemoryImportance.MEDIUM # Default to low importance return MemoryImportance.LOW def _generate_cache_key(self, agent_id: str, query: str, tags: List[str]) -> str: """Generate cache key for memory retrieval.""" components = [agent_id, query] + sorted(tags) return hashlib.md5(":".join(components).encode()).hexdigest() def _deduplicate_memories(self, memories: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Remove duplicate memories based on content hash.""" seen = set() unique = [] for memory in memories: # Create content hash content_str = json.dumps(memory.get("content", {}), sort_keys=True) content_hash = hashlib.md5(content_str.encode()).hexdigest() if content_hash not in seen: seen.add(content_hash) unique.append(memory) return unique async def share_knowledge_between_agents( self, source_agent: str, target_agent: str, knowledge_type: str, filters: Optional[Dict[str, Any]] = None ) -> bool: """ Share specific knowledge from one agent to another. This enables cross-agent learning and collaboration. 
""" try: source_config = self.agent_configs.get(source_agent) target_config = self.agent_configs.get(target_agent) if not source_config or not target_config: logger.error(f"Invalid agent IDs: {source_agent} or {target_agent}") return False # Check permissions if source_config["integration_type"] not in [ MemoryIntegrationType.READ_WRITE, MemoryIntegrationType.READ_ONLY ]: logger.error(f"{source_agent} cannot share knowledge (write-only)") return False # Retrieve knowledge from source agent source_memories = await self.memory_agent.retrieve_by_tag( tag=source_agent, limit=100 ) # Filter by knowledge type filtered_memories = [ m for m in source_memories if knowledge_type in m.get("tags", []) ] # Apply additional filters if filters: for key, value in filters.items(): filtered_memories = [ m for m in filtered_memories if m.get(key) == value ] # Tag memories for target agent for memory in filtered_memories: memory["tags"] = list(set(memory.get("tags", []) + [target_agent, "shared"])) logger.info( f"Shared {len(filtered_memories)} {knowledge_type} memories " f"from {source_agent} to {target_agent}" ) return True except Exception as e: logger.error(f"Error sharing knowledge: {str(e)}") return False async def get_memory_statistics(self) -> Dict[str, Any]: """Get statistics about memory usage by agents.""" stats = { "total_accesses": len(self.access_log), "cache_size": len(self.memory_cache), "by_agent": {} } # Calculate per-agent statistics for log_entry in self.access_log: agent_id = log_entry["agent_id"] if agent_id not in stats["by_agent"]: stats["by_agent"][agent_id] = { "accesses": 0, "memories_retrieved": 0, "last_access": None } stats["by_agent"][agent_id]["accesses"] += 1 stats["by_agent"][agent_id]["memories_retrieved"] += log_entry["memories_retrieved"] stats["by_agent"][agent_id]["last_access"] = log_entry["timestamp"] return stats async def optimize_memory_for_agent(self, agent_id: str) -> None: """ Optimize memory storage for a specific agent. This consolidates related memories and removes outdated ones. 
""" try: config = self.agent_configs.get(agent_id) if not config: return # Retrieve all memories for this agent agent_memories = await self.memory_agent.retrieve_by_tag( tag=agent_id, limit=1000 ) # Group by concept/pattern memory_groups = {} for memory in agent_memories: key = memory.get("concept", memory.get("id", "unknown")) if key not in memory_groups: memory_groups[key] = [] memory_groups[key].append(memory) # Consolidate groups with multiple entries for key, memories in memory_groups.items(): if len(memories) > 5: # Threshold for consolidation # Create consolidated memory consolidated = await self._consolidate_memories(memories) # Store consolidated version await self.memory_agent.store_semantic( memory=consolidated, context=AgentContext( investigation_id=f"consolidation_{agent_id}", user_id="system", session_id="optimization" ) ) # Mark old memories for cleanup for memory in memories[:-1]: # Keep the most recent memory["tags"].append("consolidated") logger.info(f"Optimized memory for {agent_id}: {len(memory_groups)} concepts") except Exception as e: logger.error(f"Error optimizing memory for {agent_id}: {str(e)}") async def _consolidate_memories(self, memories: List[Dict[str, Any]]) -> SemanticMemory: """Consolidate multiple memories into a single semantic memory.""" # Extract common concept concepts = [m.get("concept", "") for m in memories if m.get("concept")] concept = max(set(concepts), key=concepts.count) if concepts else "consolidated" # Merge evidence all_evidence = [] for memory in memories: evidence = memory.get("evidence", []) if isinstance(evidence, list): all_evidence.extend(evidence) # Calculate average confidence confidences = [m.get("confidence", 0.5) for m in memories if "confidence" in m] avg_confidence = sum(confidences) / len(confidences) if confidences else 0.5 # Merge tags all_tags = [] for memory in memories: tags = memory.get("tags", []) if isinstance(tags, list): all_tags.extend(tags) return SemanticMemory( id=f"consolidated_{concept}_{datetime.utcnow().timestamp()}", content={ "consolidated_from": len(memories), "original_ids": [m.get("id") for m in memories], "concept": concept }, concept=concept, relationships=list(set(all_tags)), evidence=list(set(all_evidence))[:10], # Keep top 10 unique evidence confidence=avg_confidence, importance=MemoryImportance.MEDIUM, tags=list(set(all_tags)) + ["consolidated"] ) # Global instance for easy access memory_integration = None async def initialize_memory_integration(memory_agent: ContextMemoryAgent) -> AgentMemoryIntegration: """Initialize the global memory integration service.""" global memory_integration memory_integration = AgentMemoryIntegration(memory_agent) logger.info("Memory integration service initialized") return memory_integration def get_memory_integration() -> Optional[AgentMemoryIntegration]: """Get the global memory integration instance.""" return memory_integration