# 🏢 Cidadão.AI Business Services Layer

## 📋 Overview
The Business Services Layer encapsulates the core business logic and domain operations for transparency analysis. This layer orchestrates complex workflows, coordinates between different system components, and provides high-level services that implement the platform's business requirements.
## 🏗️ Architecture

```
src/services/
├── analysis_service.py      # Core data analysis orchestration
├── data_service.py          # Data management and processing
├── notification_service.py  # Communication and alerting
└── __init__.py              # Service layer initialization
```
## 🎯 Core Services

### 1. AnalysisService - Data Analysis Orchestration

#### Comprehensive Analysis Workflows

```python
# Imports assumed by the excerpts throughout this document
from datetime import datetime
from typing import Any, Dict, List


class AnalysisService:
    """
    Central service for orchestrating government data analysis.

    Responsibilities:
    - Coordinate multi-agent analysis workflows
    - Implement business logic for transparency analysis
    - Manage analysis caching and optimization
    - Provide high-level analysis APIs
    - Ensure data quality and validation
    """

    def __init__(self):
        self._analysis_cache = {}       # Result caching
        self.agent_orchestrator = None  # Multi-agent coordinator
        self.ml_pipeline = None         # ML processing pipeline
        self.data_validator = None      # Data quality validation
```
#### Advanced Analysis Methods

```python
async def analyze_spending_patterns(self, data: List[Dict]) -> Dict:
    """
    Comprehensive spending pattern analysis.

    Analysis types:
    - Temporal spending trends
    - Seasonal pattern detection
    - Organizational behavior analysis
    - Vendor concentration analysis
    - Budget execution efficiency
    - Cross-organizational comparisons
    """
    if not data:
        return {"error": "No data provided for analysis"}

    # Return cached results when the same dataset was analyzed before
    cache_key = self._generate_cache_key(data)
    if cache_key in self._analysis_cache:
        return self._analysis_cache[cache_key]

    # Data preprocessing and validation
    validated_data = await self._validate_and_clean_data(data)

    # Multi-dimensional analysis
    analysis_results = {
        # Basic statistics
        "total_items": len(validated_data),
        "total_value": self._calculate_total_value(validated_data),
        "average_value": self._calculate_average_value(validated_data),

        # Temporal analysis
        "temporal_patterns": await self._analyze_temporal_patterns(validated_data),

        # Statistical analysis
        "statistical_summary": await self._generate_statistical_summary(validated_data),

        # Pattern recognition
        "identified_patterns": await self._identify_spending_patterns(validated_data),

        # Risk assessment
        "risk_indicators": await self._assess_risk_indicators(validated_data),

        # Compliance analysis
        "compliance_status": await self._analyze_compliance(validated_data),
    }

    # Cache results for subsequent calls
    self._analysis_cache[cache_key] = analysis_results
    return analysis_results
```
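`_generate_cache_key` is not shown in this excerpt. A minimal sketch, assuming the records are JSON-serializable, is a stable hash of the canonicalized payload so that logically identical datasets always map to the same cache entry:

```python
import hashlib
import json
from typing import Dict, List


def _generate_cache_key(self, data: List[Dict]) -> str:
    # Canonicalize (sorted keys, no whitespace) so logically equal
    # payloads always hash to the same key; default=str handles dates.
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```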
```python
async def detect_anomalies(self, data: List[Dict]) -> List[Dict]:
    """
    Multi-algorithm anomaly detection.

    Detection methods:
    - Statistical outliers (Z-score, IQR)
    - Machine learning-based detection
    - Pattern deviation analysis
    - Cross-reference validation
    - Temporal anomaly detection
    """
    if not data:
        return []

    anomalies = []

    # Statistical anomaly detection
    statistical_anomalies = await self._detect_statistical_anomalies(data)
    anomalies.extend(statistical_anomalies)

    # ML-based anomaly detection (only when a pipeline is configured)
    if self.ml_pipeline:
        ml_anomalies = await self.ml_pipeline.detect_anomalies(data)
        anomalies.extend(ml_anomalies)

    # Pattern-based anomaly detection
    pattern_anomalies = await self._detect_pattern_anomalies(data)
    anomalies.extend(pattern_anomalies)

    # Consolidate duplicates and rank the combined findings
    consolidated_anomalies = await self._consolidate_anomalies(anomalies)
    return consolidated_anomalies
```
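The statistical pass can be sketched with the two classic tests the docstring names, Z-score and IQR. This is illustrative only, assuming each record exposes a numeric `valor` field; the project's real `_detect_statistical_anomalies` may differ:

```python
import statistics
from typing import Dict, List


def detect_statistical_outliers(data: List[Dict], z_threshold: float = 3.0) -> List[Dict]:
    """Flag records whose 'valor' is a Z-score or IQR outlier."""
    values = [float(r.get("valor", 0.0)) for r in data]
    if len(values) < 4:
        return []  # too few points for meaningful statistics

    mean, stdev = statistics.mean(values), statistics.stdev(values)
    q1, _, q3 = statistics.quantiles(values, n=4)  # quartile cut points
    iqr = q3 - q1
    low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr

    anomalies = []
    for record, value in zip(data, values):
        z_score = (value - mean) / stdev if stdev else 0.0
        if abs(z_score) >= z_threshold or not (low <= value <= high):
            anomalies.append({
                "type": "statistical_outlier",
                "record": record,
                "z_score": round(z_score, 2),
            })
    return anomalies
```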
```python
async def generate_insights(self, data: List[Dict]) -> List[str]:
    """
    AI-powered insight generation.

    Insight categories:
    - Spending efficiency insights
    - Risk and compliance insights
    - Trend and pattern insights
    - Comparative insights
    - Actionable recommendations

    Note: user-facing insight strings are in Portuguese, the
    platform's target language.
    """
    if not data:
        # "No data available for analysis"
        return ["Nenhum dado disponível para análise"]

    insights = []

    # Data volume insight: "Analyzed N government data records"
    insights.append(f"Analisados {len(data)} registros de dados governamentais")

    # Value insights: "Total value analyzed" / "Average value per record"
    total_value = self._calculate_total_value(data)
    if total_value > 0:
        insights.append(f"Valor total analisado: R$ {total_value:,.2f}")
        avg_value = total_value / len(data)
        insights.append(f"Valor médio por registro: R$ {avg_value:,.2f}")

    # Temporal insights
    temporal_insights = await self._generate_temporal_insights(data)
    insights.extend(temporal_insights)

    # Pattern insights
    pattern_insights = await self._generate_pattern_insights(data)
    insights.extend(pattern_insights)

    # Risk insights
    risk_insights = await self._generate_risk_insights(data)
    insights.extend(risk_insights)

    # Actionable recommendations
    recommendations = await self._generate_recommendations(data)
    insights.extend(recommendations)

    return insights
```
#### Advanced Comparative Analysis

```python
async def compare_periods(
    self,
    current_data: List[Dict],
    previous_data: List[Dict],
) -> Dict:
    """
    Comprehensive period-over-period comparison.

    Comparison dimensions:
    - Volume changes (number of transactions)
    - Value changes (total and average amounts)
    - Efficiency changes (value per transaction)
    - Pattern changes (temporal, vendor, category)
    - Risk profile changes
    - Compliance trend analysis
    """
    current_analysis = await self.analyze_spending_patterns(current_data)
    previous_analysis = await self.analyze_spending_patterns(previous_data)

    comparison = {
        # Basic metrics comparison
        "volume_comparison": self._compare_volumes(current_data, previous_data),
        "value_comparison": self._compare_values(current_analysis, previous_analysis),
        "efficiency_comparison": self._compare_efficiency(current_analysis, previous_analysis),

        # Advanced comparisons
        "pattern_changes": await self._compare_patterns(current_analysis, previous_analysis),
        "risk_profile_changes": await self._compare_risk_profiles(current_analysis, previous_analysis),
        "compliance_trends": await self._compare_compliance(current_analysis, previous_analysis),

        # Statistical significance
        "statistical_significance": await self._test_statistical_significance(current_data, previous_data),

        # Executive summary
        "executive_summary": await self._generate_comparison_summary(current_analysis, previous_analysis),
    }
    return comparison
```
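`_test_statistical_significance` is left abstract here. One reasonable sketch is Welch's t-test on the per-record values of the two periods, assuming `scipy` is available and records carry a numeric `valor` field (the real implementation may use a different test entirely):

```python
from typing import Dict, List

from scipy import stats  # assumed dependency for this sketch


def test_value_significance(
    current_data: List[Dict],
    previous_data: List[Dict],
    alpha: float = 0.05,
) -> Dict:
    """Welch's t-test on per-record values across two periods."""
    current = [float(r.get("valor", 0.0)) for r in current_data]
    previous = [float(r.get("valor", 0.0)) for r in previous_data]
    if len(current) < 2 or len(previous) < 2:
        return {"significant": False, "reason": "insufficient data"}

    # equal_var=False selects Welch's variant, which tolerates
    # unequal variances between the two periods.
    t_stat, p_value = stats.ttest_ind(current, previous, equal_var=False)
    return {
        "t_statistic": float(t_stat),
        "p_value": float(p_value),
        "significant": p_value < alpha,
    }
```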
```python
async def rank_entities(
    self,
    data: List[Dict],
    by: str = "valor",
    criteria: str = "total",
) -> List[Dict]:
    """
    Multi-criteria entity ranking and analysis.

    Ranking criteria:
    - Total spending volume
    - Average transaction value
    - Transaction frequency
    - Risk score
    - Compliance score
    - Efficiency metrics
    - Anomaly frequency
    """
    if not data:
        return []

    # Group data by entity
    entities = self._group_by_entity(data)

    ranked_entities = []
    for entity_id, entity_data in entities.items():
        entity_metrics = {
            "entity_id": entity_id,
            "entity_name": self._get_entity_name(entity_id),

            # Volume metrics
            "total_transactions": len(entity_data),
            "total_value": self._calculate_total_value(entity_data),
            "average_value": self._calculate_average_value(entity_data),

            # Performance metrics
            "efficiency_score": await self._calculate_efficiency_score(entity_data),
            "compliance_score": await self._calculate_compliance_score(entity_data),
            "risk_score": await self._calculate_risk_score(entity_data),

            # Analysis results
            "anomaly_count": await self._count_anomalies(entity_data),
            "pattern_stability": await self._assess_pattern_stability(entity_data),

            # Derived metrics
            "value_per_transaction": self._calculate_value_per_transaction(entity_data),
            "transaction_frequency": self._calculate_transaction_frequency(entity_data),
        }
        ranked_entities.append(entity_metrics)

    # Sort by the requested criterion (descending)
    if by == "valor":
        ranked_entities.sort(key=lambda x: x["total_value"], reverse=True)
    elif by == "risk":
        ranked_entities.sort(key=lambda x: x["risk_score"], reverse=True)
    elif by == "efficiency":
        ranked_entities.sort(key=lambda x: x["efficiency_score"], reverse=True)
    elif by == "anomalies":
        ranked_entities.sort(key=lambda x: x["anomaly_count"], reverse=True)

    return ranked_entities
```
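Switching the ranking criterion is a one-argument change. An illustrative call, assuming an initialized `analysis_service` and pre-fetched `contracts_data` inside an async context:

```python
# Rank entities by risk score instead of total spending (illustrative)
top_risk_entities = await analysis_service.rank_entities(contracts_data, by="risk")
for entity in top_risk_entities[:5]:
    print(entity["entity_name"], entity["risk_score"])
```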
### 2. DataService - Data Management Operations

#### Comprehensive Data Management

```python
class DataService:
    """
    Central data management service.

    Responsibilities:
    - Data ingestion from multiple sources
    - Data quality validation and cleaning
    - Data transformation and normalization
    - Data persistence and caching
    - Data lifecycle management
    """

    def __init__(self):
        self.transparency_client = None      # External API client
        self.database_manager = None         # Database operations
        self.cache_manager = None            # Caching layer
        self.data_validator = None           # Data quality validation
        self.transformation_pipeline = None  # Data transformation
```
```python
async def fetch_government_data(
    self,
    data_type: str,
    filters: Dict[str, Any] = None,
    cache_ttl: int = 3600,
) -> List[Dict]:
    """
    Fetch data from government transparency APIs.

    Data sources:
    - Portal da Transparência
    - IBGE statistical data
    - TCU audit data
    - CGU oversight data
    - State and municipal portals
    """
    # Check cache first
    cache_key = self._generate_cache_key(data_type, filters)
    cached_data = await self.cache_manager.get(cache_key)
    if cached_data:
        return cached_data

    # Fetch fresh data
    raw_data = await self.transparency_client.fetch_data(data_type, filters)

    # Validate and clean data
    validated_data = await self.data_validator.validate_data(raw_data)

    # Transform to the platform's standard format
    transformed_data = await self.transformation_pipeline.transform(validated_data)

    # Cache results
    await self.cache_manager.set(cache_key, transformed_data, ttl=cache_ttl)

    # Persist to database
    await self.database_manager.store_data(data_type, transformed_data)

    return transformed_data
```
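Government portals are rate-limited and occasionally flaky, so the client call above is typically wrapped in a retry policy. A minimal sketch using the `tenacity` library (an assumption; the repository may handle retries differently):

```python
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, max=10))
async def fetch_with_retries(client, data_type, filters):
    # Retries up to 3 times with exponential backoff, capped at 10s per wait
    return await client.fetch_data(data_type, filters)
```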
```python
async def enrich_data(self, data: List[Dict]) -> List[Dict]:
    """
    Enrich data with additional context and metadata.

    Enrichment sources:
    - Organization metadata
    - Vendor company information
    - Geographic information
    - Legal and regulatory context
    - Historical trends and benchmarks
    """
    enriched_data = []
    for record in data:
        enriched_record = record.copy()

        # Add organization context
        if "orgao" in record:
            org_context = await self._get_organization_context(record["orgao"])
            enriched_record["organization_context"] = org_context

        # Add vendor information
        if "fornecedor" in record:
            vendor_info = await self._get_vendor_information(record["fornecedor"])
            enriched_record["vendor_information"] = vendor_info

        # Add geographic context
        if "municipio" in record or "uf" in record:
            geo_context = await self._get_geographic_context(record)
            enriched_record["geographic_context"] = geo_context

        # Add temporal context
        temporal_context = await self._get_temporal_context(record)
        enriched_record["temporal_context"] = temporal_context

        # Add regulatory context
        regulatory_context = await self._get_regulatory_context(record)
        enriched_record["regulatory_context"] = regulatory_context

        enriched_data.append(enriched_record)

    return enriched_data
```
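As written, records are enriched strictly one at a time: every lookup awaits before the next record starts. For large batches the per-record work can run concurrently with `asyncio.gather`. A sketch, assuming a hypothetical `_enrich_record` helper that wraps the per-record body above:

```python
import asyncio
from typing import Dict, List


async def enrich_data_concurrently(self, data: List[Dict], batch_size: int = 50) -> List[Dict]:
    # Process records in bounded batches so upstream metadata services
    # are not flooded with thousands of parallel calls.
    enriched: List[Dict] = []
    for start in range(0, len(data), batch_size):
        batch = data[start:start + batch_size]
        # _enrich_record is a hypothetical helper holding the per-record logic
        enriched.extend(await asyncio.gather(*(self._enrich_record(r) for r in batch)))
    return enriched
```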
```python
async def validate_data_quality(self, data: List[Dict]) -> Dict[str, Any]:
    """
    Comprehensive data quality assessment.

    Quality dimensions:
    - Completeness (missing values)
    - Accuracy (format validation)
    - Consistency (cross-field validation)
    - Timeliness (data freshness)
    - Validity (business rule compliance)
    """
    quality_report = {
        "total_records": len(data),
        "validation_timestamp": datetime.utcnow(),
        "quality_score": 0.0,
        "issues": [],
        "recommendations": [],
    }

    # Completeness check
    completeness_score = await self._assess_completeness(data)
    quality_report["completeness"] = completeness_score

    # Accuracy check
    accuracy_score = await self._assess_accuracy(data)
    quality_report["accuracy"] = accuracy_score

    # Consistency check
    consistency_score = await self._assess_consistency(data)
    quality_report["consistency"] = consistency_score

    # Timeliness check
    timeliness_score = await self._assess_timeliness(data)
    quality_report["timeliness"] = timeliness_score

    # Overall quality score: unweighted mean of the four dimensions
    quality_report["quality_score"] = (
        completeness_score + accuracy_score +
        consistency_score + timeliness_score
    ) / 4

    # Generate recommendations
    quality_report["recommendations"] = await self._generate_quality_recommendations(
        quality_report
    )

    return quality_report
```
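`_assess_completeness` is left abstract here. A minimal sketch that scores the fraction of populated required fields; the `REQUIRED_FIELDS` list is illustrative, the real schema lives in the platform's data models:

```python
from typing import Dict, List

# Illustrative required fields for a contract record
REQUIRED_FIELDS = ["orgao", "fornecedor", "valor", "data_assinatura"]


def assess_completeness(data: List[Dict]) -> float:
    """Fraction of required fields that are present and non-empty, in [0, 1]."""
    if not data:
        return 0.0
    total_cells = len(data) * len(REQUIRED_FIELDS)
    filled = sum(
        1
        for record in data
        for field in REQUIRED_FIELDS
        if record.get(field) not in (None, "", [])
    )
    return filled / total_cells
```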
### 3. NotificationService - Communication & Alerting

#### Multi-Channel Notification System

```python
class NotificationService:
    """
    Multi-channel notification and alerting service.

    Channels:
    - Email notifications
    - SMS alerts
    - WebSocket real-time updates
    - Webhook integrations
    - In-app notifications
    - Slack/Teams integration
    """

    def __init__(self):
        self.email_client = None          # Email service
        self.sms_client = None            # SMS service
        self.websocket_manager = None     # Real-time updates
        self.webhook_client = None        # Webhook notifications
        self.notification_templates = {}  # Message templates
        self.subscription_manager = None  # User preferences
```
```python
async def send_anomaly_alert(
    self,
    anomaly: Dict[str, Any],
    recipients: List[str],
    severity: str = "medium",
) -> bool:
    """
    Send anomaly detection alerts across multiple channels.

    Alert types:
    - Immediate alerts for critical anomalies
    - Daily digest for medium severity
    - Weekly summary for low severity
    - Real-time dashboard updates
    """
    # Generate alert content
    alert_content = await self._generate_anomaly_alert_content(anomaly, severity)

    # Determine delivery channels based on severity
    channels = await self._determine_alert_channels(severity)

    delivery_results = {}
    for channel in channels:
        if channel == "email":
            delivery_results["email"] = await self._send_email_alert(alert_content, recipients)
        elif channel == "sms" and severity == "critical":
            delivery_results["sms"] = await self._send_sms_alert(alert_content, recipients)
        elif channel == "websocket":
            delivery_results["websocket"] = await self._send_websocket_update(alert_content)
        elif channel == "webhook":
            delivery_results["webhook"] = await self._send_webhook_notification(alert_content)

    # Log notification delivery
    await self._log_notification_delivery(anomaly, delivery_results)

    # True only if every attempted channel succeeded
    return all(delivery_results.values())
```
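The severity-to-channel routing is delegated to `_determine_alert_channels`. One plausible mapping, consistent with the docstring above (illustrative, not the repository's actual policy):

```python
from typing import List

# Illustrative severity routing: critical anomalies hit every channel,
# lower severities progressively fewer.
SEVERITY_CHANNELS = {
    "critical": ["email", "sms", "websocket", "webhook"],
    "high":     ["email", "websocket", "webhook"],
    "medium":   ["email", "websocket"],
    "low":      ["websocket"],
}


def determine_alert_channels(severity: str) -> List[str]:
    return SEVERITY_CHANNELS.get(severity, ["email"])
```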
```python
async def send_analysis_report(
    self,
    report: Dict[str, Any],
    recipients: List[str],
    format: str = "html",
) -> bool:
    """
    Send formatted analysis reports.

    Report formats:
    - HTML email with embedded charts
    - PDF attachment with detailed analysis
    - JSON for API integrations
    - CSV for data analysis tools
    """
    # Format report based on the requested format
    formatted_report = await self._format_report(report, format)

    # Generate report email
    email_content = await self._generate_report_email(formatted_report, format)

    # Send email, attaching the report only for PDF delivery
    success = await self._send_email_with_attachment(
        content=email_content,
        recipients=recipients,
        attachment=formatted_report if format == "pdf" else None,
    )
    return success
```
```python
async def setup_alert_subscription(
    self,
    user_id: str,
    alert_types: List[str],
    channels: List[str],
    filters: Dict[str, Any] = None,
) -> bool:
    """
    Configure user alert subscriptions.

    Subscription options:
    - Alert types (anomalies, reports, system updates)
    - Delivery channels (email, SMS, webhook)
    - Severity thresholds
    - Content filters
    - Delivery frequency
    """
    subscription = {
        "user_id": user_id,
        "alert_types": alert_types,
        "channels": channels,
        "filters": filters or {},
        "created_at": datetime.utcnow(),
        "active": True,
    }

    # Store subscription preferences
    success = await self.subscription_manager.create_subscription(subscription)

    # Confirm to the user only if the subscription was stored
    if success:
        await self._send_subscription_confirmation(user_id, subscription)

    return success
```
## 🔄 Service Integration Patterns

### Service Orchestration

```python
class ServiceOrchestrator:
    """
    Central orchestrator for coordinating business services.

    Responsibilities:
    - Service dependency management
    - Workflow orchestration
    - Error handling and recovery
    - Performance monitoring
    - Resource management
    """

    def __init__(self):
        self.analysis_service = AnalysisService()
        self.data_service = DataService()
        self.notification_service = NotificationService()
```
```python
async def execute_comprehensive_analysis(
    self,
    investigation_request: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Execute the end-to-end transparency analysis workflow.

    Workflow:
    1. Data acquisition and validation
    2. Data enrichment and preprocessing
    3. Multi-dimensional analysis
    4. Anomaly detection
    5. Insight generation
    6. Report creation
    7. Notification delivery
    """
    try:
        # Step 1: Acquire and validate data
        raw_data = await self.data_service.fetch_government_data(
            data_type=investigation_request["data_type"],
            filters=investigation_request.get("filters", {}),
        )

        # Step 2: Enrich data with context
        enriched_data = await self.data_service.enrich_data(raw_data)

        # Step 3: Execute analysis
        analysis_results = await self.analysis_service.analyze_spending_patterns(
            enriched_data
        )

        # Step 4: Detect anomalies
        anomalies = await self.analysis_service.detect_anomalies(enriched_data)

        # Step 5: Generate insights
        insights = await self.analysis_service.generate_insights(enriched_data)

        # Step 6: Create comprehensive report
        report = {
            "investigation_id": investigation_request["id"],
            "data_summary": {
                "total_records": len(enriched_data),
                "data_quality": await self.data_service.validate_data_quality(enriched_data),
            },
            "analysis_results": analysis_results,
            "anomalies": anomalies,
            "insights": insights,
            "timestamp": datetime.utcnow(),
        }

        # Step 7: Alert on critical anomalies, if any were found
        if anomalies:
            critical_anomalies = [a for a in anomalies if a.get("severity") == "critical"]
            if critical_anomalies:
                await self.notification_service.send_anomaly_alert(
                    anomaly=critical_anomalies[0],
                    recipients=investigation_request.get("alert_recipients", []),
                    severity="critical",
                )

        return report

    except Exception as e:
        # Build an error report, notify subscribers, and re-raise
        error_report = {
            "investigation_id": investigation_request["id"],
            "status": "error",
            "error_message": str(e),
            "timestamp": datetime.utcnow(),
        }
        await self.notification_service.send_error_notification(
            error_report,
            investigation_request.get("alert_recipients", []),
        )
        raise
```
## 🧪 Usage Examples

### Basic Analysis Service Usage

```python
from src.services.analysis_service import AnalysisService

# Initialize service
analysis_service = AnalysisService()

# Analyze government spending data
contracts_data = await fetch_contracts_from_api()
analysis_results = await analysis_service.analyze_spending_patterns(contracts_data)

print(f"Total analyzed: R$ {analysis_results['total_value']:,.2f}")
print(f"Records analyzed: {analysis_results['total_items']}")

# Detect anomalies (a separate call from pattern analysis)
anomalies = await analysis_service.detect_anomalies(contracts_data)
print(f"Anomalies found: {len(anomalies)}")

# Generate insights
insights = await analysis_service.generate_insights(contracts_data)
for insight in insights:
    print(f"💡 {insight}")

# Compare with the previous period
previous_data = await fetch_previous_period_data()
comparison = await analysis_service.compare_periods(contracts_data, previous_data)
print(comparison["executive_summary"])
```
### Data Service Integration

```python
from src.services.data_service import DataService

# Initialize data service
data_service = DataService()

# Fetch and enrich government data
raw_data = await data_service.fetch_government_data(
    data_type="contracts",
    filters={"year": 2024, "organization": "20000"},
)
enriched_data = await data_service.enrich_data(raw_data)

# Validate data quality
quality_report = await data_service.validate_data_quality(enriched_data)
print(f"Data quality score: {quality_report['quality_score']:.2f}")
```
### Notification Service Setup

```python
from src.services.notification_service import NotificationService

# Initialize notification service
notification_service = NotificationService()

# Set up an alert subscription
await notification_service.setup_alert_subscription(
    user_id="user123",
    alert_types=["anomalies", "critical_findings"],
    channels=["email", "webhook"],
    filters={"severity": ["high", "critical"]},
)

# Send an anomaly alert
anomaly = {
    "type": "price_outlier",
    "description": "Contract value 300% above expected range",
    "confidence": 0.95,
    "affected_value": 5000000.00,
}

await notification_service.send_anomaly_alert(
    anomaly=anomaly,
    recipients=["[email protected]"],
    severity="critical",
)
```
### End-to-End Orchestration

```python
from src.services import ServiceOrchestrator

# Initialize orchestrator
orchestrator = ServiceOrchestrator()

# Execute a comprehensive analysis
investigation_request = {
    "id": "inv_001",
    "data_type": "contracts",
    "filters": {"year": 2024, "organization": "20000"},
    "alert_recipients": ["[email protected]"],
}

report = await orchestrator.execute_comprehensive_analysis(investigation_request)

print(f"Analysis completed for investigation {report['investigation_id']}")
print(f"Found {len(report['anomalies'])} anomalies")
print(f"Generated {len(report['insights'])} insights")
```
🔧 Configuration & Environment
Service Configuration
# Environment variables for service configuration
SERVICE_CONFIG = {
# Analysis Service
"ANALYSIS_CACHE_TTL": 3600,
"ENABLE_ML_ANOMALY_DETECTION": True,
"ANOMALY_THRESHOLD": 0.8,
# Data Service
"DATA_FETCH_TIMEOUT": 30,
"DATA_CACHE_TTL": 1800,
"ENABLE_DATA_ENRICHMENT": True,
# Notification Service
"EMAIL_SMTP_SERVER": "smtp.gmail.com",
"SMS_API_KEY": "your_sms_api_key",
"WEBHOOK_TIMEOUT": 10,
"ENABLE_REAL_TIME_ALERTS": True
}
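A minimal sketch of overlaying these defaults with values from the process environment (assuming variables are exported as plain strings, e.g. `export ANALYSIS_CACHE_TTL=7200`; the project may instead use a settings library):

```python
import os


def load_config(defaults: dict) -> dict:
    """Overlay environment variables onto the defaults, coercing types."""
    config = {}
    for key, default in defaults.items():
        raw = os.getenv(key)
        if raw is None:
            config[key] = default
        elif isinstance(default, bool):  # must check bool before int
            config[key] = raw.lower() in ("1", "true", "yes")
        elif isinstance(default, int):
            config[key] = int(raw)
        elif isinstance(default, float):
            config[key] = float(raw)
        else:
            config[key] = raw
    return config


config = load_config(SERVICE_CONFIG)  # SERVICE_CONFIG from the block above
```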
The business services layer thus orchestrates transparency analysis end to end: acquiring and enriching government data, running multi-dimensional analysis and anomaly detection, and delivering reports and alerts, all behind high-level service APIs with a clean separation of concerns.