# orchestrator_engine.py
import asyncio
import uuid
import logging
import time
from datetime import datetime
logger = logging.getLogger(__name__)


class MVPOrchestrator:
    def __init__(self, llm_router, context_manager, agents):
        self.llm_router = llm_router
        self.context_manager = context_manager
        self.agents = agents
        self.execution_trace = []
        logger.info("MVPOrchestrator initialized")

    async def process_request(self, session_id: str, user_input: str) -> dict:
        """
        Main orchestration flow with academic differentiation and an enhanced reasoning chain
        """
        logger.info(f"Processing request for session {session_id}")
        logger.info(f"User input: {user_input[:100]}")

        # Clear previous trace for new request
        self.execution_trace = []
        start_time = time.time()

        # Initialize enhanced reasoning chain
        reasoning_chain = {
            "chain_of_thought": {},
            "alternative_paths": [],
            "uncertainty_areas": [],
            "evidence_sources": [],
            "confidence_calibration": {}
        }

        try:
            # Step 1: Generate unique interaction ID
            interaction_id = self._generate_interaction_id(session_id)
            logger.info(f"Generated interaction ID: {interaction_id}")

            # Step 2: Context management with reasoning
            logger.info("Step 2: Managing context...")
            context = await self.context_manager.manage_context(session_id, user_input)
            logger.info(f"Context retrieved: {len(context.get('interactions', []))} interactions")

            # Add context analysis to reasoning chain
            reasoning_chain["chain_of_thought"]["step_1"] = {
                "hypothesis": f"User is asking about: '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Previous interactions: {len(context.get('interactions', []))}",
                    f"Session duration: {self._calculate_session_duration(context)}",
                    f"Topic continuity: {self._analyze_topic_continuity(context, user_input)}",
                    f"Query keywords: {self._extract_keywords(user_input)}"
                ],
                "confidence": 0.85,
                "reasoning": f"Context analysis shows user is focused on {self._extract_main_topic(user_input)} with {len(context.get('interactions', []))} previous interactions"
            }

            # Step 3: Intent recognition with enhanced CoT
            logger.info("Step 3: Recognizing intent...")
            self.execution_trace.append({
                "step": "intent_recognition",
                "agent": "intent_recognition",
                "status": "executing"
            })
            intent_result = await self.agents['intent_recognition'].execute(
                user_input=user_input,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"primary_intent": intent_result.get('primary_intent', 'unknown')}
            })
            logger.info(f"Intent detected: {intent_result.get('primary_intent', 'unknown')}")

            # Step 3.5: Skills identification
            logger.info("Step 3.5: Identifying relevant skills...")
            self.execution_trace.append({
                "step": "skills_identification",
                "agent": "skills_identification",
                "status": "executing"
            })
            skills_result = await self.agents['skills_identification'].execute(
                user_input=user_input,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"skills_count": len(skills_result.get('identified_skills', []))}
            })
            logger.info(f"Skills identified: {len(skills_result.get('identified_skills', []))} skills")

            # Add intent reasoning to chain (recorded before the skills step
            # so chain entries follow the pipeline order)
            reasoning_chain["chain_of_thought"]["step_2"] = {
                "hypothesis": f"User intent is '{intent_result.get('primary_intent', 'unknown')}' for topic '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Pattern analysis: {self._extract_pattern_evidence(user_input)}",
                    f"Confidence scores: {intent_result.get('confidence_scores', {})}",
                    f"Secondary intents: {intent_result.get('secondary_intents', [])}",
                    f"Query complexity: {self._assess_query_complexity(user_input)}"
                ],
                "confidence": intent_result.get('confidence_scores', {}).get(intent_result.get('primary_intent', 'unknown'), 0.7),
                "reasoning": f"Intent '{intent_result.get('primary_intent', 'unknown')}' detected for {self._extract_main_topic(user_input)} based on linguistic patterns and context"
            }

            # Add skills reasoning to chain
            reasoning_chain["chain_of_thought"]["step_2_5"] = {
                "hypothesis": f"User input relates to {len(skills_result.get('identified_skills', []))} expert skills",
                "evidence": [
                    f"Market analysis: {skills_result.get('market_analysis', {}).get('overall_analysis', 'N/A')}",
                    f"Skill classification: {skills_result.get('skill_classification', {}).get('classification_reasoning', 'N/A')}",
                    f"High-probability skills: {[s.get('skill', '') for s in skills_result.get('identified_skills', [])[:3]]}",
                    f"Confidence score: {skills_result.get('confidence_score', 0.5)}"
                ],
                "confidence": skills_result.get('confidence_score', 0.5),
                "reasoning": f"Skills identification completed for topic '{self._extract_main_topic(user_input)}' with {len(skills_result.get('identified_skills', []))} relevant skills"
            }

            # Step 4: Agent execution planning with reasoning
            logger.info("Step 4: Creating execution plan...")
            execution_plan = await self._create_execution_plan(intent_result, context)

            # Add execution planning reasoning
            reasoning_chain["chain_of_thought"]["step_3"] = {
                "hypothesis": f"Optimal approach for '{intent_result.get('primary_intent', 'unknown')}' intent on '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Intent complexity: {self._assess_intent_complexity(intent_result)}",
                    f"Required agents: {execution_plan.get('agents_to_execute', [])}",
                    f"Execution strategy: {execution_plan.get('execution_order', 'sequential')}",
                    f"Response scope: {self._determine_response_scope(user_input)}"
                ],
                "confidence": 0.80,
                "reasoning": f"Agent selection optimized for {intent_result.get('primary_intent', 'unknown')} intent regarding {self._extract_main_topic(user_input)}"
            }

            # Step 5: Parallel agent execution
            logger.info("Step 5: Executing agents...")
            agent_results = await self._execute_agents(execution_plan, user_input, context)
            logger.info(f"Agent execution complete: {len(agent_results)} results")

            # Step 6: Response synthesis with reasoning
            logger.info("Step 6: Synthesizing response...")
            self.execution_trace.append({
                "step": "response_synthesis",
                "agent": "response_synthesis",
                "status": "executing"
            })
            final_response = await self.agents['response_synthesis'].execute(
                agent_outputs=agent_results,
                user_input=user_input,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"synthesis_method": final_response.get('synthesis_method', 'unknown')}
            })

            # Add synthesis reasoning
            reasoning_chain["chain_of_thought"]["step_4"] = {
                "hypothesis": f"Response synthesis for '{self._extract_main_topic(user_input)}' using '{final_response.get('synthesis_method', 'unknown')}' method",
                "evidence": [
                    f"Synthesis quality: {final_response.get('coherence_score', 0.7)}",
                    f"Source integration: {len(final_response.get('source_references', []))} sources",
                    f"Response length: {len(str(final_response.get('final_response', '')))} characters",
                    f"Content relevance: {self._assess_content_relevance(user_input, final_response)}"
                ],
                "confidence": final_response.get('coherence_score', 0.7),
                "reasoning": f"Multi-source synthesis for {self._extract_main_topic(user_input)} using {final_response.get('synthesis_method', 'unknown')} approach"
            }

            # Step 7: Safety and bias check with reasoning
            logger.info("Step 7: Safety check...")
            self.execution_trace.append({
                "step": "safety_check",
                "agent": "safety_check",
                "status": "executing"
            })
            safety_checked = await self.agents['safety_check'].execute(
                response=final_response,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"warnings": safety_checked.get('warnings', [])}
            })

            # Add safety reasoning
            reasoning_chain["chain_of_thought"]["step_5"] = {
                "hypothesis": f"Safety validation for response about '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Safety score: {safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)}",
                    f"Warnings generated: {len(safety_checked.get('warnings', []))}",
                    f"Analysis method: {safety_checked.get('safety_analysis', {}).get('analysis_method', 'unknown')}",
                    f"Content appropriateness: {self._assess_content_appropriateness(user_input, safety_checked)}"
                ],
                "confidence": safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8),
                "reasoning": f"Safety analysis for {self._extract_main_topic(user_input)} content with non-blocking warning system"
            }

            # Generate alternative paths and uncertainty analysis
            reasoning_chain["alternative_paths"] = self._generate_alternative_paths(intent_result, user_input)
            reasoning_chain["uncertainty_areas"] = self._identify_uncertainty_areas(intent_result, final_response, safety_checked)
            reasoning_chain["evidence_sources"] = self._extract_evidence_sources(intent_result, final_response, context)
            reasoning_chain["confidence_calibration"] = self._calibrate_confidence_scores(reasoning_chain)

            processing_time = time.time() - start_time
            result = self._format_final_output(safety_checked, interaction_id, {
                'intent': intent_result.get('primary_intent', 'unknown'),
                'execution_plan': execution_plan,
                'processing_steps': [
                    'Context management',
                    'Intent recognition',
                    'Skills identification',
                    'Execution planning',
                    'Agent execution',
                    'Response synthesis',
                    'Safety check'
                ],
                'processing_time': processing_time,
                'agents_used': list(self.agents.keys()),
                'intent_result': intent_result,
                'skills_result': skills_result,
                'synthesis_result': final_response,
                'reasoning_chain': reasoning_chain
            })

            # Update context with the final response for future context retrieval
            response_text = str(result.get('response', ''))
            if response_text:
                self.context_manager._update_context(context, user_input, response_text)
            logger.info(f"Request processing complete. Response length: {len(response_text)}")
            return result
        except Exception as e:
            logger.error(f"Error in process_request: {e}", exc_info=True)
            processing_time = time.time() - start_time
            return {
                "response": f"Error processing request: {str(e)}",
                "error": str(e),
                "interaction_id": str(uuid.uuid4())[:8],
                "agent_trace": [],
                "timestamp": datetime.now().isoformat(),
                "metadata": {
                    "agents_used": [],
                    "processing_time": processing_time,
                    "token_count": 0,
                    "warnings": []
                }
            }

    def _generate_interaction_id(self, session_id: str) -> str:
        """
        Generate a unique interaction identifier,
        e.g. "sess42_a1b2c3d4_1718000000" (session, UUID prefix, epoch seconds).
        """
        unique_id = str(uuid.uuid4())[:8]
        return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}"

    async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict:
        """
        Create an execution plan based on intent recognition.

        Placeholder heuristic (real agent selection/sequencing is still TODO):
        schedule every registered agent except the fixed pipeline stages that
        the orchestrator already calls directly, and run them in parallel.
        """
        pipeline_stages = {'intent_recognition', 'skills_identification',
                           'response_synthesis', 'safety_check'}
        return {
            "agents_to_execute": [name for name in self.agents
                                  if name not in pipeline_stages],
            "execution_order": "parallel",
            "priority": "normal"
        }

    async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict:
        """
        Execute the planned agents in parallel or sequential order.
        Minimal sketch: assumes every agent exposes the same async
        execute(user_input=..., context=...) interface used elsewhere here.
        """
        names = [n for n in execution_plan.get("agents_to_execute", []) if n in self.agents]
        if execution_plan.get("execution_order") == "sequential":
            return {n: await self.agents[n].execute(user_input=user_input, context=context) for n in names}
        results = await asyncio.gather(*(self.agents[n].execute(user_input=user_input, context=context) for n in names))
        return dict(zip(names, results))

    def _format_final_output(self, response: dict, interaction_id: str, additional_metadata: dict = None) -> dict:
        """
        Format final output with tracing and metadata
        """
        # Extract the actual response text from the various possible locations
        response_text = (
            response.get("final_response") or
            response.get("safety_checked_response") or
            response.get("original_response") or
            response.get("response") or
            str(response.get("result", ""))
        )
        if not response_text:
            response_text = "I apologize, but I'm having trouble generating a response right now. Please try again."

        # Extract warnings from the safety check result
        warnings = []
        if "warnings" in response:
            warnings = response["warnings"] if isinstance(response["warnings"], list) else []

        # Build metadata dict
        metadata = {
            "agents_used": response.get("agents_used", []),
            "processing_time": response.get("processing_time", 0),
            "token_count": response.get("token_count", 0),
            "warnings": warnings
        }
        # Merge in any additional metadata
        if additional_metadata:
            metadata.update(additional_metadata)

        return {
            "interaction_id": interaction_id,
            "response": response_text,
            "final_response": response_text,  # Also provided as final_response for compatibility
            "confidence_score": response.get("confidence_score", 0.7),
            "agent_trace": self.execution_trace if self.execution_trace else [
                {"step": "complete", "agent": "orchestrator", "status": "completed"}
            ],
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata
        }
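
    # Example of the envelope returned by _format_final_output
    # (illustrative, hypothetical values):
    # {
    #     "interaction_id": "sess42_a1b2c3d4_1718000000",
    #     "response": "...",
    #     "final_response": "...",  # duplicate of "response" for compatibility
    #     "confidence_score": 0.7,
    #     "agent_trace": [{"step": "intent_recognition", "agent": "...", "status": "completed"}],
    #     "timestamp": "2024-06-10T12:00:00.000000",
    #     "metadata": {"agents_used": [...], "processing_time": 1.2, "warnings": []}
    # }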

    def get_execution_trace(self) -> list:
        """
        Return the execution trace for debugging and analysis
        """
        return self.execution_trace

    def clear_execution_trace(self):
        """
        Clear the execution trace
        """
        self.execution_trace = []

    def _calculate_session_duration(self, context: dict) -> str:
        """Calculate session duration for reasoning context"""
        interactions = context.get('interactions', [])
        if not interactions:
            return "New session"
        # Approximate duration by interaction count; a fuller implementation
        # would compare the first and last interaction timestamps.
        interaction_count = len(interactions)
        if interaction_count < 5:
            return "Short session (< 5 interactions)"
        elif interaction_count < 20:
            return "Medium session (5-19 interactions)"
        else:
            return "Long session (20+ interactions)"

    def _analyze_topic_continuity(self, context: dict, user_input: str) -> str:
        """Analyze topic continuity for reasoning context"""
        interactions = context.get('interactions', [])
        if not interactions:
            return "No previous context"

        def detect_topic(text: str) -> str:
            # Simple keyword-based topic detection; short tokens such as
            # 'ml' and 'ai' use whole-word checks so they do not match
            # inside words like 'html' or 'maintain'.
            lowered = text.lower()
            words = {w.strip('.,!?;:()[]"\'') for w in lowered.split()}
            if 'machine learning' in lowered or 'ml' in words:
                return 'machine learning'
            elif 'artificial intelligence' in lowered or 'ai' in words:
                return 'artificial intelligence'
            elif 'data' in words:
                return 'data science'
            return 'general'

        # Topics of the last 3 interactions (stored newest-first)
        recent_topics = [detect_topic(i.get('user_input', '')) for i in interactions[:3]]
        current_topic = detect_topic(user_input)
        if current_topic != 'general' and current_topic in recent_topics:
            return f"Continuing {current_topic} discussion"
        else:
            return f"New topic: {current_topic}"

    def _extract_pattern_evidence(self, user_input: str) -> str:
        """Extract pattern evidence for intent reasoning"""
        input_lower = user_input.lower()
        # Whole-word matching for single tokens (so 'how' does not match
        # 'show'); substring matching for multi-word phrases.
        words = {w.strip('.,!?;:()[]"\'') for w in input_lower.split()}
        # Question patterns
        if words & {'what', 'how', 'why', 'when', 'where', 'which'}:
            return "Question pattern detected"
        # Request patterns
        if 'please' in words or any(phrase in input_lower for phrase in ['can you', 'could you', 'help me']):
            return "Request pattern detected"
        # Explanation patterns
        if 'tell me about' in input_lower or words & {'explain', 'describe'}:
            return "Explanation pattern detected"
        # Analysis patterns
        if words & {'analyze', 'compare', 'evaluate', 'assess'}:
            return "Analysis pattern detected"
        return "General conversational pattern"

    def _assess_intent_complexity(self, intent_result: dict) -> str:
        """Assess intent complexity for reasoning"""
        primary_intent = intent_result.get('primary_intent', 'unknown')
        confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5)
        secondary_intents = intent_result.get('secondary_intents', [])
        if confidence > 0.8 and len(secondary_intents) == 0:
            return "Simple, clear intent"
        elif confidence > 0.7 and len(secondary_intents) <= 1:
            return "Moderate complexity"
        else:
            return "Complex, multi-faceted intent"

    def _generate_alternative_paths(self, intent_result: dict, user_input: str) -> list:
        """Generate alternative reasoning paths based on actual content"""
        primary_intent = intent_result.get('primary_intent', 'unknown')
        secondary_intents = intent_result.get('secondary_intents', [])
        main_topic = self._extract_main_topic(user_input)
        alternative_paths = []

        # Add secondary intents as alternative paths
        for secondary_intent in secondary_intents:
            alternative_paths.append({
                "path": f"Alternative intent: {secondary_intent} for {main_topic}",
                "reasoning": f"Could interpret as {secondary_intent} based on linguistic patterns in the query about {main_topic}",
                "confidence": intent_result.get('confidence_scores', {}).get(secondary_intent, 0.3),
                "rejected_reason": f"Primary intent '{primary_intent}' has higher confidence for {main_topic} topic"
            })

        # Add method-based alternatives based on content
        if 'curriculum' in user_input.lower() or 'course' in user_input.lower():
            alternative_paths.append({
                "path": "Structured educational framework approach",
                "reasoning": f"Could provide a more structured educational framework for {main_topic}",
                "confidence": 0.6,
                "rejected_reason": f"Current approach better matches the user's specific request for {main_topic}"
            })
        if 'detailed' in user_input.lower() or 'comprehensive' in user_input.lower():
            alternative_paths.append({
                "path": "High-level overview approach",
                "reasoning": f"Could provide a high-level overview instead of detailed content for {main_topic}",
                "confidence": 0.4,
                "rejected_reason": f"User specifically requested detailed information about {main_topic}"
            })
        return alternative_paths

    def _identify_uncertainty_areas(self, intent_result: dict, final_response: dict, safety_checked: dict) -> list:
        """Identify areas of uncertainty in the reasoning based on actual content"""
        uncertainty_areas = []

        # Intent uncertainty
        primary_intent = intent_result.get('primary_intent', 'unknown')
        confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5)
        if confidence < 0.8:
            uncertainty_areas.append({
                "aspect": f"Intent classification ({primary_intent}) for user's specific request",
                "confidence": confidence,
                "mitigation": "Provided multiple interpretation options and context-aware analysis"
            })

        # Response quality uncertainty
        coherence_score = final_response.get('coherence_score', 0.7)
        if coherence_score < 0.8:
            uncertainty_areas.append({
                "aspect": "Response coherence and structure for the specific topic",
                "confidence": coherence_score,
                "mitigation": "Applied quality enhancement techniques and content relevance checks"
            })

        # Safety uncertainty
        safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)
        if safety_score < 0.9:
            uncertainty_areas.append({
                "aspect": "Content safety and bias assessment for educational content",
                "confidence": safety_score,
                "mitigation": "Generated advisory warnings for user awareness and content appropriateness"
            })

        # Content relevance uncertainty
        response_text = str(final_response.get('final_response', ''))
        if len(response_text) < 100:  # Very short response
            uncertainty_areas.append({
                "aspect": "Response completeness for user's detailed request",
                "confidence": 0.6,
                "mitigation": "Enhanced response generation with topic-specific content"
            })
        return uncertainty_areas

    def _extract_evidence_sources(self, intent_result: dict, final_response: dict, context: dict) -> list:
        """Extract evidence sources for reasoning based on actual content"""
        evidence_sources = []

        # Intent evidence
        evidence_sources.append({
            "type": "linguistic_analysis",
            "source": "Pattern matching and NLP analysis",
            "relevance": 0.9,
            "description": f"Intent classification based on linguistic patterns for '{intent_result.get('primary_intent', 'unknown')}' intent"
        })

        # Context evidence
        interactions = context.get('interactions', [])
        if interactions:
            evidence_sources.append({
                "type": "conversation_history",
                "source": f"Previous {len(interactions)} interactions",
                "relevance": 0.7,
                "description": "Conversation context and topic continuity analysis"
            })

        # Synthesis evidence
        synthesis_method = final_response.get('synthesis_method', 'unknown')
        evidence_sources.append({
            "type": "synthesis_method",
            "source": f"{synthesis_method} approach",
            "relevance": 0.8,
            "description": f"Response generated using {synthesis_method} methodology with quality optimization"
        })

        # Content-specific evidence
        response_text = str(final_response.get('final_response', ''))
        if len(response_text) > 1000:
            evidence_sources.append({
                "type": "content_analysis",
                "source": "Comprehensive content generation",
                "relevance": 0.85,
                "description": "Detailed response generation based on user's specific requirements"
            })
        return evidence_sources

    def _calibrate_confidence_scores(self, reasoning_chain: dict) -> dict:
        """Calibrate confidence scores across the reasoning chain"""
        chain_of_thought = reasoning_chain.get('chain_of_thought', {})

        # Collect per-step confidences and compute the overall confidence
        step_confidences = []
        for step_data in chain_of_thought.values():
            if isinstance(step_data, dict) and 'confidence' in step_data:
                step_confidences.append(step_data['confidence'])
        overall_confidence = sum(step_confidences) / len(step_confidences) if step_confidences else 0.7

        return {
            "overall_confidence": overall_confidence,
            "step_count": len(chain_of_thought),
            "confidence_distribution": {
                "high_confidence": len([c for c in step_confidences if c > 0.8]),
                "medium_confidence": len([c for c in step_confidences if 0.6 <= c <= 0.8]),
                "low_confidence": len([c for c in step_confidences if c < 0.6])
            },
            "calibration_method": "Unweighted mean of step confidences"
        }
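
    # Worked example for _calibrate_confidence_scores (hypothetical inputs):
    # step confidences [0.85, 0.7, 0.8]
    #   -> overall_confidence = (0.85 + 0.7 + 0.8) / 3 ≈ 0.783
    #   -> distribution: high_confidence=1 (0.85), medium_confidence=2 (0.7, 0.8), low_confidence=0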

    def _extract_main_topic(self, user_input: str) -> str:
        """Extract the main topic from user input for context-aware reasoning"""
        input_lower = user_input.lower()
        # Keyword-based topic extraction; short tokens such as 'ai' and 'ml'
        # use whole-word checks so they do not match inside longer words.
        words = {w.strip('.,!?;:()[]"\'') for w in input_lower.split()}
        if words & {'curriculum', 'course', 'teach', 'learning', 'education'}:
            if 'ai' in words or 'chatbot' in input_lower or 'assistant' in input_lower:
                return "AI chatbot course curriculum"
            elif 'programming' in input_lower or 'python' in input_lower:
                return "Programming course curriculum"
            else:
                return "Educational course design"
        elif ('machine learning' in input_lower or 'ml' in words
              or 'neural network' in input_lower or 'deep learning' in input_lower):
            return "Machine learning concepts"
        elif ('ai' in words or 'artificial intelligence' in input_lower
              or 'chatbot' in input_lower or 'assistant' in input_lower):
            return "Artificial intelligence and chatbots"
        elif any(phrase in input_lower for phrase in ['data science', 'data analysis', 'analytics']):
            return "Data science and analysis"
        elif words & {'programming', 'coding', 'development', 'software'}:
            return "Software development and programming"
        else:
            # Fall back to the first few words as the topic
            first_words = user_input.split()[:4]
            return " ".join(first_words) if first_words else "General inquiry"

    def _extract_keywords(self, user_input: str) -> str:
        """Extract key terms from user input"""
        input_lower = user_input.lower()
        # Whole-word matching for single tokens, substring matching for phrases
        words = {w.strip('.,!?;:()[]"\'') for w in input_lower.split()}
        keywords = []
        important_terms = [
            'curriculum', 'course', 'teach', 'learning', 'education',
            'ai', 'artificial intelligence', 'chatbot', 'assistant',
            'machine learning', 'ml', 'neural network', 'deep learning',
            'programming', 'python', 'development', 'software',
            'data science', 'analytics', 'analysis'
        ]
        for term in important_terms:
            matched = term in input_lower if ' ' in term else term in words
            if matched:
                keywords.append(term)
        return ", ".join(keywords[:5]) if keywords else "General terms"

    def _assess_query_complexity(self, user_input: str) -> str:
        """Assess the complexity of the user query"""
        word_count = len(user_input.split())
        question_count = user_input.count('?')
        if word_count > 50 and question_count > 2:
            return "Highly complex multi-part query"
        elif word_count > 30 and question_count > 1:
            return "Moderately complex query"
        elif word_count > 15:
            return "Standard complexity query"
        else:
            return "Simple query"

    def _determine_response_scope(self, user_input: str) -> str:
        """Determine the scope of response needed"""
        input_lower = user_input.lower()
        if any(word in input_lower for word in ['detailed', 'comprehensive', 'complete', 'full']):
            return "Comprehensive detailed response"
        elif any(word in input_lower for word in ['brief', 'short', 'summary', 'overview']):
            return "Brief summary response"
        elif any(phrase in input_lower for phrase in ['step by step', 'tutorial', 'guide', 'how to']):
            return "Step-by-step instructional response"
        else:
            return "Standard informative response"

    def _assess_content_relevance(self, user_input: str, final_response: dict) -> str:
        """Assess how relevant the response content is to the user input"""
        response_text = str(final_response.get('final_response', ''))
        # Simple relevance check based on keyword overlap
        input_words = set(user_input.lower().split())
        response_words = set(response_text.lower().split())
        if not input_words:  # Guard against division by zero on empty input
            return "Low relevance to user query"
        overlap_ratio = len(input_words & response_words) / len(input_words)
        if overlap_ratio > 0.3:
            return "High relevance to user query"
        elif overlap_ratio > 0.15:
            return "Moderate relevance to user query"
        else:
            return "Low relevance to user query"

    def _assess_content_appropriateness(self, user_input: str, safety_checked: dict) -> str:
        """Assess content appropriateness for the topic"""
        warnings = safety_checked.get('warnings', [])
        safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)
        if safety_score > 0.9 and len(warnings) == 0:
            return "Highly appropriate content"
        elif safety_score > 0.8 and len(warnings) <= 1:
            return "Appropriate content with minor notes"
        else:
            return "Content requires review"