"""
Module: services.auto_investigation_service
Description: Auto Investigation Service - 24/7 Contract Monitoring and Analysis
Author: Anderson Henrique da Silva
Date: 2025-10-07 18:11:37
License: Proprietary - All rights reserved

This service continuously monitors government contracts (new and historical)
and automatically triggers investigations when suspicious patterns are detected.
"""

from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import asyncio

from src.core import get_logger
from src.tools.transparency_api import TransparencyAPIClient, TransparencyAPIFilter
from src.agents import InvestigatorAgent, AgentContext
from src.services.investigation_service_selector import investigation_service
from src.models.forensic_investigation import AnomalySeverity

logger = get_logger(__name__)


class AutoInvestigationService:
    """
    Service for 24/7 automatic contract investigation.

    Features:
    - Monitors new contracts from Portal da Transparência
    - Re-analyzes historical contracts with updated ML models
    - Triggers investigations automatically on suspicious patterns
    - Learns from discovered patterns (unsupervised)
    """

    def __init__(self):
        """Initialize auto-investigation service."""
        self.transparency_api = TransparencyAPIClient()
        self.investigator = None  # lazily created via _get_investigator()

        # Pre-screening value threshold (BRL) and daily contract cap
        self.value_threshold = 100000.0
        self.daily_contract_limit = 500

    async def _get_investigator(self) -> InvestigatorAgent:
        """Lazy load investigator agent."""
        if self.investigator is None:
            self.investigator = InvestigatorAgent()
        return self.investigator

    async def monitor_new_contracts(
        self,
        lookback_hours: int = 24,
        organization_codes: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Monitor and investigate new contracts from the last N hours.

        Args:
            lookback_hours: How many hours back to look for new contracts
            organization_codes: Specific organization codes to monitor (all organizations if omitted)

        Returns:
            Summary of monitoring results
        """
        logger.info(
            "auto_monitoring_started",
            lookback_hours=lookback_hours,
            org_count=len(organization_codes) if organization_codes else "all"
        )

        start_time = datetime.utcnow()

        try:
            # Step 1: Fetch contracts published within the lookback window
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(hours=lookback_hours)

            contracts = await self._fetch_recent_contracts(
                start_date=start_date,
                end_date=end_date,
                organization_codes=organization_codes
            )

            logger.info(
                "contracts_fetched",
                count=len(contracts),
                date_range=f"{start_date.date()} to {end_date.date()}"
            )

            # Step 2: Pre-screen with lightweight heuristics to reduce load
            suspicious_contracts = await self._pre_screen_contracts(contracts)

            logger.info(
                "contracts_pre_screened",
                total=len(contracts),
                suspicious=len(suspicious_contracts)
            )

            # Step 3: Run full forensic investigations on the suspicious subset
            investigations = await self._investigate_batch(suspicious_contracts)

            duration = (datetime.utcnow() - start_time).total_seconds()

            result = {
                "monitoring_type": "new_contracts",
                "lookback_hours": lookback_hours,
                "contracts_analyzed": len(contracts),
                "suspicious_found": len(suspicious_contracts),
                "investigations_created": len(investigations),
                "anomalies_detected": sum(len(inv.get("anomalies", [])) for inv in investigations),
                "duration_seconds": duration,
                "timestamp": datetime.utcnow().isoformat()
            }

            logger.info("auto_monitoring_completed", **result)
            return result

        except Exception as e:
            logger.error(
                "auto_monitoring_failed",
                error=str(e),
                exc_info=True
            )
            raise

    async def reanalyze_historical_contracts(
        self,
        months_back: int = 6,
        batch_size: int = 100
    ) -> Dict[str, Any]:
        """
        Re-analyze historical contracts with updated detection models.

        This is useful after ML model improvements to find previously
        missed anomalies in historical data.

        Args:
            months_back: How many months of historical data to analyze
            batch_size: Number of contracts per batch

        Returns:
            Summary of reanalysis results
        """
        logger.info(
            "historical_reanalysis_started",
            months_back=months_back,
            batch_size=batch_size
        )

        start_time = datetime.utcnow()

        try:
            # Approximate the historical window (months_back * 30 days)
            end_date = datetime.utcnow()
            start_date = end_date - timedelta(days=months_back * 30)

            total_analyzed = 0
            total_investigations = 0
            total_anomalies = 0

            # Walk through the period in one-week windows
            current_date = start_date
            batch_end_date = start_date + timedelta(days=7)

            while current_date < end_date:
                contracts = await self._fetch_recent_contracts(
                    start_date=current_date,
                    end_date=min(batch_end_date, end_date),
                    limit=batch_size
                )

                if not contracts:
                    current_date = batch_end_date
                    batch_end_date += timedelta(days=7)
                    continue

                # Pre-screen, then investigate only the suspicious contracts
                suspicious_contracts = await self._pre_screen_contracts(contracts)

                if suspicious_contracts:
                    investigations = await self._investigate_batch(suspicious_contracts)
                    total_investigations += len(investigations)
                    total_anomalies += sum(
                        len(inv.get("anomalies", [])) for inv in investigations
                    )

                total_analyzed += len(contracts)

                logger.info(
                    "historical_batch_processed",
                    date_range=f"{current_date.date()} to {batch_end_date.date()}",
                    contracts=len(contracts),
                    suspicious=len(suspicious_contracts)
                )

                # Advance to the next one-week window
                current_date = batch_end_date
                batch_end_date += timedelta(days=7)

                # Brief pause between batches to avoid hammering the API
                await asyncio.sleep(1)

            duration = (datetime.utcnow() - start_time).total_seconds()

            result = {
                "monitoring_type": "historical_reanalysis",
                "months_analyzed": months_back,
                "contracts_analyzed": total_analyzed,
                "investigations_created": total_investigations,
                "anomalies_detected": total_anomalies,
                "duration_seconds": duration,
                "timestamp": datetime.utcnow().isoformat()
            }

            logger.info("historical_reanalysis_completed", **result)
            return result

        except Exception as e:
            logger.error(
                "historical_reanalysis_failed",
                error=str(e),
                exc_info=True
            )
            raise

    async def _fetch_recent_contracts(
        self,
        start_date: datetime,
        end_date: datetime,
        organization_codes: Optional[List[str]] = None,
        limit: int = 500
    ) -> List[Dict[str, Any]]:
        """Fetch contracts from Portal da Transparência."""
        try:
            filters = TransparencyAPIFilter(
                dataInicial=start_date.strftime("%d/%m/%Y"),
                dataFinal=end_date.strftime("%d/%m/%Y"),
            )

            if organization_codes:
                # Query each organization separately, splitting the limit among them
                all_contracts = []
                for org_code in organization_codes:
                    filters.codigoOrgao = org_code
                    contracts = await self.transparency_api.get_contracts(
                        filters=filters,
                        limit=max(1, limit // len(organization_codes))
                    )
                    all_contracts.extend(contracts)
                return all_contracts
            else:
                # No organization filter: a single query covers all organizations
                return await self.transparency_api.get_contracts(
                    filters=filters,
                    limit=limit
                )

        except Exception as e:
            logger.warning(
                "contract_fetch_failed",
                error=str(e),
                date_range=f"{start_date.date()} to {end_date.date()}"
            )
            return []

    async def _pre_screen_contracts(
        self,
        contracts: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Quick pre-screening to identify potentially suspicious contracts.

        This reduces load by only fully investigating high-risk contracts.
        """
        suspicious = []

        for contract in contracts:
            suspicion_score = 0
            reasons = []

            # High contract value (+2)
            valor = contract.get("valorInicial") or contract.get("valorGlobal") or 0
            if isinstance(valor, (int, float)) and valor > self.value_threshold:
                suspicion_score += 2
                reasons.append(f"high_value:{valor}")

            # Non-competitive procurement modality (+3)
            modalidade = str(contract.get("modalidadeLicitacao", "")).lower()
            if "dispensa" in modalidade or "inexigibilidade" in modalidade:
                suspicion_score += 3
                reasons.append(f"emergency_process:{modalidade}")

            # Single bidder (+2)
            num_proponentes = contract.get("numeroProponentes", 0)
            if num_proponentes == 1:
                suspicion_score += 2
                reasons.append("single_bidder")

            # Contracts that accumulate enough suspicion points are flagged
            if suspicion_score >= 3:
                contract["_suspicion_score"] = suspicion_score
                contract["_suspicion_reasons"] = reasons
                suspicious.append(contract)

        return suspicious

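    # Illustrative pre-screening walk-through (hypothetical figures, not real
    # data): a R$ 250,000 contract awarded by "dispensa de licitação" to a
    # single bidder scores 2 (high value) + 3 (non-competitive modality) +
    # 2 (single bidder) = 7, which clears the threshold of 3 and is handed to
    # _investigate_batch below.
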
    async def _investigate_batch(
        self,
        contracts: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Investigate a batch of suspicious contracts.

        Creates investigation records and runs full forensic analysis.
        """
        investigations = []
        investigator = await self._get_investigator()

        for contract in contracts:
            try:
                # Create an investigation record for this contract
                investigation = await investigation_service.create(
                    user_id="system_auto_monitor",
                    query=f"Auto-investigation: {(contract.get('objeto') or 'N/A')[:100]}",
                    data_source="contracts",
                    filters={
                        "contract_id": contract.get("id"),
                        "auto_triggered": True,
                        "suspicion_score": contract.get("_suspicion_score"),
                        "suspicion_reasons": contract.get("_suspicion_reasons", [])
                    },
                    anomaly_types=["price", "vendor", "temporal", "payment", "duplicate"]
                )

                investigation_id = (
                    investigation.id if hasattr(investigation, 'id')
                    else investigation['id']
                )

                # Build the agent context carrying the contract data
                context = AgentContext(
                    conversation_id=investigation_id,
                    user_id="system_auto_monitor",
                    session_data={
                        "auto_investigation": True,
                        "contract_data": contract
                    }
                )

                # Run the forensic anomaly analysis
                anomalies = await investigator.investigate_anomalies(
                    query=f"Analyze contract {contract.get('id')}",
                    data_source="contracts",
                    filters=TransparencyAPIFilter(),
                    anomaly_types=["price", "vendor", "temporal", "payment"],
                    context=context
                )

                # Persist the results and close out the investigation record
                if anomalies:
                    await investigation_service.update_status(
                        investigation_id=investigation_id,
                        status="completed",
                        progress=1.0,
                        results=[
                            {
                                "anomaly_type": a.anomaly_type,
                                "severity": a.severity,
                                "confidence": a.confidence,
                                "description": a.description
                            }
                            for a in anomalies
                        ],
                        anomalies_found=len(anomalies)
                    )

                    investigations.append({
                        "investigation_id": investigation_id,
                        "contract_id": contract.get("id"),
                        "anomalies": [
                            {
                                "type": a.anomaly_type,
                                "severity": a.severity,
                                "confidence": a.confidence
                            }
                            for a in anomalies
                        ]
                    })

                    logger.info(
                        "auto_investigation_completed",
                        investigation_id=investigation_id,
                        contract_id=contract.get("id"),
                        anomalies_found=len(anomalies)
                    )
                else:
                    # No anomalies: mark the investigation completed with empty results
                    await investigation_service.update_status(
                        investigation_id=investigation_id,
                        status="completed",
                        progress=1.0,
                        results=[],
                        anomalies_found=0
                    )

                # Brief pause between contracts to avoid overloading downstream services
                await asyncio.sleep(0.5)

            except Exception as e:
                logger.error(
                    "auto_investigation_failed",
                    contract_id=contract.get("id"),
                    error=str(e),
                    exc_info=True
                )
                continue

        return investigations


# Module-level singleton instance
auto_investigation_service = AutoInvestigationService()
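

if __name__ == "__main__":
    # Minimal manual smoke-test sketch (illustrative only, not part of the
    # service API): runs a single 24-hour monitoring pass through the
    # module-level singleton above. Assumes Portal da Transparência access is
    # already configured for TransparencyAPIClient via the project's settings.
    async def _demo_run() -> None:
        summary = await auto_investigation_service.monitor_new_contracts(
            lookback_hours=24,
        )
        logger.info("demo_monitoring_summary", **summary)

    asyncio.run(_demo_run())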