File size: 5,689 Bytes
86e7db6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
Job management for tracking async tasks
"""

import time
import threading
import logging
from typing import Dict, Optional, Any

logger = logging.getLogger(__name__)

class JobManager:
    """Manages background job tracking and progress"""
    
    def __init__(self):
        self.active_jobs: Dict[str, Dict] = {}
        self.job_progress: Dict[str, Dict] = {}
        self.job_results: Dict[str, Dict] = {}
        self.jobs_lock = threading.Lock()
        self.progress_lock = threading.Lock()
        self.results_lock = threading.Lock()
        
        # Start cleanup task
        self._start_cleanup_task()
    
    def register_job(self, job_id: str):
        """Register a new job"""
        with self.jobs_lock:
            self.active_jobs[job_id] = {
                'cancelled': False,
                'created_at': time.time(),
                'status': 'active'
            }
        
        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'starting',
                'progress': 0,
                'message': 'Job started...',
                'timestamp': time.time()
            }
        
        logger.info(f"πŸ“ Job registered: {job_id}")
    
    def is_job_cancelled(self, job_id: str) -> bool:
        """Check if a job has been cancelled"""
        with self.jobs_lock:
            return self.active_jobs.get(job_id, {}).get('cancelled', False)
    
    def cancel_job(self, job_id: str) -> bool:
        """Cancel a job"""
        with self.jobs_lock:
            if job_id in self.active_jobs:
                self.active_jobs[job_id]['cancelled'] = True
                self.active_jobs[job_id]['status'] = 'cancelled'
                logger.info(f"❌ Job cancelled: {job_id}")
                return True
        return False
    
    def update_job_progress(self, job_id: str, stage: str, progress: int, message: str):
        """Update job progress"""
        with self.progress_lock:
            if job_id in self.job_progress:
                self.job_progress[job_id] = {
                    'stage': stage,
                    'progress': progress,
                    'message': message,
                    'timestamp': time.time()
                }
                logger.info(f"πŸ“Š Job {job_id}: {stage} - {progress}% - {message}")
    
    def get_job_progress(self, job_id: str) -> Optional[Dict]:
        """Get current job progress"""
        with self.progress_lock:
            return self.job_progress.get(job_id)
    
    def complete_job(self, job_id: str, results: Dict[str, Any]):
        """Mark job as completed with results"""
        with self.jobs_lock:
            if job_id in self.active_jobs:
                self.active_jobs[job_id]['status'] = 'completed'
        
        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'completed',
                'progress': 100,
                'message': 'Job completed successfully!',
                'timestamp': time.time()
            }
        
        with self.results_lock:
            self.job_results[job_id] = {
                **results,
                'completed_at': time.time()
            }
        
        logger.info(f"βœ… Job completed: {job_id}")
    
    def fail_job(self, job_id: str, error_message: str):
        """Mark job as failed"""
        with self.jobs_lock:
            if job_id in self.active_jobs:
                self.active_jobs[job_id]['status'] = 'failed'
        
        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'error',
                'progress': 0,
                'message': f'Error: {error_message}',
                'timestamp': time.time()
            }
        
        logger.error(f"❌ Job failed: {job_id} - {error_message}")
    
    def get_job_results(self, job_id: str) -> Optional[Dict]:
        """Get job results if completed"""
        with self.results_lock:
            return self.job_results.get(job_id)
    
    def get_active_job_count(self) -> int:
        """Get number of active jobs"""
        with self.jobs_lock:
            return len([j for j in self.active_jobs.values() if j['status'] == 'active'])
    
    def cleanup_old_jobs(self):
        """Clean up jobs older than 30 minutes"""
        current_time = time.time()
        cleanup_age = 1800  # 30 minutes
        
        jobs_to_remove = []
        
        with self.jobs_lock:
            for job_id, job_data in self.active_jobs.items():
                if current_time - job_data['created_at'] > cleanup_age:
                    jobs_to_remove.append(job_id)
        
        for job_id in jobs_to_remove:
            self._remove_job(job_id)
            logger.info(f"🧹 Cleaned up old job: {job_id}")
    
    def _remove_job(self, job_id: str):
        """Remove job from all tracking dictionaries"""
        with self.jobs_lock:
            self.active_jobs.pop(job_id, None)
        
        with self.progress_lock:
            self.job_progress.pop(job_id, None)
        
        with self.results_lock:
            self.job_results.pop(job_id, None)
    
    def _start_cleanup_task(self):
        """Start background cleanup task"""
        def cleanup_worker():
            while True:
                time.sleep(300)  # Run every 5 minutes
                try:
                    self.cleanup_old_jobs()
                except Exception as e:
                    logger.error(f"❌ Error in cleanup task: {str(e)}")
        
        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        logger.info("🧹 Cleanup task started")