#!/usr/bin/env python3 """ Cidadão.AI Backend - Expanded Version with Multiple Data Sources Supports: Contracts, Servants, Expenses, Biddings, and more """ import asyncio import logging import os import sys import time import traceback import hashlib from contextlib import asynccontextmanager from typing import Any, Dict, List, Optional, Union from datetime import datetime, timedelta from enum import Enum import uvicorn from fastapi import FastAPI, HTTPException, status, Query from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel, Field from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # ==================== DATA MODELS ==================== class DataSourceType(str, Enum): """Types of data sources available.""" CONTRACTS = "contratos" SERVANTS = "servidores" EXPENSES = "despesas" BIDDINGS = "licitacoes" AGREEMENTS = "convenios" SANCTIONS = "empresas-sancionadas" class HealthResponse(BaseModel): """Health check response model.""" status: str = "healthy" version: str = "2.0.0" agents: Dict[str, str] = Field(default_factory=lambda: {"zumbi": "active"}) data_sources: List[str] = Field(default_factory=lambda: [e.value for e in DataSourceType]) uptime: str = "operational" class UniversalSearchRequest(BaseModel): """Universal search request for any data type.""" query: str = Field(..., description="Search query") data_source: DataSourceType = Field(..., description="Type of data to search") filters: Dict[str, Any] = Field(default_factory=dict, description="Additional filters") max_results: int = Field(default=100, ge=1, le=500, description="Maximum results") class ServantData(BaseModel): """Servant/Employee data model.""" nome: str cpf_masked: str matricula: str orgao: str cargo: str funcao: Optional[str] = None remuneracao: Dict[str, float] mes_ano_referencia: str class ContractData(BaseModel): """Contract data model.""" id: str numero: str objeto: str valor: float fornecedor: Dict[str, str] orgao: str data_assinatura: str vigencia: Dict[str, str] modalidade: Optional[str] = None class ExpenseData(BaseModel): """Expense data model.""" id: str descricao: str valor: float favorecido: Dict[str, str] orgao: str data: str programa: Optional[str] = None acao: Optional[str] = None class UniversalSearchResponse(BaseModel): """Universal search response.""" status: str data_source: str query: str results: List[Union[ServantData, ContractData, ExpenseData, Dict[str, Any]]] total_found: int anomalies_detected: int confidence_score: float processing_time_ms: int metadata: Dict[str, Any] = Field(default_factory=dict) # ==================== CACHE ==================== class SimpleCache: """In-memory cache for API responses with TTL.""" def __init__(self): self._cache: Dict[str, Dict] = {} self._ttl_cache: Dict[str, datetime] = {} self.default_ttl = 3600 # 1 hour def _generate_key(self, **kwargs) -> str: """Generate cache key from parameters.""" key_string = "&".join([f"{k}={v}" for k, v in sorted(kwargs.items())]) return hashlib.md5(key_string.encode()).hexdigest() def get(self, **kwargs) -> Optional[Dict]: """Get cached value if not expired.""" key = self._generate_key(**kwargs) if key not in self._cache: return None if key in self._ttl_cache: if datetime.now() > self._ttl_cache[key]: del self._cache[key] del self._ttl_cache[key] return None return self._cache[key] def set(self, value: Dict, ttl_seconds: int = None, **kwargs) -> None: """Set cached value with TTL.""" key = self._generate_key(**kwargs) self._cache[key] = value ttl = ttl_seconds or self.default_ttl self._ttl_cache[key] = datetime.now() + timedelta(seconds=ttl) # Global cache instance api_cache = SimpleCache() # ==================== ENHANCED ZUMBI AGENT ==================== class EnhancedZumbiAgent: """Enhanced Zumbi agent that can investigate multiple data sources.""" def __init__(self): self.name = "Zumbi dos Palmares" self.role = "Universal Investigator" self.specialty = "Multi-source anomaly detection" logger.info(f"🏹 {self.name} - Enhanced {self.role} initialized") async def investigate_universal(self, request: UniversalSearchRequest) -> UniversalSearchResponse: """Investigate any data source.""" import os import numpy as np from collections import defaultdict start_time = time.time() try: # Get API key api_key = os.getenv("TRANSPARENCY_API_KEY") if not api_key: logger.warning("⚠️ No API key, using demo data") return await self._get_demo_data(request, start_time) # Route to appropriate handler if request.data_source == DataSourceType.SERVANTS: return await self._search_servants(request, api_key, start_time) elif request.data_source == DataSourceType.CONTRACTS: return await self._search_contracts(request, api_key, start_time) elif request.data_source == DataSourceType.EXPENSES: return await self._search_expenses(request, api_key, start_time) elif request.data_source == DataSourceType.BIDDINGS: return await self._search_biddings(request, api_key, start_time) else: return await self._search_generic(request, api_key, start_time) except Exception as e: logger.error(f"Investigation error: {str(e)}") return UniversalSearchResponse( status="error", data_source=request.data_source.value, query=request.query, results=[], total_found=0, anomalies_detected=0, confidence_score=0.0, processing_time_ms=int((time.time() - start_time) * 1000), metadata={"error": str(e)} ) async def _search_servants(self, request: UniversalSearchRequest, api_key: str, start_time: float) -> UniversalSearchResponse: """Search for government servants.""" import httpx # Check cache first cache_key = f"servants_{request.query}_{request.max_results}" cached = api_cache.get(source=cache_key) if cached: logger.info("📦 Using cached servants data") return cached results = [] anomalies = 0 async with httpx.AsyncClient(timeout=30.0) as client: url = "https://api.portaldatransparencia.gov.br/api-de-dados/servidores" headers = { "chave-api-dados": api_key, "Accept": "application/json" } params = { "nome": request.query.upper(), "pagina": 1, "tamanhoPagina": min(request.max_results, 50) } # Add filters from request if "orgao" in request.filters: params["orgao"] = request.filters["orgao"] if "funcao" in request.filters: params["funcao"] = request.filters["funcao"] response = await client.get(url, headers=headers, params=params) if response.status_code == 200: data = response.json() # Process servants for item in data: servant_info = item.get("servidor", {}) org_info = item.get("unidadeOrganizacional", {}) # Extract salary info salary = item.get("remuneracaoBasicaBruta", 0) total = item.get("remuneracaoAposDeducoes", 0) # Detect anomalies (e.g., very high salaries) if salary > 40000: # Above R$ 40k is unusual anomalies += 1 servant = ServantData( nome=servant_info.get("nome", "N/A"), cpf_masked=servant_info.get("cpf", "***.***.***-**"), matricula=servant_info.get("matricula", "N/A"), orgao=org_info.get("nomeUnidade", "N/A"), cargo=item.get("cargo", {}).get("descricao", "N/A"), funcao=item.get("funcao", {}).get("descricao"), remuneracao={ "basica": salary, "total_liquido": total, "gratificacoes": item.get("gratificacoes", 0), "auxilios": item.get("auxilios", 0) }, mes_ano_referencia=f"{item.get('mesReferencia', 'N/A')}/{item.get('anoReferencia', 'N/A')}" ) results.append(servant.dict()) response_data = UniversalSearchResponse( status="success", data_source=request.data_source.value, query=request.query, results=results, total_found=len(results), anomalies_detected=anomalies, confidence_score=0.95, processing_time_ms=int((time.time() - start_time) * 1000), metadata={"source": "real_api", "anomaly_threshold": 40000} ) # Cache the response api_cache.set(response_data.dict(), source=cache_key) return response_data else: raise HTTPException(status_code=response.status_code, detail="API request failed") async def _search_contracts(self, request: UniversalSearchRequest, api_key: str, start_time: float) -> UniversalSearchResponse: """Search for contracts with anomaly detection.""" import httpx import numpy as np results = [] anomalies = 0 async with httpx.AsyncClient(timeout=30.0) as client: # Search multiple organizations org_codes = request.filters.get("orgaos", ["26000", "25000", "44000"]) all_contracts = [] for org_code in org_codes[:3]: # Limit to 3 orgs url = "https://api.portaldatransparencia.gov.br/api-de-dados/contratos" headers = { "chave-api-dados": api_key, "Accept": "application/json" } params = { "codigoOrgao": org_code, "ano": request.filters.get("ano", 2024), "tamanhoPagina": 50 } # Add search term if provided if request.query and request.query.lower() != "todos": params["descricao"] = request.query response = await client.get(url, headers=headers, params=params) if response.status_code == 200: contracts = response.json() all_contracts.extend(contracts) # Analyze contracts for anomalies if all_contracts: values = [c.get("valorInicial", 0) for c in all_contracts if c.get("valorInicial", 0) > 0] if len(values) > 3: mean_val = np.mean(values) std_val = np.std(values) for contract in all_contracts[:request.max_results]: valor = contract.get("valorInicial", 0) z_score = abs((valor - mean_val) / std_val) if std_val > 0 else 0 # Flag as anomaly if z-score > 1.5 is_anomaly = z_score > 1.5 if is_anomaly: anomalies += 1 contract_data = { "id": contract.get("id", "N/A"), "numero": contract.get("numero", "N/A"), "objeto": contract.get("objeto", "N/A")[:200], "valor": valor, "fornecedor": { "nome": contract.get("nomeFornecedor", "N/A"), "cnpj": contract.get("cnpjFornecedor", "N/A") }, "orgao": contract.get("nomeOrgao", org_code), "data_assinatura": contract.get("dataAssinatura", "N/A"), "vigencia": { "inicio": contract.get("dataInicioVigencia", "N/A"), "fim": contract.get("dataFimVigencia", "N/A") }, "modalidade": contract.get("modalidadeCompra", "N/A"), "_anomaly": is_anomaly, "_z_score": z_score } results.append(contract_data) return UniversalSearchResponse( status="success", data_source=request.data_source.value, query=request.query, results=results, total_found=len(results), anomalies_detected=anomalies, confidence_score=0.87, processing_time_ms=int((time.time() - start_time) * 1000), metadata={ "organizations_searched": org_codes[:3], "anomaly_method": "z_score", "threshold": 1.5 } ) async def _search_expenses(self, request: UniversalSearchRequest, api_key: str, start_time: float) -> UniversalSearchResponse: """Search for government expenses.""" import httpx results = [] anomalies = 0 async with httpx.AsyncClient(timeout=30.0) as client: url = "https://api.portaldatransparencia.gov.br/api-de-dados/despesas" headers = { "chave-api-dados": api_key, "Accept": "application/json" } params = { "ano": request.filters.get("ano", 2024), "mes": request.filters.get("mes", 12), "pagina": 1, "tamanhoPagina": min(request.max_results, 50) } if "orgao" in request.filters: params["orgao"] = request.filters["orgao"] response = await client.get(url, headers=headers, params=params) if response.status_code == 200: data = response.json() for expense in data: valor = expense.get("valor", 0) # Simple anomaly detection for high values if valor > 1000000: # Above 1M anomalies += 1 expense_data = ExpenseData( id=expense.get("id", "N/A"), descricao=expense.get("descricao", "N/A"), valor=valor, favorecido={ "nome": expense.get("nomeFavorecido", "N/A"), "codigo": expense.get("codigoFavorecido", "N/A") }, orgao=expense.get("nomeOrgao", "N/A"), data=expense.get("data", "N/A"), programa=expense.get("nomePrograma"), acao=expense.get("nomeAcao") ) results.append(expense_data.dict()) return UniversalSearchResponse( status="success", data_source=request.data_source.value, query=request.query, results=results, total_found=len(results), anomalies_detected=anomalies, confidence_score=0.85, processing_time_ms=int((time.time() - start_time) * 1000), metadata={"high_value_threshold": 1000000} ) else: raise HTTPException(status_code=response.status_code, detail="API request failed") async def _search_biddings(self, request: UniversalSearchRequest, api_key: str, start_time: float) -> UniversalSearchResponse: """Search for biddings/licitações.""" # Implementation similar to contracts # For brevity, returning a simplified response return UniversalSearchResponse( status="success", data_source=request.data_source.value, query=request.query, results=[], total_found=0, anomalies_detected=0, confidence_score=0.8, processing_time_ms=int((time.time() - start_time) * 1000), metadata={"note": "Biddings endpoint to be implemented"} ) async def _search_generic(self, request: UniversalSearchRequest, api_key: str, start_time: float) -> UniversalSearchResponse: """Generic search for other data types.""" return UniversalSearchResponse( status="success", data_source=request.data_source.value, query=request.query, results=[], total_found=0, anomalies_detected=0, confidence_score=0.7, processing_time_ms=int((time.time() - start_time) * 1000), metadata={"note": f"Generic handler for {request.data_source.value}"} ) async def _get_demo_data(self, request: UniversalSearchRequest, start_time: float) -> UniversalSearchResponse: """Return demo data when no API key is available.""" demo_results = [] if request.data_source == DataSourceType.SERVANTS: demo_results = [{ "nome": "MARIA DA SILVA", "cpf_masked": "***.***.***-**", "matricula": "1234567", "orgao": "MINISTERIO DA SAUDE", "cargo": "ANALISTA", "funcao": "ANALISTA TECNICO", "remuneracao": { "basica": 8500.00, "total_liquido": 9876.54, "gratificacoes": 2000.00, "auxilios": 458.00 }, "mes_ano_referencia": "12/2024" }] elif request.data_source == DataSourceType.CONTRACTS: demo_results = [{ "id": "demo-001", "numero": "2024/001", "objeto": "Contrato demonstrativo para testes", "valor": 150000.00, "fornecedor": { "nome": "EMPRESA DEMO LTDA", "cnpj": "00.000.000/0001-00" }, "orgao": "ORGAO DEMONSTRATIVO", "data_assinatura": "01/01/2024", "vigencia": { "inicio": "01/01/2024", "fim": "31/12/2024" }, "modalidade": "Pregão Eletrônico", "_anomaly": False, "_z_score": 0.5 }] return UniversalSearchResponse( status="demo", data_source=request.data_source.value, query=request.query, results=demo_results, total_found=len(demo_results), anomalies_detected=0, confidence_score=0.5, processing_time_ms=int((time.time() - start_time) * 1000), metadata={"mode": "demo", "message": "Configure TRANSPARENCY_API_KEY for real data"} ) # ==================== FASTAPI APP ==================== # Create agent instance enhanced_zumbi = EnhancedZumbiAgent() # Lifespan context manager @asynccontextmanager async def lifespan(app: FastAPI): logger.info("🏛️ Cidadão.AI Enhanced Backend starting up...") logger.info("🏹 Enhanced Zumbi agent ready for multi-source investigations") yield logger.info("👋 Cidadão.AI Enhanced Backend shutting down...") # Create FastAPI app app = FastAPI( title="Cidadão.AI Enhanced Backend", description="Multi-source government transparency analysis with AI", version="2.0.0", docs_url="/docs", redoc_url="/redoc", lifespan=lifespan ) # Add CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Add compression middleware for better performance from src.api.middleware.compression import add_compression_middleware add_compression_middleware( app, minimum_size=1024, # Compress responses larger than 1KB gzip_level=6, # Good balance of speed vs compression brotli_quality=4, # Fast brotli compression exclude_paths={"/health", "/metrics", "/health/metrics"} ) # ==================== ENDPOINTS ==================== @app.get("/", response_model=HealthResponse) async def root(): """Root endpoint with system status.""" return HealthResponse() @app.get("/health", response_model=HealthResponse) async def health_check(): """Health check endpoint.""" return HealthResponse() @app.post("/api/investigate", response_model=UniversalSearchResponse) async def investigate_universal(request: UniversalSearchRequest): """ Universal investigation endpoint for any data source. Example queries: - Servants: {"query": "João Silva", "data_source": "servidores"} - Contracts: {"query": "informática", "data_source": "contratos"} - Expenses: {"query": "todos", "data_source": "despesas", "filters": {"mes": 12}} """ try: result = await enhanced_zumbi.investigate_universal(request) return result except Exception as e: logger.error(f"Investigation error: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Investigation failed: {str(e)}" ) @app.get("/api/data-sources") async def list_data_sources(): """List all available data sources.""" return { "sources": [ { "id": ds.value, "name": ds.name, "description": { DataSourceType.CONTRACTS: "Government contracts and procurements", DataSourceType.SERVANTS: "Public servants and their salaries", DataSourceType.EXPENSES: "Government expenses and payments", DataSourceType.BIDDINGS: "Public biddings and auctions", DataSourceType.AGREEMENTS: "Government agreements and partnerships", DataSourceType.SANCTIONS: "Sanctioned companies" }.get(ds, "Data source") } for ds in DataSourceType ] } @app.get("/api/search/servants") async def search_servants_quick( nome: str = Query(..., description="Nome do servidor"), orgao: Optional[str] = Query(None, description="Código do órgão"), limit: int = Query(10, ge=1, le=50, description="Limite de resultados") ): """Quick endpoint to search servants by name.""" request = UniversalSearchRequest( query=nome, data_source=DataSourceType.SERVANTS, filters={"orgao": orgao} if orgao else {}, max_results=limit ) return await investigate_universal(request) @app.get("/api/search/contracts") async def search_contracts_quick( query: str = Query("todos", description="Termo de busca"), orgao: Optional[str] = Query(None, description="Código do órgão"), ano: int = Query(2024, description="Ano"), limit: int = Query(50, ge=1, le=100, description="Limite de resultados") ): """Quick endpoint to search contracts.""" request = UniversalSearchRequest( query=query, data_source=DataSourceType.CONTRACTS, filters={"orgaos": [orgao] if orgao else ["26000", "25000"], "ano": ano}, max_results=limit ) return await investigate_universal(request) @app.get("/api/cache/stats") async def cache_stats(): """Get cache statistics.""" return { "cache_size": len(api_cache._cache), "active_entries": len([k for k, v in api_cache._ttl_cache.items() if v > datetime.now()]), "ttl_seconds": api_cache.default_ttl } # Debug endpoint for Drummond issue @app.get("/debug/drummond-status") async def debug_drummond_status(): """Debug endpoint to check Drummond agent status.""" import traceback result = { "python_version": sys.version, "checks": {} } # Check if we can import CommunicationAgent try: from src.agents.drummond import CommunicationAgent abstract_methods = getattr(CommunicationAgent, '__abstractmethods__', set()) result["checks"]["import"] = { "status": "success", "abstract_methods": list(abstract_methods) if abstract_methods else "none", "has_shutdown": hasattr(CommunicationAgent, 'shutdown'), "has_initialize": hasattr(CommunicationAgent, 'initialize'), "has_process": hasattr(CommunicationAgent, 'process') } # Try to instantiate try: agent = CommunicationAgent() result["checks"]["instantiation"] = {"status": "success", "agent_name": agent.name} except Exception as e: result["checks"]["instantiation"] = { "status": "failed", "error": str(e), "error_type": type(e).__name__ } except Exception as e: result["checks"]["import"] = { "status": "failed", "error": str(e), "traceback": traceback.format_exc()[:500] # Limit traceback size } return result if __name__ == "__main__": port = int(os.getenv("PORT", 7860)) logger.info(f"🚀 Starting Enhanced Cidadão.AI Backend on port {port}") # Disable host header validation for HuggingFace Spaces uvicorn.run( app, host="0.0.0.0", port=port, forwarded_allow_ips="*", # Allow all proxy IPs proxy_headers=True # Trust proxy headers )