Spaces:
Sleeping
Sleeping
| """ | |
| Job management for tracking async tasks | |
| """ | |
| import time | |
| import threading | |
| import logging | |
| from typing import Dict, Optional, Any | |
# Module-level logger, named after this module, used by JobManager for all
# of its status and error messages.
logger = logging.getLogger(__name__)
class JobManager:
    """Track background jobs: cancellation state, progress and results.

    State lives in three dictionaries keyed by job id, each guarded by its
    own lock:

    * ``active_jobs``  -- ``cancelled`` flag, ``created_at`` and ``status``
      (``'active'``, ``'cancelled'``, ``'completed'`` or ``'failed'``).
    * ``job_progress`` -- the latest ``stage`` / ``progress`` / ``message``
      snapshot plus the ``timestamp`` at which it was written.
    * ``job_results``  -- the result payload plus ``completed_at``.

    No method holds more than one of these locks at a time, so a multi-step
    operation such as ``register_job`` is not atomic across the dictionaries.

    Creating an instance starts a daemon thread that periodically calls
    ``cleanup_old_jobs``; call ``shutdown`` to stop that thread.
    """

    # Entries older than this many seconds are purged by cleanup_old_jobs().
    CLEANUP_AGE_SECONDS = 1800  # 30 minutes
    # Pause, in seconds, between two runs of the background cleanup.
    CLEANUP_INTERVAL_SECONDS = 300  # 5 minutes

    def __init__(self):
        """Create empty tracking tables and start the cleanup thread."""
        self.active_jobs: Dict[str, Dict[str, Any]] = {}
        self.job_progress: Dict[str, Dict[str, Any]] = {}
        self.job_results: Dict[str, Dict[str, Any]] = {}

        self.jobs_lock = threading.Lock()
        self.progress_lock = threading.Lock()
        self.results_lock = threading.Lock()

        # Set by shutdown() to make the cleanup thread leave its loop.
        self._stop_cleanup = threading.Event()
        self._cleanup_thread: Optional[threading.Thread] = None

        # Start cleanup task
        self._start_cleanup_task()

    def register_job(self, job_id: str):
        """Register a new job as active and seed its progress entry.

        Registering an id that already exists overwrites its previous
        state and progress.
        """
        with self.jobs_lock:
            self.active_jobs[job_id] = {
                'cancelled': False,
                'created_at': time.time(),
                'status': 'active',
            }

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'starting',
                'progress': 0,
                'message': 'Job started...',
                'timestamp': time.time(),
            }

        logger.info("Job registered: %s", job_id)

    def is_job_cancelled(self, job_id: str) -> bool:
        """Return True if the job is known and has been cancelled.

        Unknown ids (never registered, or already cleaned up) yield False.
        """
        with self.jobs_lock:
            return self.active_jobs.get(job_id, {}).get('cancelled', False)

    def cancel_job(self, job_id: str) -> bool:
        """Flag a job as cancelled.

        Returns:
            True if the job was known and has been flagged, False otherwise.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is None:
                return False
            job['cancelled'] = True
            job['status'] = 'cancelled'
            logger.info("Job cancelled: %s", job_id)
            return True

    def update_job_progress(self, job_id: str, stage: str, progress: int, message: str):
        """Replace the progress snapshot of a job that has a progress entry.

        Ids without an existing progress entry are left untouched; the
        update is still logged.
        """
        with self.progress_lock:
            if job_id in self.job_progress:
                self.job_progress[job_id] = {
                    'stage': stage,
                    'progress': progress,
                    'message': message,
                    'timestamp': time.time(),
                }

        logger.info("Job %s: %s - %s%% - %s", job_id, stage, progress, message)

    def get_job_progress(self, job_id: str) -> Optional[Dict]:
        """Return the stored progress dict for ``job_id``, or None if absent.

        The returned object is the stored dict itself, not a copy.
        """
        with self.progress_lock:
            return self.job_progress.get(job_id)

    def complete_job(self, job_id: str, results: Dict[str, Any]):
        """Mark a job as completed and store its results.

        Progress and results are recorded even when ``job_id`` is not in
        ``active_jobs``; such entries are later purged by
        ``cleanup_old_jobs`` based on their own timestamps.

        Args:
            job_id: Identifier of the job.
            results: Result payload. A ``completed_at`` timestamp is added,
                overriding any key of the same name in ``results``.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is not None:
                job['status'] = 'completed'

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'completed',
                'progress': 100,
                'message': 'Job completed successfully!',
                'timestamp': time.time(),
            }

        with self.results_lock:
            self.job_results[job_id] = {
                **results,
                'completed_at': time.time(),
            }

        logger.info("Job completed: %s", job_id)

    def fail_job(self, job_id: str, error_message: str):
        """Mark a job as failed and record the error in its progress entry.

        The progress entry is written even when ``job_id`` is not in
        ``active_jobs``.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is not None:
                job['status'] = 'failed'

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'error',
                'progress': 0,
                'message': f'Error: {error_message}',
                'timestamp': time.time(),
            }

        logger.error("Job failed: %s - %s", job_id, error_message)

    def get_job_results(self, job_id: str) -> Optional[Dict]:
        """Return the stored results dict for ``job_id``, or None if absent.

        The returned object is the stored dict itself, not a copy.
        """
        with self.results_lock:
            return self.job_results.get(job_id)

    def get_active_job_count(self) -> int:
        """Return the number of jobs whose status is ``'active'``."""
        with self.jobs_lock:
            return sum(
                1 for job in self.active_jobs.values() if job['status'] == 'active'
            )

    def cleanup_old_jobs(self, max_age: Optional[float] = None):
        """Remove every tracked entry older than ``max_age`` seconds.

        A registered job expires by its ``created_at`` time regardless of
        its status, so a job still running after ``max_age`` is dropped too.
        Progress and result entries whose id is not in ``active_jobs``
        (written by ``complete_job`` / ``fail_job`` for an unregistered or
        already purged id) expire by their own ``timestamp`` /
        ``completed_at``; without this sweep they would never be removed.

        Args:
            max_age: Retention in seconds; defaults to
                ``CLEANUP_AGE_SECONDS`` (30 minutes).
        """
        if max_age is None:
            max_age = self.CLEANUP_AGE_SECONDS
        now = time.time()

        # Each table is scanned under its own lock only; removal happens
        # afterwards because _remove_job() re-acquires these locks.
        with self.jobs_lock:
            registered = set(self.active_jobs)
            expired = [
                job_id
                for job_id, job in self.active_jobs.items()
                if now - job['created_at'] > max_age
            ]

        with self.progress_lock:
            expired.extend(
                job_id
                for job_id, entry in self.job_progress.items()
                if job_id not in registered
                # Entries lacking a timestamp are left alone.
                and now - entry.get('timestamp', now) > max_age
            )

        with self.results_lock:
            expired.extend(
                job_id
                for job_id, entry in self.job_results.items()
                if job_id not in registered
                and now - entry.get('completed_at', now) > max_age
            )

        # dict.fromkeys de-duplicates while keeping first-seen order (an
        # orphan can appear in both the progress and the results scan).
        for job_id in dict.fromkeys(expired):
            self._remove_job(job_id)
            logger.info("Cleaned up old job: %s", job_id)

    def shutdown(self, timeout: Optional[float] = 5.0):
        """Stop the background cleanup thread.

        Safe to call more than once.

        Args:
            timeout: Maximum number of seconds to wait for the thread to
                finish; None waits indefinitely.
        """
        self._stop_cleanup.set()
        worker = self._cleanup_thread
        # A thread cannot join itself; skip the join in that case.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout)

    def _remove_job(self, job_id: str):
        """Remove job from all tracking dictionaries (missing ids are ignored)."""
        with self.jobs_lock:
            self.active_jobs.pop(job_id, None)

        with self.progress_lock:
            self.job_progress.pop(job_id, None)

        with self.results_lock:
            self.job_results.pop(job_id, None)

    def _start_cleanup_task(self):
        """Start the daemon thread that periodically runs cleanup_old_jobs()."""

        def cleanup_worker():
            # Event.wait() returns False on timeout (time for another run)
            # and True once shutdown() has set the event.
            while not self._stop_cleanup.wait(self.CLEANUP_INTERVAL_SECONDS):
                try:
                    self.cleanup_old_jobs()
                except Exception as e:
                    # Boundary of the worker thread: log with traceback and
                    # keep the loop alive for the next run.
                    logger.exception("Error in cleanup task: %s", e)

        self._cleanup_thread = threading.Thread(
            target=cleanup_worker, name="job-cleanup", daemon=True
        )
        self._cleanup_thread.start()
        logger.info("Cleanup task started")