anderson-ufrj commited on
Commit
3c8ce93
·
1 Parent(s): 36a83c3

feat(services): add Supabase-backed investigation service

Browse files

Implement InvestigationServiceSupabase adapter that uses Supabase
as persistent storage backend for investigation lifecycle management.

Features:
- Investigation CRUD operations via Supabase
- Progress tracking with real-time updates
- Agent system integration
- Multi-agent investigation execution
- Lazy initialization pattern
- Complete investigation lifecycle support

src/services/investigation_service_supabase.py ADDED
@@ -0,0 +1,451 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Investigation service with Supabase backend integration.
3
+
4
+ This version replaces the in-memory storage with Supabase PostgreSQL,
5
+ providing persistent storage shared with the frontend.
6
+ """
7
+
8
+ from typing import List, Optional, Dict, Any
9
+ from datetime import datetime
10
+ import uuid
11
+
12
+ from src.core import get_logger
13
+ from src.services.supabase_service import get_supabase_service
14
+ from src.agents import MasterAgent, get_agent_pool
15
+ from src.agents.deodoro import AgentContext
16
+
17
+ logger = get_logger(__name__)
18
+
19
+
20
+ class InvestigationServiceSupabase:
21
+ """
22
+ Service for managing investigations with Supabase persistence.
23
+
24
+ This service provides a bridge between the agent system and Supabase,
25
+ allowing the frontend to access investigation results in real-time.
26
+ """
27
+
28
+ def __init__(self):
29
+ """Initialize investigation service."""
30
+ self._supabase = None
31
+
32
+ async def _get_supabase(self):
33
+ """Lazy load Supabase service."""
34
+ if self._supabase is None:
35
+ self._supabase = await get_supabase_service()
36
+ return self._supabase
37
+
38
+ async def create(
39
+ self,
40
+ user_id: str,
41
+ query: str,
42
+ data_source: str = "contracts",
43
+ filters: Optional[Dict[str, Any]] = None,
44
+ anomaly_types: Optional[List[str]] = None,
45
+ session_id: Optional[str] = None,
46
+ ) -> Dict[str, Any]:
47
+ """
48
+ Create a new investigation in Supabase.
49
+
50
+ Args:
51
+ user_id: User ID
52
+ query: Investigation query
53
+ data_source: Data source to investigate
54
+ filters: Query filters
55
+ anomaly_types: Types of anomalies to detect
56
+ session_id: Optional session ID
57
+
58
+ Returns:
59
+ Created investigation dict
60
+ """
61
+ supabase = await self._get_supabase()
62
+
63
+ investigation = await supabase.create_investigation(
64
+ user_id=user_id,
65
+ query=query,
66
+ data_source=data_source,
67
+ filters=filters or {},
68
+ anomaly_types=anomaly_types or [],
69
+ session_id=session_id,
70
+ )
71
+
72
+ logger.info(
73
+ "investigation_created",
74
+ investigation_id=investigation["id"],
75
+ user_id=user_id,
76
+ data_source=data_source,
77
+ )
78
+
79
+ return investigation
80
+
81
+ async def start_investigation(
82
+ self,
83
+ investigation_id: str,
84
+ ) -> None:
85
+ """
86
+ Start processing an investigation in the background.
87
+
88
+ Args:
89
+ investigation_id: Investigation UUID
90
+ """
91
+ supabase = await self._get_supabase()
92
+
93
+ # Get investigation details
94
+ investigation = await supabase.get_investigation(investigation_id)
95
+
96
+ if not investigation:
97
+ raise ValueError(f"Investigation {investigation_id} not found")
98
+
99
+ # Update to processing status
100
+ await supabase.update_investigation(
101
+ investigation_id,
102
+ status="processing",
103
+ started_at=datetime.utcnow(),
104
+ progress=0.1,
105
+ current_phase="initializing",
106
+ )
107
+
108
+ try:
109
+ # Execute investigation with agents
110
+ await self._execute_investigation(investigation)
111
+
112
+ except Exception as e:
113
+ logger.error(
114
+ "investigation_execution_failed",
115
+ investigation_id=investigation_id,
116
+ error=str(e),
117
+ exc_info=True,
118
+ )
119
+
120
+ # Mark as failed in Supabase
121
+ await supabase.fail_investigation(
122
+ investigation_id,
123
+ error_message=str(e),
124
+ )
125
+ raise
126
+
127
+ async def _execute_investigation(self, investigation: Dict[str, Any]):
128
+ """
129
+ Execute investigation using the agent system.
130
+
131
+ Args:
132
+ investigation: Investigation dict from database
133
+ """
134
+ investigation_id = investigation["id"]
135
+ supabase = await self._get_supabase()
136
+
137
+ # Update progress: data retrieval
138
+ await supabase.update_progress(
139
+ investigation_id,
140
+ progress=0.2,
141
+ current_phase="data_retrieval",
142
+ )
143
+
144
+ # Get agent pool
145
+ pool = await get_agent_pool()
146
+
147
+ # Create agent context
148
+ context = AgentContext(
149
+ investigation_id=investigation_id,
150
+ user_id=investigation["user_id"],
151
+ session_id=investigation.get("session_id"),
152
+ metadata={
153
+ "data_source": investigation["data_source"],
154
+ "filters": investigation.get("filters", {}),
155
+ "anomaly_types": investigation.get("anomaly_types", []),
156
+ }
157
+ )
158
+
159
+ # Update progress: anomaly detection
160
+ await supabase.update_progress(
161
+ investigation_id,
162
+ progress=0.4,
163
+ current_phase="anomaly_detection",
164
+ )
165
+
166
+ # Execute with master agent (or directly with investigator)
167
+ # For now, we'll simulate the investigation
168
+ # TODO: Replace with actual agent execution
169
+
170
+ from src.agents import InvestigatorAgent
171
+ investigator = InvestigatorAgent()
172
+
173
+ # Prepare investigation parameters
174
+ from src.tools import TransparencyAPIFilter
175
+ filters = TransparencyAPIFilter(**investigation.get("filters", {}))
176
+
177
+ # Execute investigation
178
+ results = await investigator.investigate_anomalies(
179
+ query=investigation["query"],
180
+ data_source=investigation["data_source"],
181
+ filters=filters,
182
+ anomaly_types=investigation.get("anomaly_types", []),
183
+ context=context,
184
+ )
185
+
186
+ # Update progress: analysis
187
+ await supabase.update_progress(
188
+ investigation_id,
189
+ progress=0.7,
190
+ current_phase="analysis",
191
+ records_processed=sum(len(r.affected_data) for r in results),
192
+ anomalies_found=len(results),
193
+ )
194
+
195
+ # Generate summary
196
+ summary = await investigator.generate_summary(results, context)
197
+
198
+ # Calculate confidence
199
+ confidence_score = (
200
+ sum(r.confidence for r in results) / len(results)
201
+ if results else 0.0
202
+ )
203
+
204
+ # Format results for storage
205
+ formatted_results = [
206
+ {
207
+ "anomaly_id": str(uuid.uuid4()),
208
+ "type": result.anomaly_type,
209
+ "severity": result.severity,
210
+ "confidence": result.confidence,
211
+ "description": result.description,
212
+ "explanation": result.explanation,
213
+ "affected_records": result.affected_data,
214
+ "suggested_actions": result.recommendations,
215
+ "metadata": result.metadata,
216
+ }
217
+ for result in results
218
+ ]
219
+
220
+ # Complete investigation in Supabase
221
+ await supabase.complete_investigation(
222
+ investigation_id=investigation_id,
223
+ results=formatted_results,
224
+ summary=summary,
225
+ confidence_score=confidence_score,
226
+ total_records=sum(len(r.affected_data) for r in results),
227
+ anomalies_found=len(results),
228
+ )
229
+
230
+ logger.info(
231
+ "investigation_completed",
232
+ investigation_id=investigation_id,
233
+ anomalies_found=len(results),
234
+ confidence_score=confidence_score,
235
+ )
236
+
237
+ async def update_progress(
238
+ self,
239
+ investigation_id: str,
240
+ progress: float,
241
+ current_phase: str,
242
+ records_processed: Optional[int] = None,
243
+ anomalies_found: Optional[int] = None,
244
+ ) -> Dict[str, Any]:
245
+ """
246
+ Update investigation progress.
247
+
248
+ Args:
249
+ investigation_id: Investigation UUID
250
+ progress: Progress percentage (0.0 to 1.0)
251
+ current_phase: Current processing phase
252
+ records_processed: Number of records processed
253
+ anomalies_found: Number of anomalies detected
254
+
255
+ Returns:
256
+ Updated investigation dict
257
+ """
258
+ supabase = await self._get_supabase()
259
+ return await supabase.update_progress(
260
+ investigation_id=investigation_id,
261
+ progress=progress,
262
+ current_phase=current_phase,
263
+ records_processed=records_processed,
264
+ anomalies_found=anomalies_found,
265
+ )
266
+
267
+ async def complete_investigation(
268
+ self,
269
+ investigation_id: str,
270
+ results: List[Dict[str, Any]],
271
+ summary: str,
272
+ confidence_score: float,
273
+ total_records: int = 0,
274
+ anomalies_found: int = 0,
275
+ ) -> Dict[str, Any]:
276
+ """
277
+ Mark investigation as completed with results.
278
+
279
+ Args:
280
+ investigation_id: Investigation UUID
281
+ results: List of anomaly results
282
+ summary: Investigation summary
283
+ confidence_score: Overall confidence
284
+ total_records: Total records analyzed
285
+ anomalies_found: Total anomalies found
286
+
287
+ Returns:
288
+ Updated investigation dict
289
+ """
290
+ supabase = await self._get_supabase()
291
+ return await supabase.complete_investigation(
292
+ investigation_id=investigation_id,
293
+ results=results,
294
+ summary=summary,
295
+ confidence_score=confidence_score,
296
+ total_records=total_records,
297
+ anomalies_found=anomalies_found,
298
+ )
299
+
300
+ async def get(self, investigation_id: str) -> Optional[Dict[str, Any]]:
301
+ """
302
+ Get investigation by ID (alias for get_by_id).
303
+
304
+ Args:
305
+ investigation_id: Investigation UUID
306
+
307
+ Returns:
308
+ Investigation dict or None
309
+ """
310
+ return await self.get_by_id(investigation_id)
311
+
312
+ async def get_by_id(self, investigation_id: str) -> Optional[Dict[str, Any]]:
313
+ """
314
+ Get investigation by ID.
315
+
316
+ Args:
317
+ investigation_id: Investigation UUID
318
+
319
+ Returns:
320
+ Investigation dict or None
321
+ """
322
+ supabase = await self._get_supabase()
323
+ return await supabase.get_investigation(investigation_id)
324
+
325
+ async def update_status(
326
+ self,
327
+ investigation_id: str,
328
+ status: str,
329
+ progress: Optional[float] = None,
330
+ current_phase: Optional[str] = None,
331
+ **kwargs
332
+ ) -> Dict[str, Any]:
333
+ """
334
+ Update investigation status and progress.
335
+
336
+ Args:
337
+ investigation_id: Investigation UUID
338
+ status: New status
339
+ progress: Progress percentage
340
+ current_phase: Current phase
341
+ **kwargs: Additional fields to update
342
+
343
+ Returns:
344
+ Updated investigation dict
345
+ """
346
+ supabase = await self._get_supabase()
347
+
348
+ updates = {"status": status}
349
+
350
+ if progress is not None:
351
+ updates["progress"] = progress
352
+
353
+ if current_phase is not None:
354
+ updates["current_phase"] = current_phase
355
+
356
+ updates.update(kwargs)
357
+
358
+ return await supabase.update_investigation(investigation_id, **updates)
359
+
360
+ async def search(
361
+ self,
362
+ user_id: Optional[str] = None,
363
+ status: Optional[str] = None,
364
+ limit: int = 20,
365
+ offset: int = 0,
366
+ ) -> List[Dict[str, Any]]:
367
+ """
368
+ Search investigations with filters.
369
+
370
+ Args:
371
+ user_id: Filter by user ID
372
+ status: Filter by status
373
+ limit: Maximum results
374
+ offset: Pagination offset
375
+
376
+ Returns:
377
+ List of investigation dicts
378
+ """
379
+ if not user_id:
380
+ raise ValueError("user_id is required for investigation search")
381
+
382
+ supabase = await self._get_supabase()
383
+ return await supabase.list_user_investigations(
384
+ user_id=user_id,
385
+ limit=limit,
386
+ offset=offset,
387
+ status=status,
388
+ )
389
+
390
+ async def cancel(self, investigation_id: str, user_id: str) -> Dict[str, Any]:
391
+ """
392
+ Cancel a running investigation.
393
+
394
+ Args:
395
+ investigation_id: Investigation UUID
396
+ user_id: User ID (for authorization)
397
+
398
+ Returns:
399
+ Updated investigation dict
400
+ """
401
+ supabase = await self._get_supabase()
402
+
403
+ # Get investigation to check ownership
404
+ investigation = await supabase.get_investigation(investigation_id)
405
+
406
+ if not investigation:
407
+ raise ValueError(f"Investigation {investigation_id} not found")
408
+
409
+ if investigation["user_id"] != user_id:
410
+ raise ValueError("Unauthorized: investigation belongs to another user")
411
+
412
+ if investigation["status"] in ["completed", "failed", "cancelled"]:
413
+ raise ValueError(
414
+ f"Cannot cancel investigation in {investigation['status']} status"
415
+ )
416
+
417
+ # Mark as cancelled
418
+ deleted = await supabase.delete_investigation(investigation_id, user_id)
419
+
420
+ if not deleted:
421
+ raise ValueError(f"Failed to cancel investigation {investigation_id}")
422
+
423
+ logger.info(
424
+ "investigation_cancelled",
425
+ investigation_id=investigation_id,
426
+ user_id=user_id,
427
+ )
428
+
429
+ # Return updated investigation
430
+ return await supabase.get_investigation(investigation_id)
431
+
432
+ async def get_user_investigations(
433
+ self,
434
+ user_id: str,
435
+ limit: int = 10
436
+ ) -> List[Dict[str, Any]]:
437
+ """
438
+ Get investigations for a user.
439
+
440
+ Args:
441
+ user_id: User ID
442
+ limit: Maximum results
443
+
444
+ Returns:
445
+ List of investigation dicts
446
+ """
447
+ return await self.search(user_id=user_id, limit=limit)
448
+
449
+
450
+ # Global service instance
451
+ investigation_service_supabase = InvestigationServiceSupabase()