cidadao.ai-backend / src /services /investigation_service.py
anderson-ufrj
fix(services): update investigation service for Supabase compatibility
c843087
"""
Investigation service for managing investigations with database persistence.
This module provides a service layer for investigation operations,
abstracting the database and agent interactions.
"""
from typing import List, Optional, Dict, Any
from datetime import datetime
import uuid
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from src.core import get_logger
from src.db.simple_session import get_db_session
from src.models.investigation import Investigation
from src.agents import MasterAgent, get_agent_pool
from src.agents.deodoro import AgentContext
logger = get_logger(__name__)
class InvestigationService:
"""
Service for managing investigations with SQLite/PostgreSQL persistence.
"""
def __init__(self):
"""Initialize investigation service."""
pass
async def create(
self,
user_id: str,
query: str,
data_source: str = "contracts",
filters: Optional[Dict[str, Any]] = None,
anomaly_types: Optional[List[str]] = None,
session_id: Optional[str] = None,
) -> Investigation:
"""
Create a new investigation in the database.
Args:
user_id: User ID
query: Investigation query
data_source: Data source to investigate
filters: Query filters
anomaly_types: Types of anomalies to detect
session_id: Optional session ID
Returns:
Created investigation
"""
async with get_db_session() as db:
investigation = Investigation(
user_id=user_id,
session_id=session_id,
query=query,
data_source=data_source,
status="pending",
filters=filters or {},
anomaly_types=anomaly_types or [],
progress=0.0,
)
db.add(investigation)
await db.commit()
await db.refresh(investigation)
logger.info(f"Created investigation {investigation.id} for user {user_id}")
return investigation
async def update_status(
self,
investigation_id: str,
status: str,
progress: Optional[float] = None,
current_phase: Optional[str] = None,
**kwargs
) -> Investigation:
"""Update investigation status and progress."""
async with get_db_session() as db:
result = await db.execute(
select(Investigation).where(Investigation.id == investigation_id)
)
investigation = result.scalar_one_or_none()
if not investigation:
raise ValueError(f"Investigation {investigation_id} not found")
investigation.status = status
if progress is not None:
investigation.progress = progress
if current_phase is not None:
investigation.current_phase = current_phase
# Update other fields
for key, value in kwargs.items():
if hasattr(investigation, key):
setattr(investigation, key, value)
await db.commit()
await db.refresh(investigation)
return investigation
async def _execute_investigation(self, investigation: Investigation):
"""Execute investigation using agents."""
try:
start_time = datetime.utcnow()
investigation.status = "processing"
# Get agent pool
pool = await get_agent_pool()
# Create agent context
context = AgentContext(
investigation_id=investigation.id,
user_id=investigation.user_id,
data_sources=investigation.metadata.get("data_sources", [])
)
# Execute with master agent
async with pool.acquire(MasterAgent, context) as master:
result = await master._investigate(
{"query": investigation.query},
context
)
# Update investigation
investigation.status = "completed"
investigation.confidence_score = result.confidence_score
investigation.completed_at = datetime.utcnow()
investigation.processing_time_ms = (
investigation.completed_at - start_time
).total_seconds() * 1000
logger.info(f"Investigation {investigation.id} completed")
except Exception as e:
logger.error(f"Investigation {investigation.id} failed: {e}")
investigation.status = "failed"
investigation.completed_at = datetime.utcnow()
async def get_by_id(self, investigation_id: str) -> Optional[Investigation]:
"""Get investigation by ID from database."""
async with get_db_session() as db:
result = await db.execute(
select(Investigation).where(Investigation.id == investigation_id)
)
return result.scalar_one_or_none()
async def search(
self,
user_id: Optional[str] = None,
status: Optional[str] = None,
limit: int = 20,
offset: int = 0,
) -> List[Investigation]:
"""Search investigations with filters."""
async with get_db_session() as db:
query = select(Investigation)
if user_id:
query = query.where(Investigation.user_id == user_id)
if status:
query = query.where(Investigation.status == status)
query = query.order_by(Investigation.created_at.desc())
query = query.limit(limit).offset(offset)
result = await db.execute(query)
return list(result.scalars().all())
async def cancel(self, investigation_id: str, user_id: str) -> Investigation:
"""Cancel an investigation."""
async with get_db_session() as db:
result = await db.execute(
select(Investigation).where(Investigation.id == investigation_id)
)
investigation = result.scalar_one_or_none()
if not investigation:
raise ValueError(f"Investigation {investigation_id} not found")
if investigation.user_id != user_id:
raise ValueError("Unauthorized")
if investigation.status in ["completed", "failed", "cancelled"]:
raise ValueError(f"Cannot cancel investigation in {investigation.status} status")
investigation.status = "cancelled"
investigation.completed_at = datetime.utcnow()
await db.commit()
await db.refresh(investigation)
logger.info(f"Investigation {investigation_id} cancelled by user {user_id}")
return investigation
async def get_user_investigations(
self,
user_id: str,
limit: int = 10
) -> List[Investigation]:
"""Get investigations for a user."""
return await self.search(user_id=user_id, limit=limit)
# Global service instance
investigation_service = InvestigationService()