"""
Job management for tracking async tasks
"""

import time
import threading
import logging
from typing import Dict, Optional, Any

logger = logging.getLogger(__name__)

# NOTE(review): the prefixes of the log messages below look like mis-encoded
# emoji (mojibake). They are kept exactly as found so log output is unchanged;
# confirm the intended characters against the file's real encoding.


class JobManager:
    """Manages background job tracking and progress.

    State is kept in three dictionaries keyed by job id, each guarded by its
    own lock:

    * ``active_jobs``  - ``cancelled`` flag, ``created_at`` time and ``status``
    * ``job_progress`` - latest ``stage`` / ``progress`` / ``message`` /
      ``timestamp``
    * ``job_results``  - the results passed to :meth:`complete_job` plus a
      ``completed_at`` time

    Constructing a manager starts a daemon thread that calls
    :meth:`cleanup_old_jobs` every ``CLEANUP_INTERVAL_SECONDS``. Call
    :meth:`shutdown` to stop that thread.
    """

    # Age (seconds) after which tracking data is discarded: 30 minutes.
    CLEANUP_AGE_SECONDS = 1800
    # Delay (seconds) between two runs of the background cleanup: 5 minutes.
    CLEANUP_INTERVAL_SECONDS = 300

    def __init__(self):
        """Create empty tracking tables and start the cleanup thread."""
        self.active_jobs: Dict[str, Dict] = {}
        self.job_progress: Dict[str, Dict] = {}
        self.job_results: Dict[str, Dict] = {}
        self.jobs_lock = threading.Lock()
        self.progress_lock = threading.Lock()
        self.results_lock = threading.Lock()

        # Set by shutdown() to make the cleanup worker leave its loop.
        self._stop_event = threading.Event()

        # Start cleanup task
        self._start_cleanup_task()

    def register_job(self, job_id: str):
        """Register a new job.

        Creates the ``active_jobs`` entry (status ``'active'``, not cancelled)
        and an initial ``'starting'`` progress entry. Registering an id that
        already exists overwrites both entries.
        """
        with self.jobs_lock:
            self.active_jobs[job_id] = {
                'cancelled': False,
                'created_at': time.time(),
                'status': 'active'
            }

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'starting',
                'progress': 0,
                'message': 'Job started...',
                'timestamp': time.time()
            }

        logger.info(f"๐Ÿ“ Job registered: {job_id}")

    def is_job_cancelled(self, job_id: str) -> bool:
        """Check if a job has been cancelled.

        Returns ``False`` for ids that are not in ``active_jobs``.
        """
        with self.jobs_lock:
            return self.active_jobs.get(job_id, {}).get('cancelled', False)

    def cancel_job(self, job_id: str) -> bool:
        """Cancel a job.

        Returns:
            ``True`` if the job was found and flagged as cancelled, ``False``
            if the id is not in ``active_jobs``.
        """
        with self.jobs_lock:
            job = self.active_jobs.get(job_id)
            if job is None:
                return False
            job['cancelled'] = True
            job['status'] = 'cancelled'
            logger.info(f"โŒ Job cancelled: {job_id}")
            return True

    def update_job_progress(self, job_id: str, stage: str, progress: int, message: str):
        """Update job progress.

        The stored progress entry is replaced as a whole and the update is
        logged. Ids without an existing progress entry are ignored silently.
        """
        with self.progress_lock:
            if job_id in self.job_progress:
                self.job_progress[job_id] = {
                    'stage': stage,
                    'progress': progress,
                    'message': message,
                    'timestamp': time.time()
                }
                logger.info(f"๐Ÿ“Š Job {job_id}: {stage} - {progress}% - {message}")

    def get_job_progress(self, job_id: str) -> Optional[Dict]:
        """Get current job progress, or ``None`` if there is no entry."""
        with self.progress_lock:
            return self.job_progress.get(job_id)

    def complete_job(self, job_id: str, results: Dict[str, Any]):
        """Mark job as completed with results.

        The status is only updated when the id is in ``active_jobs``; the
        progress and result entries are written unconditionally.

        Args:
            job_id: Id of the finished job.
            results: Result payload. It is copied and a ``completed_at``
                timestamp is added. ``None`` is treated as an empty payload.
        """
        with self.jobs_lock:
            if job_id in self.active_jobs:
                self.active_jobs[job_id]['status'] = 'completed'

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'completed',
                'progress': 100,
                'message': 'Job completed successfully!',
                'timestamp': time.time()
            }

        with self.results_lock:
            # `results or {}` keeps a None payload from raising on unpacking.
            self.job_results[job_id] = {
                **(results or {}),
                'completed_at': time.time()
            }

        logger.info(f"โœ… Job completed: {job_id}")

    def fail_job(self, job_id: str, error_message: str):
        """Mark job as failed.

        The status is only updated when the id is in ``active_jobs``; the
        progress entry is written unconditionally. No result is stored.
        """
        with self.jobs_lock:
            if job_id in self.active_jobs:
                self.active_jobs[job_id]['status'] = 'failed'

        with self.progress_lock:
            self.job_progress[job_id] = {
                'stage': 'error',
                'progress': 0,
                'message': f'Error: {error_message}',
                'timestamp': time.time()
            }

        logger.error(f"โŒ Job failed: {job_id} - {error_message}")

    def get_job_results(self, job_id: str) -> Optional[Dict]:
        """Get job results if completed, otherwise ``None``."""
        with self.results_lock:
            return self.job_results.get(job_id)

    def get_active_job_count(self) -> int:
        """Get number of jobs whose status is ``'active'``."""
        with self.jobs_lock:
            return sum(1 for job in self.active_jobs.values() if job['status'] == 'active')

    def cleanup_old_jobs(self, max_age: Optional[float] = None):
        """Clean up jobs older than 30 minutes.

        Two kinds of entries are removed from all tracking dictionaries:

        * jobs in ``active_jobs`` created more than ``max_age`` seconds ago;
        * orphaned progress/result entries, i.e. ids that are not in
          ``active_jobs``, whose most recent ``timestamp`` / ``completed_at``
          is more than ``max_age`` seconds old. ``complete_job`` and
          ``fail_job`` create such entries for ids that were never registered
          or were already cleaned up, and nothing else would purge them.

        Args:
            max_age: Age limit in seconds. Defaults to ``CLEANUP_AGE_SECONDS``.
        """
        cleanup_age = self.CLEANUP_AGE_SECONDS if max_age is None else max_age
        current_time = time.time()

        jobs_to_remove = []
        with self.jobs_lock:
            tracked_ids = set(self.active_jobs)
            for job_id, job_data in self.active_jobs.items():
                if current_time - job_data['created_at'] > cleanup_age:
                    jobs_to_remove.append(job_id)

        # Most recent activity per orphaned id, so an orphan is only dropped
        # once every entry it owns is older than the limit.
        orphan_last_seen: Dict[str, float] = {}
        with self.progress_lock:
            for job_id, entry in self.job_progress.items():
                if job_id not in tracked_ids:
                    orphan_last_seen[job_id] = entry['timestamp']
        with self.results_lock:
            for job_id, entry in self.job_results.items():
                if job_id not in tracked_ids:
                    completed_at = entry['completed_at']
                    orphan_last_seen[job_id] = max(
                        orphan_last_seen.get(job_id, completed_at), completed_at
                    )

        jobs_to_remove.extend(
            job_id
            for job_id, last_seen in orphan_last_seen.items()
            if current_time - last_seen > cleanup_age
        )

        for job_id in jobs_to_remove:
            self._remove_job(job_id)
            logger.info(f"๐Ÿงน Cleaned up old job: {job_id}")

    def shutdown(self):
        """Ask the background cleanup thread to stop.

        Only sets the stop flag; it does not wait for the thread to exit.
        Tracking data is left untouched.
        """
        self._stop_event.set()

    def _remove_job(self, job_id: str):
        """Remove job from all tracking dictionaries (missing ids are ignored)."""
        with self.jobs_lock:
            self.active_jobs.pop(job_id, None)

        with self.progress_lock:
            self.job_progress.pop(job_id, None)

        with self.results_lock:
            self.job_results.pop(job_id, None)

    def _start_cleanup_task(self):
        """Start background cleanup task.

        Launches a daemon thread that runs ``cleanup_old_jobs`` every
        ``CLEANUP_INTERVAL_SECONDS`` until ``shutdown`` is called. Exceptions
        raised by a cleanup run are logged and the loop continues.
        """
        def cleanup_worker():
            # Event.wait() returns False on timeout (run a cleanup pass) and
            # True once shutdown() has set the event (leave the loop).
            while not self._stop_event.wait(self.CLEANUP_INTERVAL_SECONDS):
                try:
                    self.cleanup_old_jobs()
                except Exception as e:
                    logger.error(f"โŒ Error in cleanup task: {str(e)}")

        cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
        cleanup_thread.start()
        logger.info("๐Ÿงน Cleanup task started")