🏢 Cidadão.AI Business Services Layer

📋 Overview

The Business Services Layer encapsulates the core business logic and domain operations for transparency analysis. It orchestrates complex workflows, coordinates the platform's components, and exposes high-level services that implement the business requirements.

🏗️ Architecture

src/services/
├── analysis_service.py      # Core data analysis orchestration
├── data_service.py          # Data management and processing
├── notification_service.py  # Communication and alerting
└── __init__.py             # Service layer initialization
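
The __init__.py re-exports the service classes for package-level imports (as used in the examples below). A minimal sketch of a plausible initializer, assumed rather than taken verbatim from the repository:

# src/services/__init__.py -- plausible layout (assumed, not verbatim)
from .analysis_service import AnalysisService
from .data_service import DataService
from .notification_service import NotificationService

# ServiceOrchestrator (shown later in this document) would also be
# re-exported here to support `from src.services import ServiceOrchestrator`.
__all__ = ["AnalysisService", "DataService", "NotificationService"]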

🎯 Core Services

1. AnalysisService - Data Analysis Orchestration

Comprehensive Analysis Workflows

class AnalysisService:
    """
    Central service for orchestrating government data analysis
    
    Responsibilities:
    - Coordinate multi-agent analysis workflows
    - Implement business logic for transparency analysis
    - Manage analysis caching and optimization
    - Provide high-level analysis APIs
    - Ensure data quality and validation
    """
    
    def __init__(self):
        self._analysis_cache = {}              # Result caching
        self.agent_orchestrator = None         # Multi-agent coordinator
        self.ml_pipeline = None               # ML processing pipeline
        self.data_validator = None            # Data quality validation

Advanced Analysis Methods

async def analyze_spending_patterns(self, data: List[Dict]) -> Dict:
    """
    Comprehensive spending pattern analysis
    
    Analysis Types:
    - Temporal spending trends
    - Seasonal pattern detection  
    - Organizational behavior analysis
    - Vendor concentration analysis
    - Budget execution efficiency
    - Cross-organizational comparisons
    """
    
    if not data:
        return {"error": "No data provided for analysis"}
    
    # Data preprocessing and validation
    validated_data = await self._validate_and_clean_data(data)
    
    # Multi-dimensional analysis
    analysis_results = {
        # Basic statistics
        "total_items": len(validated_data),
        "total_value": self._calculate_total_value(validated_data),
        "average_value": self._calculate_average_value(validated_data),
        
        # Temporal analysis
        "temporal_patterns": await self._analyze_temporal_patterns(validated_data),
        
        # Statistical analysis
        "statistical_summary": await self._generate_statistical_summary(validated_data),
        
        # Pattern recognition
        "identified_patterns": await self._identify_spending_patterns(validated_data),
        
        # Risk assessment
        "risk_indicators": await self._assess_risk_indicators(validated_data),
        
        # Compliance analysis
        "compliance_status": await self._analyze_compliance(validated_data)
    }
    
    # Cache results for performance
    cache_key = self._generate_cache_key(data)
    self._analysis_cache[cache_key] = analysis_results
    
    return analysis_results
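
The value helpers referenced above reduce to simple aggregations. A minimal sketch, assuming the monetary amount lives in a 'valor' field (the field name is an assumption):

from typing import Dict, List

def _calculate_total_value(self, data: List[Dict]) -> float:
    """Sum the monetary field across records ('valor' field assumed)."""
    return sum(float(record.get("valor", 0) or 0) for record in data)

def _calculate_average_value(self, data: List[Dict]) -> float:
    """Mean value per record; zero for empty input."""
    return self._calculate_total_value(data) / len(data) if data else 0.0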

async def detect_anomalies(self, data: List[Dict]) -> List[Dict]:
    """
    Multi-algorithm anomaly detection
    
    Detection Methods:
    - Statistical outliers (Z-score, IQR)
    - Machine learning-based detection
    - Pattern deviation analysis
    - Cross-reference validation
    - Temporal anomaly detection
    """
    
    if not data:
        return []
    
    anomalies = []
    
    # Statistical anomaly detection
    statistical_anomalies = await self._detect_statistical_anomalies(data)
    anomalies.extend(statistical_anomalies)
    
    # ML-based anomaly detection
    if self.ml_pipeline:
        ml_anomalies = await self.ml_pipeline.detect_anomalies(data)
        anomalies.extend(ml_anomalies)
    
    # Pattern-based anomaly detection
    pattern_anomalies = await self._detect_pattern_anomalies(data)
    anomalies.extend(pattern_anomalies)
    
    # Consolidate and rank anomalies
    consolidated_anomalies = await self._consolidate_anomalies(anomalies)
    
    return consolidated_anomalies
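
As a concrete instance of the statistical layer, a sketch of _detect_statistical_anomalies combining the Z-score and IQR checks listed above; the 'valor' field name and the thresholds (|z| > 3, 1.5 * IQR fences) are assumptions:

import statistics
from typing import Dict, List

async def _detect_statistical_anomalies(self, data: List[Dict]) -> List[Dict]:
    """Flag records whose 'valor' is a Z-score or IQR outlier (assumed thresholds)."""
    values = [float(record.get("valor", 0) or 0) for record in data]
    if len(values) < 4:
        return []  # too few points for meaningful quartiles

    mean = statistics.fmean(values)
    stdev = statistics.stdev(values)
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr

    anomalies = []
    for record, value in zip(data, values):
        z_score = (value - mean) / stdev if stdev else 0.0
        if abs(z_score) > 3 or not (lower <= value <= upper):
            anomalies.append({
                "type": "statistical_outlier",
                "record": record,
                "z_score": round(z_score, 2),
                "iqr_bounds": (round(lower, 2), round(upper, 2)),
            })
    return anomalies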

async def generate_insights(self, data: List[Dict]) -> List[str]:
    """
    AI-powered insight generation
    
    Insight Categories:
    - Spending efficiency insights
    - Risk and compliance insights  
    - Trend and pattern insights
    - Comparative insights
    - Actionable recommendations
    """
    
    if not data:
        return ["Nenhum dado disponível para análise"]
    
    insights = []
    
    # Data volume insights
    insights.append(f"Analisados {len(data)} registros de dados governamentais")
    
    # Value analysis insights
    total_value = self._calculate_total_value(data)
    if total_value > 0:
        insights.append(f"Valor total analisado: R$ {total_value:,.2f}")
        
        avg_value = total_value / len(data)
        insights.append(f"Valor médio por registro: R$ {avg_value:,.2f}")
    
    # Temporal insights
    temporal_insights = await self._generate_temporal_insights(data)
    insights.extend(temporal_insights)
    
    # Pattern insights
    pattern_insights = await self._generate_pattern_insights(data)
    insights.extend(pattern_insights)
    
    # Risk insights
    risk_insights = await self._generate_risk_insights(data)
    insights.extend(risk_insights)
    
    # Actionable recommendations
    recommendations = await self._generate_recommendations(data)
    insights.extend(recommendations)
    
    return insights

Advanced Comparative Analysis

async def compare_periods(
    self, 
    current_data: List[Dict], 
    previous_data: List[Dict]
) -> Dict:
    """
    Comprehensive period-over-period comparison
    
    Comparison Dimensions:
    - Volume changes (number of transactions)
    - Value changes (total and average amounts)
    - Efficiency changes (value per transaction)
    - Pattern changes (temporal, vendor, category)
    - Risk profile changes
    - Compliance trend analysis
    """
    
    current_analysis = await self.analyze_spending_patterns(current_data)
    previous_analysis = await self.analyze_spending_patterns(previous_data)
    
    comparison = {
        # Basic metrics comparison
        "volume_comparison": self._compare_volumes(current_data, previous_data),
        "value_comparison": self._compare_values(current_analysis, previous_analysis),
        "efficiency_comparison": self._compare_efficiency(current_analysis, previous_analysis),
        
        # Advanced comparisons
        "pattern_changes": await self._compare_patterns(current_analysis, previous_analysis),
        "risk_profile_changes": await self._compare_risk_profiles(current_analysis, previous_analysis),
        "compliance_trends": await self._compare_compliance(current_analysis, previous_analysis),
        
        # Statistical significance
        "statistical_significance": await self._test_statistical_significance(current_data, previous_data),
        
        # Executive summary
        "executive_summary": await self._generate_comparison_summary(current_analysis, previous_analysis)
    }
    
    return comparison
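
The significance test could be realized with Welch's t-test over per-record values. A sketch assuming SciPy is available and that amounts live in a 'valor' field (both are assumptions; the actual test is not shown in this excerpt):

from typing import Dict, List
from scipy import stats  # assumed dependency

async def _test_statistical_significance(
    self, current_data: List[Dict], previous_data: List[Dict]
) -> Dict:
    """Welch's t-test on per-record values between periods (sketch)."""
    current = [float(r.get("valor", 0) or 0) for r in current_data]
    previous = [float(r.get("valor", 0) or 0) for r in previous_data]
    if len(current) < 2 or len(previous) < 2:
        return {"significant": False, "reason": "insufficient data"}

    # equal_var=False selects Welch's variant, robust to unequal variances
    t_stat, p_value = stats.ttest_ind(current, previous, equal_var=False)
    return {
        "t_statistic": float(t_stat),
        "p_value": float(p_value),
        "significant": bool(p_value < 0.05),
    }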

async def rank_entities(
    self,
    data: List[Dict],
    by: str = "valor"
) -> List[Dict]:
    """
    Multi-criteria entity ranking and analysis
    
    Ranking Criteria:
    - Total spending volume
    - Average transaction value
    - Transaction frequency
    - Risk score
    - Compliance score
    - Efficiency metrics
    - Anomaly frequency
    """
    
    if not data:
        return []
    
    # Group data by entity
    entities = self._group_by_entity(data)
    
    ranked_entities = []
    
    for entity_id, entity_data in entities.items():
        entity_metrics = {
            "entity_id": entity_id,
            "entity_name": self._get_entity_name(entity_id),
            
            # Volume metrics
            "total_transactions": len(entity_data),
            "total_value": self._calculate_total_value(entity_data),
            "average_value": self._calculate_average_value(entity_data),
            
            # Performance metrics
            "efficiency_score": await self._calculate_efficiency_score(entity_data),
            "compliance_score": await self._calculate_compliance_score(entity_data),
            "risk_score": await self._calculate_risk_score(entity_data),
            
            # Analysis results
            "anomaly_count": await self._count_anomalies(entity_data),
            "pattern_stability": await self._assess_pattern_stability(entity_data),
            
            # Derived metrics
            "value_per_transaction": self._calculate_value_per_transaction(entity_data),
            "transaction_frequency": self._calculate_transaction_frequency(entity_data)
        }
        
        ranked_entities.append(entity_metrics)
    
    # Sort by specified criteria
    if by == "valor":
        ranked_entities.sort(key=lambda x: x["total_value"], reverse=True)
    elif by == "risk":
        ranked_entities.sort(key=lambda x: x["risk_score"], reverse=True)
    elif by == "efficiency":
        ranked_entities.sort(key=lambda x: x["efficiency_score"], reverse=True)
    elif by == "anomalies":
        ranked_entities.sort(key=lambda x: x["anomaly_count"], reverse=True)
    
    return ranked_entities
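
_group_by_entity is a simple bucketing step. A minimal sketch, assuming the entity identifier lives in the 'orgao' field (any stable entity key would work):

from collections import defaultdict
from typing import Dict, List

def _group_by_entity(self, data: List[Dict]) -> Dict[str, List[Dict]]:
    """Bucket records by entity identifier ('orgao' field assumed)."""
    groups: Dict[str, List[Dict]] = defaultdict(list)
    for record in data:
        groups[str(record.get("orgao", "unknown"))].append(record)
    return dict(groups)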

2. DataService - Data Management Operations

Comprehensive Data Management

class DataService:
    """
    Central data management service
    
    Responsibilities:
    - Data ingestion from multiple sources
    - Data quality validation and cleaning
    - Data transformation and normalization
    - Data persistence and caching
    - Data lifecycle management
    """
    
    def __init__(self):
        self.transparency_client = None        # External API client
        self.database_manager = None          # Database operations
        self.cache_manager = None             # Caching layer
        self.data_validator = None            # Data quality validation
        self.transformation_pipeline = None   # Data transformation
    
    async def fetch_government_data(
        self,
        data_type: str,
        filters: Optional[Dict[str, Any]] = None,
        cache_ttl: int = 3600
    ) -> List[Dict]:
        """
        Fetch data from government transparency APIs
        
        Data Sources:
        - Portal da Transparência
        - IBGE statistical data
        - TCU audit data
        - CGU oversight data
        - State and municipal portals
        """
        
        # Check cache first
        cache_key = self._generate_cache_key(data_type, filters)
        cached_data = await self.cache_manager.get(cache_key)
        
        if cached_data:
            return cached_data
        
        # Fetch fresh data
        raw_data = await self.transparency_client.fetch_data(data_type, filters)
        
        # Validate and clean data
        validated_data = await self.data_validator.validate_data(raw_data)
        
        # Transform to standard format
        transformed_data = await self.transformation_pipeline.transform(validated_data)
        
        # Cache results
        await self.cache_manager.set(cache_key, transformed_data, ttl=cache_ttl)
        
        # Persist to database
        await self.database_manager.store_data(data_type, transformed_data)
        
        return transformed_data
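
    # Sketch (assumption): a stable cache key derived from the data type and
    # filters, so identical queries map to the same cache entry. The hashing
    # scheme below is illustrative, not the project's actual implementation.
    def _generate_cache_key(self, data_type: str, filters: Optional[Dict[str, Any]] = None) -> str:
        import hashlib
        import json
        # sort_keys makes the key independent of filter insertion order
        payload = json.dumps({"type": data_type, "filters": filters or {}},
                             sort_keys=True, default=str)
        return f"govdata:{hashlib.sha256(payload.encode('utf-8')).hexdigest()}"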
    
    async def enrich_data(self, data: List[Dict]) -> List[Dict]:
        """
        Enrich data with additional context and metadata
        
        Enrichment Sources:
        - Organization metadata
        - Vendor company information
        - Geographic information
        - Legal and regulatory context
        - Historical trends and benchmarks
        """
        
        enriched_data = []
        
        for record in data:
            enriched_record = record.copy()
            
            # Add organization context
            if 'orgao' in record:
                org_context = await self._get_organization_context(record['orgao'])
                enriched_record['organization_context'] = org_context
            
            # Add vendor information
            if 'fornecedor' in record:
                vendor_info = await self._get_vendor_information(record['fornecedor'])
                enriched_record['vendor_information'] = vendor_info
            
            # Add geographic context
            if 'municipio' in record or 'uf' in record:
                geo_context = await self._get_geographic_context(record)
                enriched_record['geographic_context'] = geo_context
            
            # Add temporal context
            temporal_context = await self._get_temporal_context(record)
            enriched_record['temporal_context'] = temporal_context
            
            # Add regulatory context
            regulatory_context = await self._get_regulatory_context(record)  
            enriched_record['regulatory_context'] = regulatory_context
            
            enriched_data.append(enriched_record)
        
        return enriched_data
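
    # The loop above awaits each context lookup one at a time. Since the
    # lookups are independent I/O calls, asyncio.gather could overlap them
    # per record. A sketch of the idea, not the project's actual code:
    async def _enrich_record_concurrently(self, record: Dict) -> Dict:
        import asyncio
        enriched = record.copy()
        temporal, regulatory = await asyncio.gather(
            self._get_temporal_context(record),
            self._get_regulatory_context(record),
        )
        enriched["temporal_context"] = temporal
        enriched["regulatory_context"] = regulatory
        return enriched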
    
    async def validate_data_quality(self, data: List[Dict]) -> Dict[str, Any]:
        """
        Comprehensive data quality assessment
        
        Quality Dimensions:
        - Completeness (missing values)
        - Accuracy (format validation)
        - Consistency (cross-field validation)
        - Timeliness (data freshness)
        - Validity (business rule compliance)
        """
        
        quality_report = {
            "total_records": len(data),
            "validation_timestamp": datetime.utcnow(),
            "quality_score": 0.0,
            "issues": [],
            "recommendations": []
        }
        
        # Completeness check
        completeness_score = await self._assess_completeness(data)
        quality_report["completeness"] = completeness_score
        
        # Accuracy check  
        accuracy_score = await self._assess_accuracy(data)
        quality_report["accuracy"] = accuracy_score
        
        # Consistency check
        consistency_score = await self._assess_consistency(data)
        quality_report["consistency"] = consistency_score
        
        # Timeliness check
        timeliness_score = await self._assess_timeliness(data)
        quality_report["timeliness"] = timeliness_score
        
        # Calculate overall quality score
        quality_report["quality_score"] = (
            completeness_score + accuracy_score + 
            consistency_score + timeliness_score
        ) / 4
        
        # Generate recommendations
        quality_report["recommendations"] = await self._generate_quality_recommendations(
            quality_report
        )
        
        return quality_report
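
Each quality dimension reduces to a score between 0 and 1. A minimal sketch of _assess_completeness over an illustrative set of required fields (the real schema's required fields are not shown in this excerpt):

from typing import Dict, List

# Illustrative required fields -- not the project's actual schema
REQUIRED_FIELDS = ["valor", "orgao", "fornecedor"]

async def _assess_completeness(self, data: List[Dict]) -> float:
    """Fraction of required fields populated across all records (sketch)."""
    if not data:
        return 0.0
    filled = sum(
        1
        for record in data
        for field in REQUIRED_FIELDS
        if record.get(field) not in (None, "", [])
    )
    return filled / (len(data) * len(REQUIRED_FIELDS))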

3. NotificationService - Communication & Alerting

Multi-Channel Notification System

class NotificationService:
    """
    Multi-channel notification and alerting service
    
    Channels:
    - Email notifications
    - SMS alerts
    - WebSocket real-time updates
    - Webhook integrations
    - In-app notifications
    - Slack/Teams integration
    """
    
    def __init__(self):
        self.email_client = None              # Email service
        self.sms_client = None               # SMS service  
        self.websocket_manager = None        # Real-time updates
        self.webhook_client = None           # Webhook notifications
        self.notification_templates = {}     # Message templates
        self.subscription_manager = None     # User preferences
    
    async def send_anomaly_alert(
        self,
        anomaly: Dict[str, Any],
        recipients: List[str],
        severity: str = "medium"
    ) -> bool:
        """
        Send anomaly detection alerts across multiple channels
        
        Alert Types:
        - Immediate alerts for critical anomalies
        - Daily digest for medium severity
        - Weekly summary for low severity
        - Real-time dashboard updates
        """
        
        # Generate alert content
        alert_content = await self._generate_anomaly_alert_content(anomaly, severity)
        
        # Determine delivery channels based on severity
        channels = await self._determine_alert_channels(severity)
        
        delivery_results = {}
        
        for channel in channels:
            if channel == "email":
                result = await self._send_email_alert(alert_content, recipients)
                delivery_results["email"] = result
                
            elif channel == "sms" and severity == "critical":
                result = await self._send_sms_alert(alert_content, recipients)
                delivery_results["sms"] = result
                
            elif channel == "websocket":
                result = await self._send_websocket_update(alert_content)
                delivery_results["websocket"] = result
                
            elif channel == "webhook":
                result = await self._send_webhook_notification(alert_content)
                delivery_results["webhook"] = result
        
        # Log notification delivery
        await self._log_notification_delivery(anomaly, delivery_results)
        
        return all(delivery_results.values())
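
    # One plausible severity-to-channel routing for _determine_alert_channels,
    # mirroring the tiers in the docstring above (the exact mapping is an
    # assumption):
    async def _determine_alert_channels(self, severity: str) -> List[str]:
        severity_channels = {
            "critical": ["email", "sms", "websocket", "webhook"],
            "high": ["email", "websocket", "webhook"],
            "medium": ["email", "websocket"],
            "low": ["websocket"],
        }
        return severity_channels.get(severity, ["email"])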
    
    async def send_analysis_report(
        self,
        report: Dict[str, Any],
        recipients: List[str],
        format: str = "html"
    ) -> bool:
        """
        Send formatted analysis reports
        
        Report Formats:
        - HTML email with embedded charts
        - PDF attachment with detailed analysis
        - JSON for API integrations
        - CSV for data analysis tools
        """
        
        # Format report based on requested format
        formatted_report = await self._format_report(report, format)
        
        # Generate report email
        email_content = await self._generate_report_email(formatted_report, format)
        
        # Send email with report
        success = await self._send_email_with_attachment(
            content=email_content,
            recipients=recipients,
            attachment=formatted_report if format == "pdf" else None
        )
        
        return success
    
    async def setup_alert_subscription(
        self,
        user_id: str,
        alert_types: List[str],
        channels: List[str],
        filters: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Configure user alert subscriptions
        
        Subscription Options:
        - Alert types (anomalies, reports, system updates)
        - Delivery channels (email, SMS, webhook)
        - Severity thresholds
        - Content filters
        - Delivery frequency
        """
        
        subscription = {
            "user_id": user_id,
            "alert_types": alert_types,
            "channels": channels,
            "filters": filters or {},
            "created_at": datetime.utcnow(),
            "active": True
        }
        
        # Store subscription preferences
        success = await self.subscription_manager.create_subscription(subscription)
        
        # Send confirmation
        if success:
            await self._send_subscription_confirmation(user_id, subscription)
        
        return success
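
The counterpart of subscribing is matching fired alerts against stored preferences. A hedged sketch of filter matching for the filters shape used above, assuming list filters mean membership and scalar filters mean equality (the helper name _matches_filters is hypothetical):

from typing import Any, Dict

def _matches_filters(alert: Dict[str, Any], filters: Dict[str, Any]) -> bool:
    """True if the alert satisfies every subscription filter (assumed semantics)."""
    for field, allowed in filters.items():
        value = alert.get(field)
        if isinstance(allowed, list):
            if value not in allowed:  # list filter: membership test
                return False
        elif value != allowed:  # scalar filter: exact match
            return False
    return True

# Example: a "critical" alert passes a {"severity": ["high", "critical"]} filter
assert _matches_filters({"severity": "critical"}, {"severity": ["high", "critical"]})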

🔄 Service Integration Patterns

Service Orchestration

class ServiceOrchestrator:
    """
    Central orchestrator for coordinating business services
    
    Responsibilities:
    - Service dependency management
    - Workflow orchestration
    - Error handling and recovery
    - Performance monitoring
    - Resource management
    """
    
    def __init__(self):
        self.analysis_service = AnalysisService()
        self.data_service = DataService()
        self.notification_service = NotificationService()
        
    async def execute_comprehensive_analysis(
        self,
        investigation_request: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Execute end-to-end transparency analysis workflow
        
        Workflow:
        1. Data acquisition and validation
        2. Data enrichment and preprocessing
        3. Multi-dimensional analysis
        4. Anomaly detection
        5. Insight generation
        6. Report creation
        7. Notification delivery
        """
        
        try:
            # Step 1: Acquire and validate data
            raw_data = await self.data_service.fetch_government_data(
                data_type=investigation_request["data_type"],
                filters=investigation_request.get("filters", {})
            )
            
            # Step 2: Enrich data with context
            enriched_data = await self.data_service.enrich_data(raw_data)
            
            # Step 3: Execute analysis
            analysis_results = await self.analysis_service.analyze_spending_patterns(
                enriched_data
            )
            
            # Step 4: Detect anomalies
            anomalies = await self.analysis_service.detect_anomalies(enriched_data)
            
            # Step 5: Generate insights
            insights = await self.analysis_service.generate_insights(enriched_data)
            
            # Step 6: Create comprehensive report
            report = {
                "investigation_id": investigation_request["id"],
                "data_summary": {
                    "total_records": len(enriched_data),
                    "data_quality": await self.data_service.validate_data_quality(enriched_data)
                },
                "analysis_results": analysis_results,
                "anomalies": anomalies,
                "insights": insights,
                "timestamp": datetime.utcnow()
            }
            
            # Step 7: Send notifications if anomalies found
            if anomalies:
                critical_anomalies = [a for a in anomalies if a.get("severity") == "critical"]
                if critical_anomalies:
                    await self.notification_service.send_anomaly_alert(
                        anomaly=critical_anomalies[0],
                        recipients=investigation_request.get("alert_recipients", []),
                        severity="critical"
                    )
            
            return report
            
        except Exception as e:
            # Error handling and notification
            error_report = {
                "investigation_id": investigation_request["id"],
                "status": "error",
                "error_message": str(e),
                "timestamp": datetime.utcnow()
            }
            
            # Send error notification
            await self.notification_service.send_error_notification(
                error_report,
                investigation_request.get("alert_recipients", [])
            )
            
            raise

🧪 Usage Examples

Basic Analysis Service Usage

from src.services.analysis_service import AnalysisService

# Initialize service
analysis_service = AnalysisService()

# Analyze government spending data
contracts_data = await fetch_contracts_from_api()
analysis_results = await analysis_service.analyze_spending_patterns(contracts_data)

print(f"Total analyzed: R$ {analysis_results['total_value']:,.2f}")
print(f"Anomalies found: {len(analysis_results.get('anomalies', []))}")

# Generate insights
insights = await analysis_service.generate_insights(contracts_data)
for insight in insights:
    print(f"💡 {insight}")

# Compare with previous period
previous_data = await fetch_previous_period_data()
comparison = await analysis_service.compare_periods(contracts_data, previous_data)
print(f"Change: {comparison['percentage_change']:.1f}%")

Data Service Integration

from src.services.data_service import DataService

# Initialize data service
data_service = DataService()

# Fetch and enrich government data
raw_data = await data_service.fetch_government_data(
    data_type="contracts",
    filters={"year": 2024, "organization": "20000"}
)

enriched_data = await data_service.enrich_data(raw_data)

# Validate data quality
quality_report = await data_service.validate_data_quality(enriched_data)
print(f"Data quality score: {quality_report['quality_score']:.2f}")

Notification Service Setup

from src.services.notification_service import NotificationService

# Initialize notification service
notification_service = NotificationService()

# Setup alert subscription
await notification_service.setup_alert_subscription(
    user_id="user123",
    alert_types=["anomalies", "critical_findings"],
    channels=["email", "webhook"],
    filters={"severity": ["high", "critical"]}
)

# Send anomaly alert
anomaly = {
    "type": "price_outlier",
    "description": "Contract value 300% above expected range",
    "confidence": 0.95,
    "affected_value": 5000000.00
}

await notification_service.send_anomaly_alert(
    anomaly=anomaly,
    recipients=["[email protected]"],
    severity="critical"
)

Service Orchestration

from src.services import ServiceOrchestrator

# Initialize orchestrator
orchestrator = ServiceOrchestrator()

# Execute comprehensive analysis
investigation_request = {
    "id": "inv_001",
    "data_type": "contracts",
    "filters": {"year": 2024, "organization": "20000"},
    "alert_recipients": ["[email protected]"]
}

report = await orchestrator.execute_comprehensive_analysis(investigation_request)

print(f"Analysis completed for investigation {report['investigation_id']}")
print(f"Found {len(report['anomalies'])} anomalies")
print(f"Generated {len(report['insights'])} insights")

🔧 Configuration & Environment

Service Configuration

# Service configuration (values typically supplied via environment variables)
SERVICE_CONFIG = {
    # Analysis Service
    "ANALYSIS_CACHE_TTL": 3600,
    "ENABLE_ML_ANOMALY_DETECTION": True,
    "ANOMALY_THRESHOLD": 0.8,
    
    # Data Service  
    "DATA_FETCH_TIMEOUT": 30,
    "DATA_CACHE_TTL": 1800,
    "ENABLE_DATA_ENRICHMENT": True,
    
    # Notification Service
    "EMAIL_SMTP_SERVER": "smtp.gmail.com",
    "SMS_API_KEY": "your_sms_api_key",
    "WEBHOOK_TIMEOUT": 10,
    "ENABLE_REAL_TIME_ALERTS": True
}
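
In a deployment these values would typically come from the process environment. A hedged sketch of a loader with type coercion, where the variable names follow the keys above and the helper itself is an assumption:

import os

def _env_bool(name: str, default: bool) -> bool:
    """Parse a boolean env var; '1', 'true', 'yes' (any case) count as true."""
    raw = os.getenv(name)
    return default if raw is None else raw.strip().lower() in ("1", "true", "yes")

SERVICE_CONFIG = {
    "ANALYSIS_CACHE_TTL": int(os.getenv("ANALYSIS_CACHE_TTL", "3600")),
    "ENABLE_ML_ANOMALY_DETECTION": _env_bool("ENABLE_ML_ANOMALY_DETECTION", True),
    "ANOMALY_THRESHOLD": float(os.getenv("ANOMALY_THRESHOLD", "0.8")),
    "DATA_FETCH_TIMEOUT": int(os.getenv("DATA_FETCH_TIMEOUT", "30")),
    "DATA_CACHE_TTL": int(os.getenv("DATA_CACHE_TTL", "1800")),
    "EMAIL_SMTP_SERVER": os.getenv("EMAIL_SMTP_SERVER", "smtp.gmail.com"),
    "SMS_API_KEY": os.getenv("SMS_API_KEY", ""),  # never hard-code real secrets
    "WEBHOOK_TIMEOUT": int(os.getenv("WEBHOOK_TIMEOUT", "10")),
}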

This business services layer orchestrates the platform's transparency analysis end to end: it implements the core business logic, keeps a clean separation of concerns, and exposes high-level abstractions over complex government data processing workflows.