import uuid
import logging
import time
from datetime import datetime

logger = logging.getLogger(__name__)


class MVPOrchestrator:

    def __init__(self, llm_router, context_manager, agents):
        self.llm_router = llm_router
        self.context_manager = context_manager
        self.agents = agents
        self.execution_trace = []
        logger.info("MVPOrchestrator initialized")

    async def process_request(self, session_id: str, user_input: str) -> dict:
        """
        Main orchestration flow with academic differentiation and an enhanced reasoning chain.
        """
        logger.info(f"Processing request for session {session_id}")
        logger.info(f"User input: {user_input[:100]}")

        self.execution_trace = []
        start_time = time.time()

        reasoning_chain = {
            "chain_of_thought": {},
            "alternative_paths": [],
            "uncertainty_areas": [],
            "evidence_sources": [],
            "confidence_calibration": {}
        }

        try:
            # Step 1: Generate a unique interaction ID
            interaction_id = self._generate_interaction_id(session_id)
            logger.info(f"Generated interaction ID: {interaction_id}")

            logger.info("Step 2: Managing context...")
            context = await self.context_manager.manage_context(session_id, user_input)
            logger.info(f"Context retrieved: {len(context.get('interactions', []))} interactions")

            reasoning_chain["chain_of_thought"]["step_1"] = {
                "hypothesis": f"User is asking about: '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Previous interactions: {len(context.get('interactions', []))}",
                    f"Session duration: {self._calculate_session_duration(context)}",
                    f"Topic continuity: {self._analyze_topic_continuity(context, user_input)}",
                    f"Query keywords: {self._extract_keywords(user_input)}"
                ],
                "confidence": 0.85,
                "reasoning": f"Context analysis shows the user is focused on {self._extract_main_topic(user_input)} with {len(context.get('interactions', []))} previous interactions"
            }

            logger.info("Step 3: Recognizing intent...")
            self.execution_trace.append({
                "step": "intent_recognition",
                "agent": "intent_recognition",
                "status": "executing"
            })
            intent_result = await self.agents['intent_recognition'].execute(
                user_input=user_input,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"primary_intent": intent_result.get('primary_intent', 'unknown')}
            })
            logger.info(f"Intent detected: {intent_result.get('primary_intent', 'unknown')}")

            logger.info("Step 3.5: Identifying relevant skills...")
            self.execution_trace.append({
                "step": "skills_identification",
                "agent": "skills_identification",
                "status": "executing"
            })
            skills_result = await self.agents['skills_identification'].execute(
                user_input=user_input,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"skills_count": len(skills_result.get('identified_skills', []))}
            })
            logger.info(f"Skills identified: {len(skills_result.get('identified_skills', []))} skills")

            reasoning_chain["chain_of_thought"]["step_2_5"] = {
                "hypothesis": f"User input relates to {len(skills_result.get('identified_skills', []))} expert skills",
                "evidence": [
                    f"Market analysis: {skills_result.get('market_analysis', {}).get('overall_analysis', 'N/A')}",
                    f"Skill classification: {skills_result.get('skill_classification', {}).get('classification_reasoning', 'N/A')}",
                    f"High-probability skills: {[s.get('skill', '') for s in skills_result.get('identified_skills', [])[:3]]}",
                    f"Confidence score: {skills_result.get('confidence_score', 0.5)}"
                ],
                "confidence": skills_result.get('confidence_score', 0.5),
                "reasoning": f"Skills identification completed for topic '{self._extract_main_topic(user_input)}' with {len(skills_result.get('identified_skills', []))} relevant skills"
            }

            reasoning_chain["chain_of_thought"]["step_2"] = {
                "hypothesis": f"User intent is '{intent_result.get('primary_intent', 'unknown')}' for topic '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Pattern analysis: {self._extract_pattern_evidence(user_input)}",
                    f"Confidence scores: {intent_result.get('confidence_scores', {})}",
                    f"Secondary intents: {intent_result.get('secondary_intents', [])}",
                    f"Query complexity: {self._assess_query_complexity(user_input)}"
                ],
                "confidence": intent_result.get('confidence_scores', {}).get(intent_result.get('primary_intent', 'unknown'), 0.7),
                "reasoning": f"Intent '{intent_result.get('primary_intent', 'unknown')}' detected for {self._extract_main_topic(user_input)} based on linguistic patterns and context"
            }

            logger.info("Step 4: Creating execution plan...")
            execution_plan = await self._create_execution_plan(intent_result, context)

            reasoning_chain["chain_of_thought"]["step_3"] = {
                "hypothesis": f"Optimal approach for '{intent_result.get('primary_intent', 'unknown')}' intent on '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Intent complexity: {self._assess_intent_complexity(intent_result)}",
                    f"Required agents: {execution_plan.get('agents_to_execute', [])}",
                    f"Execution strategy: {execution_plan.get('execution_order', 'sequential')}",
                    f"Response scope: {self._determine_response_scope(user_input)}"
                ],
                "confidence": 0.80,
                "reasoning": f"Agent selection optimized for {intent_result.get('primary_intent', 'unknown')} intent regarding {self._extract_main_topic(user_input)}"
            }

            logger.info("Step 5: Executing agents...")
            agent_results = await self._execute_agents(execution_plan, user_input, context)
            logger.info(f"Agent execution complete: {len(agent_results)} results")

            logger.info("Step 6: Synthesizing response...")
            self.execution_trace.append({
                "step": "response_synthesis",
                "agent": "response_synthesis",
                "status": "executing"
            })
            final_response = await self.agents['response_synthesis'].execute(
                agent_outputs=agent_results,
                user_input=user_input,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"synthesis_method": final_response.get('synthesis_method', 'unknown')}
            })

            reasoning_chain["chain_of_thought"]["step_4"] = {
                "hypothesis": f"Response synthesis for '{self._extract_main_topic(user_input)}' using '{final_response.get('synthesis_method', 'unknown')}' method",
                "evidence": [
                    f"Synthesis quality: {final_response.get('coherence_score', 0.7)}",
                    f"Source integration: {len(final_response.get('source_references', []))} sources",
                    f"Response length: {len(str(final_response.get('final_response', '')))} characters",
                    f"Content relevance: {self._assess_content_relevance(user_input, final_response)}"
                ],
                "confidence": final_response.get('coherence_score', 0.7),
                "reasoning": f"Multi-source synthesis for {self._extract_main_topic(user_input)} using {final_response.get('synthesis_method', 'unknown')} approach"
            }

            logger.info("Step 7: Safety check...")
            self.execution_trace.append({
                "step": "safety_check",
                "agent": "safety_check",
                "status": "executing"
            })
            safety_checked = await self.agents['safety_check'].execute(
                response=final_response,
                context=context
            )
            self.execution_trace[-1].update({
                "status": "completed",
                "result": {"warnings": safety_checked.get('warnings', [])}
            })

            reasoning_chain["chain_of_thought"]["step_5"] = {
                "hypothesis": f"Safety validation for response about '{self._extract_main_topic(user_input)}'",
                "evidence": [
                    f"Safety score: {safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)}",
                    f"Warnings generated: {len(safety_checked.get('warnings', []))}",
                    f"Analysis method: {safety_checked.get('safety_analysis', {}).get('analysis_method', 'unknown')}",
                    f"Content appropriateness: {self._assess_content_appropriateness(user_input, safety_checked)}"
                ],
                "confidence": safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8),
                "reasoning": f"Safety analysis for {self._extract_main_topic(user_input)} content with non-blocking warning system"
            }

            reasoning_chain["alternative_paths"] = self._generate_alternative_paths(intent_result, user_input)
            reasoning_chain["uncertainty_areas"] = self._identify_uncertainty_areas(intent_result, final_response, safety_checked)
            reasoning_chain["evidence_sources"] = self._extract_evidence_sources(intent_result, final_response, context)
            reasoning_chain["confidence_calibration"] = self._calibrate_confidence_scores(reasoning_chain)

            processing_time = time.time() - start_time

            result = self._format_final_output(safety_checked, interaction_id, {
                'intent': intent_result.get('primary_intent', 'unknown'),
                'execution_plan': execution_plan,
                'processing_steps': [
                    'Context management',
                    'Intent recognition',
                    'Skills identification',
                    'Execution planning',
                    'Agent execution',
                    'Response synthesis',
                    'Safety check'
                ],
                'processing_time': processing_time,
                'agents_used': list(self.agents.keys()),
                'intent_result': intent_result,
                'skills_result': skills_result,
                'synthesis_result': final_response,
                'reasoning_chain': reasoning_chain
            })

            response_text = str(result.get('response', ''))
            if response_text:
                self.context_manager._update_context(context, user_input, response_text)

            logger.info(f"Request processing complete. Response length: {len(response_text)}")
            return result

        except Exception as e:
            logger.error(f"Error in process_request: {e}", exc_info=True)
            processing_time = time.time() - start_time
            return {
                "response": f"Error processing request: {str(e)}",
                "error": str(e),
                "interaction_id": str(uuid.uuid4())[:8],
                "agent_trace": [],
                "timestamp": datetime.now().isoformat(),
                "metadata": {
                    "agents_used": [],
                    "processing_time": processing_time,
                    "token_count": 0,
                    "warnings": []
                }
            }

    def _generate_interaction_id(self, session_id: str) -> str:
        """
        Generate a unique interaction identifier
        """
        unique_id = str(uuid.uuid4())[:8]
        return f"{session_id}_{unique_id}_{int(datetime.now().timestamp())}"

    async def _create_execution_plan(self, intent_result: dict, context: dict) -> dict:
        """
        Create execution plan based on intent recognition

        Placeholder implementation: returns an empty plan, so the orchestrator
        currently relies on the fixed pipeline in process_request.
        """
        return {
            "agents_to_execute": [],
            "execution_order": "parallel",
            "priority": "normal"
        }
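
    # A minimal sketch of how _create_execution_plan could be filled in once
    # intent-based routing is needed. The intent names and the mapping below
    # are illustrative assumptions, not part of the current implementation:
    #
    #     INTENT_TO_AGENTS = {
    #         "information_request": ["skills_identification", "response_synthesis"],
    #         "comparison_request": ["skills_identification", "response_synthesis"],
    #     }
    #     primary = intent_result.get("primary_intent", "unknown")
    #     return {
    #         "agents_to_execute": INTENT_TO_AGENTS.get(primary, ["response_synthesis"]),
    #         "execution_order": "parallel",
    #         "priority": "normal",
    #     }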

    async def _execute_agents(self, execution_plan: dict, user_input: str, context: dict) -> dict:
        """
        Execute agents in parallel or sequential order based on the plan

        Placeholder implementation: returns no results; the core agents are
        invoked explicitly in process_request instead.
        """
        return {}
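
    # A sketch (assumption, not current behavior) of concurrent dispatch for the
    # agents named in the plan, should this placeholder ever be implemented.
    # It would require `import asyncio` at module level:
    #
    #     names = [n for n in execution_plan.get("agents_to_execute", []) if n in self.agents]
    #     outputs = await asyncio.gather(
    #         *(self.agents[n].execute(user_input=user_input, context=context) for n in names),
    #         return_exceptions=True,
    #     )
    #     return {n: out for n, out in zip(names, outputs) if not isinstance(out, Exception)}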

    def _format_final_output(self, response: dict, interaction_id: str, additional_metadata: dict = None) -> dict:
        """
        Format final output with tracing and metadata
        """
        response_text = (
            response.get("final_response") or
            response.get("safety_checked_response") or
            response.get("original_response") or
            response.get("response") or
            str(response.get("result", ""))
        )

        if not response_text:
            response_text = "I apologize, but I'm having trouble generating a response right now. Please try again."

        warnings = []
        if "warnings" in response:
            warnings = response["warnings"] if isinstance(response["warnings"], list) else []

        metadata = {
            "agents_used": response.get("agents_used", []),
            "processing_time": response.get("processing_time", 0),
            "token_count": response.get("token_count", 0),
            "warnings": warnings
        }

        if additional_metadata:
            metadata.update(additional_metadata)

        return {
            "interaction_id": interaction_id,
            "response": response_text,
            "final_response": response_text,
            "confidence_score": response.get("confidence_score", 0.7),
            "agent_trace": self.execution_trace if self.execution_trace else [
                {"step": "complete", "agent": "orchestrator", "status": "completed"}
            ],
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata
        }

    def get_execution_trace(self) -> list:
        """
        Return execution trace for debugging and analysis
        """
        return self.execution_trace

    def clear_execution_trace(self):
        """
        Clear the execution trace
        """
        self.execution_trace = []

    def _calculate_session_duration(self, context: dict) -> str:
        """Describe session length (by interaction count) for reasoning context"""
        interactions = context.get('interactions', [])
        if not interactions:
            return "New session"

        interaction_count = len(interactions)
        if interaction_count < 5:
            return "Short session (< 5 interactions)"
        elif interaction_count < 20:
            return "Medium session (5-19 interactions)"
        else:
            return "Long session (20+ interactions)"

    def _analyze_topic_continuity(self, context: dict, user_input: str) -> str:
        """Analyze topic continuity for reasoning context"""
        interactions = context.get('interactions', [])
        if not interactions:
            return "No previous context"

        recent_topics = []
        for interaction in interactions[:3]:
            user_msg = interaction.get('user_input', '').lower()
            if 'machine learning' in user_msg or 'ml' in user_msg:
                recent_topics.append('machine learning')
            elif 'ai' in user_msg or 'artificial intelligence' in user_msg:
                recent_topics.append('artificial intelligence')
            elif 'data' in user_msg:
                recent_topics.append('data science')

        current_input_lower = user_input.lower()
        if 'machine learning' in current_input_lower or 'ml' in current_input_lower:
            current_topic = 'machine learning'
        elif 'ai' in current_input_lower or 'artificial intelligence' in current_input_lower:
            current_topic = 'artificial intelligence'
        elif 'data' in current_input_lower:
            current_topic = 'data science'
        else:
            current_topic = 'general'

        if current_topic in recent_topics:
            return f"Continuing {current_topic} discussion"
        else:
            return f"New topic: {current_topic}"

    def _extract_pattern_evidence(self, user_input: str) -> str:
        """Extract pattern evidence for intent reasoning"""
        input_lower = user_input.lower()

        if any(word in input_lower for word in ['what', 'how', 'why', 'when', 'where', 'which']):
            return "Question pattern detected"
        if any(word in input_lower for word in ['please', 'can you', 'could you', 'help me']):
            return "Request pattern detected"
        if any(word in input_lower for word in ['explain', 'describe', 'tell me about']):
            return "Explanation pattern detected"
        if any(word in input_lower for word in ['analyze', 'compare', 'evaluate', 'assess']):
            return "Analysis pattern detected"

        return "General conversational pattern"

    def _assess_intent_complexity(self, intent_result: dict) -> str:
        """Assess intent complexity for reasoning"""
        primary_intent = intent_result.get('primary_intent', 'unknown')
        confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5)
        secondary_intents = intent_result.get('secondary_intents', [])

        if confidence > 0.8 and len(secondary_intents) == 0:
            return "Simple, clear intent"
        elif confidence > 0.7 and len(secondary_intents) <= 1:
            return "Moderate complexity"
        else:
            return "Complex, multi-faceted intent"

    def _generate_alternative_paths(self, intent_result: dict, user_input: str) -> list:
        """Generate alternative reasoning paths based on actual content"""
        primary_intent = intent_result.get('primary_intent', 'unknown')
        secondary_intents = intent_result.get('secondary_intents', [])
        main_topic = self._extract_main_topic(user_input)

        alternative_paths = []

        for secondary_intent in secondary_intents:
            alternative_paths.append({
                "path": f"Alternative intent: {secondary_intent} for {main_topic}",
                "reasoning": f"Could interpret as {secondary_intent} based on linguistic patterns in the query about {main_topic}",
                "confidence": intent_result.get('confidence_scores', {}).get(secondary_intent, 0.3),
                "rejected_reason": f"Primary intent '{primary_intent}' has higher confidence for {main_topic} topic"
            })

        if 'curriculum' in user_input.lower() or 'course' in user_input.lower():
            alternative_paths.append({
                "path": "Structured educational framework approach",
                "reasoning": f"Could provide a more structured educational framework for {main_topic}",
                "confidence": 0.6,
                "rejected_reason": f"Current approach better matches user's specific request for {main_topic}"
            })

        if 'detailed' in user_input.lower() or 'comprehensive' in user_input.lower():
            alternative_paths.append({
                "path": "High-level overview approach",
                "reasoning": f"Could provide a high-level overview instead of detailed content for {main_topic}",
                "confidence": 0.4,
                "rejected_reason": f"User specifically requested detailed information about {main_topic}"
            })

        return alternative_paths

    def _identify_uncertainty_areas(self, intent_result: dict, final_response: dict, safety_checked: dict) -> list:
        """Identify areas of uncertainty in the reasoning based on actual content"""
        uncertainty_areas = []

        primary_intent = intent_result.get('primary_intent', 'unknown')
        confidence = intent_result.get('confidence_scores', {}).get(primary_intent, 0.5)
        if confidence < 0.8:
            uncertainty_areas.append({
                "aspect": f"Intent classification ({primary_intent}) for user's specific request",
                "confidence": confidence,
                "mitigation": "Provided multiple interpretation options and context-aware analysis"
            })

        coherence_score = final_response.get('coherence_score', 0.7)
        if coherence_score < 0.8:
            uncertainty_areas.append({
                "aspect": "Response coherence and structure for the specific topic",
                "confidence": coherence_score,
                "mitigation": "Applied quality enhancement techniques and content relevance checks"
            })

        safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)
        if safety_score < 0.9:
            uncertainty_areas.append({
                "aspect": "Content safety and bias assessment for educational content",
                "confidence": safety_score,
                "mitigation": "Generated advisory warnings for user awareness and content appropriateness"
            })

        response_text = str(final_response.get('final_response', ''))
        if len(response_text) < 100:
            uncertainty_areas.append({
                "aspect": "Response completeness for user's detailed request",
                "confidence": 0.6,
                "mitigation": "Enhanced response generation with topic-specific content"
            })

        return uncertainty_areas

    def _extract_evidence_sources(self, intent_result: dict, final_response: dict, context: dict) -> list:
        """Extract evidence sources for reasoning based on actual content"""
        evidence_sources = []

        evidence_sources.append({
            "type": "linguistic_analysis",
            "source": "Pattern matching and NLP analysis",
            "relevance": 0.9,
            "description": f"Intent classification based on linguistic patterns for '{intent_result.get('primary_intent', 'unknown')}' intent"
        })

        interactions = context.get('interactions', [])
        if interactions:
            evidence_sources.append({
                "type": "conversation_history",
                "source": f"Previous {len(interactions)} interactions",
                "relevance": 0.7,
                "description": "Conversation context and topic continuity analysis"
            })

        synthesis_method = final_response.get('synthesis_method', 'unknown')
        evidence_sources.append({
            "type": "synthesis_method",
            "source": f"{synthesis_method} approach",
            "relevance": 0.8,
            "description": f"Response generated using {synthesis_method} methodology with quality optimization"
        })

        response_text = str(final_response.get('final_response', ''))
        if len(response_text) > 1000:
            evidence_sources.append({
                "type": "content_analysis",
                "source": "Comprehensive content generation",
                "relevance": 0.85,
                "description": "Detailed response generation based on user's specific requirements"
            })

        return evidence_sources

    def _calibrate_confidence_scores(self, reasoning_chain: dict) -> dict:
        """Calibrate confidence scores across the reasoning chain"""
        chain_of_thought = reasoning_chain.get('chain_of_thought', {})

        step_confidences = []
        for step_data in chain_of_thought.values():
            if isinstance(step_data, dict) and 'confidence' in step_data:
                step_confidences.append(step_data['confidence'])

        overall_confidence = sum(step_confidences) / len(step_confidences) if step_confidences else 0.7

        return {
            "overall_confidence": overall_confidence,
            "step_count": len(chain_of_thought),
            "confidence_distribution": {
                "high_confidence": len([c for c in step_confidences if c > 0.8]),
                "medium_confidence": len([c for c in step_confidences if 0.6 <= c <= 0.8]),
                "low_confidence": len([c for c in step_confidences if c < 0.6])
            },
            "calibration_method": "Unweighted mean of step confidences"
        }

    def _extract_main_topic(self, user_input: str) -> str:
        """Extract the main topic from user input for context-aware reasoning"""
        input_lower = user_input.lower()

        if any(word in input_lower for word in ['curriculum', 'course', 'teach', 'learning', 'education']):
            if 'ai' in input_lower or 'chatbot' in input_lower or 'assistant' in input_lower:
                return "AI chatbot course curriculum"
            elif 'programming' in input_lower or 'python' in input_lower:
                return "Programming course curriculum"
            else:
                return "Educational course design"

        elif any(word in input_lower for word in ['machine learning', 'ml', 'neural network', 'deep learning']):
            return "Machine learning concepts"

        elif any(word in input_lower for word in ['ai', 'artificial intelligence', 'chatbot', 'assistant']):
            return "Artificial intelligence and chatbots"

        elif any(word in input_lower for word in ['data science', 'data analysis', 'analytics']):
            return "Data science and analysis"

        elif any(word in input_lower for word in ['programming', 'coding', 'development', 'software']):
            return "Software development and programming"

        else:
            words = user_input.split()[:4]
            return " ".join(words) if words else "General inquiry"

    def _extract_keywords(self, user_input: str) -> str:
        """Extract key terms from user input"""
        input_lower = user_input.lower()
        keywords = []

        important_terms = [
            'curriculum', 'course', 'teach', 'learning', 'education',
            'ai', 'artificial intelligence', 'chatbot', 'assistant',
            'machine learning', 'ml', 'neural network', 'deep learning',
            'programming', 'python', 'development', 'software',
            'data science', 'analytics', 'analysis'
        ]

        for term in important_terms:
            if term in input_lower:
                keywords.append(term)

        return ", ".join(keywords[:5]) if keywords else "General terms"

    def _assess_query_complexity(self, user_input: str) -> str:
        """Assess the complexity of the user query"""
        word_count = len(user_input.split())
        question_count = user_input.count('?')

        if word_count > 50 and question_count > 2:
            return "Highly complex multi-part query"
        elif word_count > 30 and question_count > 1:
            return "Moderately complex query"
        elif word_count > 15:
            return "Standard complexity query"
        else:
            return "Simple query"

    def _determine_response_scope(self, user_input: str) -> str:
        """Determine the scope of response needed"""
        input_lower = user_input.lower()

        if any(word in input_lower for word in ['detailed', 'comprehensive', 'complete', 'full']):
            return "Comprehensive detailed response"
        elif any(word in input_lower for word in ['brief', 'short', 'summary', 'overview']):
            return "Brief summary response"
        elif any(word in input_lower for word in ['step by step', 'tutorial', 'guide', 'how to']):
            return "Step-by-step instructional response"
        else:
            return "Standard informative response"

    def _assess_content_relevance(self, user_input: str, final_response: dict) -> str:
        """Assess how relevant the response content is to the user input"""
        response_text = str(final_response.get('final_response', ''))

        input_words = set(user_input.lower().split())
        response_words = set(response_text.lower().split())

        # Guard against empty input to avoid division by zero
        if not input_words:
            return "Low relevance to user query"

        overlap = len(input_words.intersection(response_words))
        total_input_words = len(input_words)

        if overlap / total_input_words > 0.3:
            return "High relevance to user query"
        elif overlap / total_input_words > 0.15:
            return "Moderate relevance to user query"
        else:
            return "Low relevance to user query"

    def _assess_content_appropriateness(self, user_input: str, safety_checked: dict) -> str:
        """Assess content appropriateness for the topic"""
        warnings = safety_checked.get('warnings', [])
        safety_score = safety_checked.get('safety_analysis', {}).get('overall_safety_score', 0.8)

        if safety_score > 0.9 and len(warnings) == 0:
            return "Highly appropriate content"
        elif safety_score > 0.8 and len(warnings) <= 1:
            return "Appropriate content with minor notes"
        else:
            return "Content requires review"