# File-viewer page residue (not part of the module): Spaces status "Sleeping"; file size 5,689 bytes; ref 86e7db6; line-number gutter 1-164 omitted.
"""
Job management for tracking async tasks
"""
import time
import threading
import logging
from typing import Dict, Optional, Any
logger = logging.getLogger(__name__)


class JobManager:
    """Track background jobs: cancellation flags, progress snapshots and results.

    State lives in three dictionaries keyed by job id, each guarded by its own
    lock. No method holds more than one of these locks at a time. The
    constructor starts a daemon thread that periodically calls
    ``cleanup_old_jobs`` to evict old entries.
    """

    # Entries older than this many seconds are evicted by cleanup_old_jobs().
    CLEANUP_AGE_SECONDS = 1800  # 30 minutes
    # Pause, in seconds, between two runs of the background cleanup worker.
    CLEANUP_INTERVAL_SECONDS = 300  # 5 minutes

    def __init__(self):
        """Create empty job tables and start the background cleanup thread."""
        # job_id -> {'cancelled': bool, 'created_at': float, 'status': str}
        self.active_jobs: Dict[str, Dict] = {}
        # job_id -> {'stage': str, 'progress': int, 'message': str, 'timestamp': float}
        self.job_progress: Dict[str, Dict] = {}
        # job_id -> caller-supplied results plus a 'completed_at' timestamp
        self.job_results: Dict[str, Dict] = {}

        self.jobs_lock = threading.Lock()
        self.progress_lock = threading.Lock()
        self.results_lock = threading.Lock()

        # Start cleanup task
        self._start_cleanup_task()

    def register_job(self, job_id: str):
        """Register a new job with status ``'active'`` and an initial progress entry.

        Re-registering an existing id overwrites its job and progress entries.

        Args:
            job_id: Identifier under which the job is tracked.
        """
        with self.jobs_lock:
            self.active_jobs[job_id] = {
                'cancelled': False,
                'created_at': time.time(),
                'status': 'active',
            }

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'starting',
                'progress': 0,
                'message': 'Job started...',
                'timestamp': time.time(),
            }

        logger.info("📝 Job registered: %s", job_id)

    def is_job_cancelled(self, job_id: str) -> bool:
        """Return the job's ``'cancelled'`` flag, or ``False`` for an unknown id."""
        with self.jobs_lock:
            return self.active_jobs.get(job_id, {}).get('cancelled', False)

    def cancel_job(self, job_id: str) -> bool:
        """Flag a registered job as cancelled.

        Args:
            job_id: Identifier of the job to cancel.

        Returns:
            ``True`` if the job was registered and is now flagged, ``False``
            if the id is unknown.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is None:
                return False
            job['cancelled'] = True
            job['status'] = 'cancelled'

        # Log after releasing the lock so logging I/O never blocks other callers.
        logger.info("❌ Job cancelled: %s", job_id)
        return True

    def update_job_progress(self, job_id: str, stage: str, progress: int, message: str):
        """Replace the progress snapshot of a job that already has one.

        Ids without a progress entry are ignored, so an update cannot
        re-create an entry that cleanup has evicted.

        Args:
            job_id: Identifier of the job.
            stage: Name of the current stage.
            progress: Progress value; it is logged as a percentage.
            message: Human-readable status text.
        """
        with self.progress_lock:
            if job_id in self.job_progress:
                self.job_progress[job_id] = {
                    'stage': stage,
                    'progress': progress,
                    'message': message,
                    'timestamp': time.time(),
                }

        logger.info("📊 Job %s: %s - %s%% - %s", job_id, stage, progress, message)

    def get_job_progress(self, job_id: str) -> Optional[Dict]:
        """Return the stored progress dict for ``job_id``, or ``None`` if absent."""
        with self.progress_lock:
            return self.job_progress.get(job_id)

    def complete_job(self, job_id: str, results: Dict[str, Any]):
        """Mark a job as completed and store its results.

        The status is only updated for registered jobs. The progress and
        result entries are written regardless; entries written for an
        unregistered id are reclaimed later by ``cleanup_old_jobs``.

        Args:
            job_id: Identifier of the job.
            results: Result payload. It is copied and a ``'completed_at'``
                timestamp is added, overriding any key of the same name.
                ``None`` is treated as an empty payload.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is not None:
                job['status'] = 'completed'

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'completed',
                'progress': 100,
                'message': 'Job completed successfully!',
                'timestamp': time.time(),
            }

        with self.results_lock:
            self.job_results[job_id] = {
                **(results or {}),
                'completed_at': time.time(),
            }

        logger.info("✅ Job completed: %s", job_id)

    def fail_job(self, job_id: str, error_message: str):
        """Mark a job as failed and record the error in its progress entry.

        The status is only updated for registered jobs; the progress entry is
        written regardless.

        Args:
            job_id: Identifier of the job.
            error_message: Description of the failure; stored prefixed with
                ``'Error: '``.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is not None:
                job['status'] = 'failed'

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'error',
                'progress': 0,
                'message': f'Error: {error_message}',
                'timestamp': time.time(),
            }

        logger.error("❌ Job failed: %s - %s", job_id, error_message)

    def get_job_results(self, job_id: str) -> Optional[Dict]:
        """Return the stored results dict for ``job_id``, or ``None`` if absent."""
        with self.results_lock:
            return self.job_results.get(job_id)

    def get_active_job_count(self) -> int:
        """Return the number of registered jobs whose status is ``'active'``."""
        with self.jobs_lock:
            return sum(1 for job in self.active_jobs.values() if job['status'] == 'active')

    def cleanup_old_jobs(self):
        """Evict tracking data older than ``CLEANUP_AGE_SECONDS``.

        Two passes are made:

        1. Registered jobs are evicted by their ``'created_at'`` time,
           whatever their status, from all three tables.
        2. Progress and result entries with no registered job (written by
           ``complete_job`` / ``fail_job`` for an id that was never
           registered or was already evicted) are evicted by their own
           timestamps. Without this pass they would never be reclaimed.
        """
        now = time.time()
        max_age = self.CLEANUP_AGE_SECONDS

        # Collect under the lock, remove afterwards: _remove_job() takes
        # jobs_lock itself and threading.Lock is not re-entrant.
        with self.jobs_lock:
            expired = [
                job_id
                for job_id, job in self.active_jobs.items()
                if now - job['created_at'] > max_age
            ]

        for job_id in expired:
            self._remove_job(job_id)
            logger.info("🧹 Cleaned up old job: %s", job_id)

        with self.jobs_lock:
            registered = set(self.active_jobs)

        orphans = set()
        with self.progress_lock:
            stale = [
                job_id
                for job_id, entry in self.job_progress.items()
                if job_id not in registered and now - entry['timestamp'] > max_age
            ]
            for job_id in stale:
                del self.job_progress[job_id]
            orphans.update(stale)

        with self.results_lock:
            stale = [
                job_id
                for job_id, entry in self.job_results.items()
                if job_id not in registered and now - entry['completed_at'] > max_age
            ]
            for job_id in stale:
                del self.job_results[job_id]
            orphans.update(stale)

        for job_id in sorted(orphans):
            logger.info("🧹 Cleaned up orphaned job data: %s", job_id)

    def _remove_job(self, job_id: str):
        """Remove ``job_id`` from all tracking dictionaries; unknown ids are ignored."""
        with self.jobs_lock:
            self.active_jobs.pop(job_id, None)
        with self.progress_lock:
            self.job_progress.pop(job_id, None)
        with self.results_lock:
            self.job_results.pop(job_id, None)

    def _start_cleanup_task(self):
        """Start the daemon thread that runs ``cleanup_old_jobs`` periodically."""

        def cleanup_worker():
            while True:
                time.sleep(self.CLEANUP_INTERVAL_SECONDS)
                try:
                    self.cleanup_old_jobs()
                except Exception as e:
                    # Keep the worker alive on failure, but record the traceback.
                    logger.exception("❌ Error in cleanup task: %s", e)

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        logger.info("🧹 Cleanup task started")